summaryrefslogtreecommitdiff
path: root/opendc-format/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-format/src/main')
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt35
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt35
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt66
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt89
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt44
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt32
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt100
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt127
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt176
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt176
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt126
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt107
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt95
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt112
14 files changed, 0 insertions, 1320 deletions
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt
deleted file mode 100644
index 97d6f239..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.environment
-
-import java.io.Closeable
-
-/**
- * An interface for reading descriptions of topology environments into memory.
- */
-public interface EnvironmentReader : Closeable {
- /**
- * Read the environment into a list.
- */
- public fun read(): List<MachineDef>
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt
deleted file mode 100644
index f65c4880..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.environment
-
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.power.PowerModel
-import java.util.*
-
-public data class MachineDef(
- val uid: UUID,
- val name: String,
- val meta: Map<String, Any>,
- val model: MachineModel,
- val powerModel: PowerModel
-)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt
deleted file mode 100644
index c313467f..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.environment.sc18
-
-import com.fasterxml.jackson.annotation.JsonSubTypes
-import com.fasterxml.jackson.annotation.JsonTypeInfo
-
-/**
- * A topology setup.
- *
- * @property name The name of the setup.
- * @property rooms The rooms in the topology.
- */
-internal data class Setup(val name: String, val rooms: List<Room>)
-
-/**
- * A room in a topology.
- *
- * @property type The type of room in the topology.
- * @property objects The objects in the room.
- */
-internal data class Room(val type: String, val objects: List<RoomObject>)
-
-/**
- * An object in a [Room].
- *
- * @property type The type of the room object.
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
-@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)])
-internal sealed class RoomObject(val type: String) {
- /**
- * A rack in a server room.
- *
- * @property machines The machines in the rack.
- */
- internal data class Rack(val machines: List<Machine>) : RoomObject("RACK")
-}
-
-/**
- * A machine in the setup that consists of the specified CPU's represented as
- * integer identifiers and ethernet speed.
- *
- * @property cpus The CPUs in the machine represented as integer identifiers.
- */
-internal data class Machine(val cpus: List<Int>)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
deleted file mode 100644
index 7780a98e..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.environment.sc18
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import com.fasterxml.jackson.module.kotlin.readValue
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.MachineDef
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.power.ConstantPowerModel
-import java.io.InputStream
-import java.util.*
-
-/**
- * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Topology
- * Schedulers".
- *
- * @param input The input stream to read from.
- * @param mapper The Jackson object mapper to use.
- */
-public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader {
- /**
- * The environment that was read from the file.
- */
- private val setup: Setup = mapper.readValue(input)
-
- /**
- * Read the environment.
- */
- public override fun read(): List<MachineDef> {
- var counter = 0
- return setup.rooms.flatMap { room ->
- room.objects.flatMap { roomObject ->
- when (roomObject) {
- is RoomObject.Rack -> {
- roomObject.machines.map { machine ->
- val cores = machine.cpus.flatMap { id ->
- when (id) {
- 1 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
- List(node.coreCount) { ProcessingUnit(node, it, 4100.0) }
- }
- 2 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
- List(node.coreCount) { ProcessingUnit(node, it, 3500.0) }
- }
- else -> throw IllegalArgumentException("The cpu id $id is not recognized")
- }
- }
- MachineDef(
- UUID(0L, counter++.toLong()),
- "node-$counter",
- emptyMap(),
- MachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))),
- ConstantPowerModel(0.0)
- )
- }
- }
- }
- }
- }
- }
-
- override fun close() {}
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt
deleted file mode 100644
index 3ce79d69..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace
-
-import java.util.UUID
-
-/**
- * An entry in a workload trace.
- *
- * @param uid The unique identifier of the entry.
- * @param name The name of the entry.
- * @param start The start time of the workload.
- * @param workload The workload of the entry.
- * @param meta The meta-data associated with the workload.
- */
-public data class TraceEntry<out T>(
- val uid: UUID,
- val name: String,
- val start: Long,
- val workload: T,
- val meta: Map<String, Any>
-)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
deleted file mode 100644
index 797a88d5..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace
-
-/**
- * An interface for reading workloads into memory.
- *
- * This interface must guarantee that the entries are delivered in order of submission time.
- *
- * @param T The shape of the workloads supported by this reader.
- */
-public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt
deleted file mode 100644
index ff6cdd02..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.bitbrains
-
-import com.fasterxml.jackson.annotation.JsonProperty
-import com.fasterxml.jackson.databind.MappingIterator
-import com.fasterxml.jackson.dataformat.csv.CsvMapper
-import com.fasterxml.jackson.dataformat.csv.CsvSchema
-import java.io.InputStream
-
-/**
- * A trace reader that enables the user to read Bitbrains specific trace data.
- */
-public class BitbrainsRawTraceReader(input: InputStream) : Iterator<BitbrainsRawTraceReader.Entry>, AutoCloseable {
- /**
- * The [CsvSchema] that is used to parse the trace.
- */
- private val schema = CsvSchema.builder()
- .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .setAllowComments(true)
- .setUseHeader(true)
- .setColumnSeparator(';')
- .build()
-
- /**
- * The mapping iterator to use.
- */
- private val iterator: MappingIterator<Entry> = CsvMapper().readerFor(Entry::class.java).with(schema)
- .readValues(input)
-
- override fun hasNext(): Boolean {
- return iterator.hasNext()
- }
-
- override fun next(): Entry {
- return iterator.next()
- }
-
- override fun close() {
- iterator.close()
- }
-
- /**
- * A single entry in the trace.
- */
- public data class Entry(
- @JsonProperty("Timestamp [ms]")
- val timestamp: Long,
- @JsonProperty("CPU cores")
- val cpuCores: Int,
- @JsonProperty("CPU capacity provisioned [MHZ]")
- val cpuCapacity: Double,
- @JsonProperty("CPU usage [MHZ]")
- val cpuUsage: Double,
- @JsonProperty("CPU usage [%]")
- val cpuUsagePct: Double,
- @JsonProperty("Memory capacity provisioned [KB]")
- val memCapacity: Double,
- @JsonProperty("Memory usage [KB]")
- val memUsage: Double,
- @JsonProperty("Disk read throughput [KB/s]")
- val diskRead: Double,
- @JsonProperty("Disk write throughput [KB/s]")
- val diskWrite: Double,
- @JsonProperty("Network received throughput [KB/s]")
- val netReceived: Double,
- @JsonProperty("Network transmitted throughput [KB/s]")
- val netTransmitted: Double
- )
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
deleted file mode 100644
index 9e4876df..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.bitbrains
-
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.io.File
-import java.io.FileInputStream
-import java.util.*
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * A [TraceReader] for the public VM workload trace format.
- *
- * @param traceDirectory The directory of the traces.
- */
-public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkload> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>>
-
- /**
- * Initialize the reader.
- */
- init {
- val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>()
- val traceInterval = 5 * 60 * 1000L
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" }
- .forEach { vmFile ->
- val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>()
- var vmId = -1L
- var maxCores = Int.MIN_VALUE
- var requiredMemory = Long.MIN_VALUE
- var startTime = Long.MAX_VALUE
- var lastTimestamp = Long.MIN_VALUE
-
- BitbrainsRawTraceReader(FileInputStream(vmFile)).use { reader ->
- reader.forEach { entry ->
- val timestamp = entry.timestamp * 1000L
- val cpuUsage = entry.cpuUsage
- vmId = vmFile.nameWithoutExtension.trim().toLong()
- val cores = entry.cpuCores
- maxCores = max(maxCores, cores)
- requiredMemory = max(requiredMemory, (entry.memCapacity / 1000).toLong())
-
- if (lastTimestamp < 0) {
- lastTimestamp = timestamp - 5 * 60 * 1000L
- startTime = min(startTime, lastTimestamp)
- }
-
- if (flopsHistory.isEmpty()) {
- flopsHistory.add(SimTraceWorkload.Fragment(lastTimestamp, traceInterval, cpuUsage, cores))
- } else {
- val last = flopsHistory.last()
- val duration = timestamp - lastTimestamp
- // Perform run-length encoding
- if (duration == 0L || last.usage == cpuUsage) {
- flopsHistory[flopsHistory.size - 1] = last.copy(duration = last.duration + duration)
- } else {
- flopsHistory.add(
- SimTraceWorkload.Fragment(
- lastTimestamp,
- duration,
- cpuUsage,
- cores
- )
- )
- }
- }
-
- lastTimestamp = timestamp
- }
- }
-
- val uuid = UUID(0L, vmId)
-
- val workload = SimTraceWorkload(flopsHistory.asSequence())
- entries[vmId] = TraceEntry(
- uuid,
- vmId.toString(),
- startTime,
- workload,
- mapOf(
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
- )
- }
-
- // Create the entry iterator
- iterator = entries.values.sortedBy { it.start }.iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
deleted file mode 100644
index e68afeb7..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.gwf
-
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.workflow.api.Job
-import org.opendc.workflow.api.Task
-import org.opendc.workflow.api.WORKFLOW_TASK_CORES
-import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
-import java.io.BufferedReader
-import java.io.File
-import java.io.InputStream
-import java.util.*
-import kotlin.collections.HashSet
-import kotlin.collections.Iterator
-import kotlin.collections.List
-import kotlin.collections.MutableSet
-import kotlin.collections.component1
-import kotlin.collections.component2
-import kotlin.collections.filter
-import kotlin.collections.forEach
-import kotlin.collections.getOrPut
-import kotlin.collections.map
-import kotlin.collections.mapIndexed
-import kotlin.collections.mapOf
-import kotlin.collections.mutableMapOf
-import kotlin.collections.set
-import kotlin.collections.sortedBy
-import kotlin.collections.toMap
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * A [TraceReader] for the Grid Workload Format. See the Grid Workloads Archive (http://gwa.ewi.tudelft.nl/) for more
- * information about the format.
- *
- * Be aware that in the Grid Workload Format, workflows are not required to be ordered by submission time and therefore
- * this reader needs to read the whole trace into memory before an entry can be read. Consider converting the trace to a
- * different format for better performance.
- *
- * @param reader The buffered reader to read the trace with.
- */
-public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<Job>>
-
- /**
- * Create a [GwfTraceReader] instance from the specified [File].
- *
- * @param file The file to read from.
- */
- public constructor(file: File) : this(file.bufferedReader())
-
- /**
- * Create a [GwfTraceReader] instance from the specified [InputStream].
- *
- * @param input The input stream to read from.
- */
- public constructor(input: InputStream) : this(input.bufferedReader())
-
- /**
- * Initialize the reader.
- */
- init {
- val workflows = mutableMapOf<Long, Job>()
- val starts = mutableMapOf<Long, Long>()
- val tasks = mutableMapOf<Long, Task>()
- val taskDependencies = mutableMapOf<Task, List<Long>>()
-
- var workflowIdCol = 0
- var taskIdCol = 0
- var submitTimeCol = 0
- var runtimeCol = 0
- var coreCol = 0
- var dependencyCol = 0
-
- try {
- reader.lineSequence()
- .filter { line ->
- // Ignore comments in the trace
- !line.startsWith("#") && line.isNotBlank()
- }
- .forEachIndexed { idx, line ->
- val values = line.split(",")
-
- // Parse GWF header
- if (idx == 0) {
- val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
- workflowIdCol = header["WorkflowID"]!!
- taskIdCol = header["JobID"]!!
- submitTimeCol = header["SubmitTime"]!!
- runtimeCol = header["RunTime"]!!
- coreCol = header["NProcs"]!!
- dependencyCol = header["Dependencies"]!!
- return@forEachIndexed
- }
-
- val workflowId = values[workflowIdCol].trim().toLong()
- val taskId = values[taskIdCol].trim().toLong()
- val submitTime = values[submitTimeCol].trim().toLong() * 1000 // ms
- val runtime = max(0, values[runtimeCol].trim().toLong()) // s
- val cores = values[coreCol].trim().toInt()
- val dependencies = values[dependencyCol].split(" ")
- .filter { it.isNotEmpty() }
- .map { it.trim().toLong() }
-
- val flops: Long = 4000 * runtime * cores
-
- val workflow = workflows.getOrPut(workflowId) {
- Job(UUID(0L, workflowId), "<unnamed>", HashSet())
- }
- val workload = SimFlopsWorkload(flops)
- val task = Task(
- UUID(0L, taskId),
- "<unnamed>",
- HashSet(),
- mapOf(
- "workload" to workload,
- WORKFLOW_TASK_CORES to cores,
- WORKFLOW_TASK_DEADLINE to (runtime * 1000)
- ),
- )
- starts.merge(workflowId, submitTime, ::min)
- (workflow.tasks as MutableSet<Task>).add(task)
- tasks[taskId] = task
- taskDependencies[task] = dependencies
- }
- } finally {
- reader.close()
- }
-
- // Fix dependencies and dependents for all tasks
- taskDependencies.forEach { (task, dependencies) ->
- (task.dependencies as MutableSet<Task>).addAll(
- dependencies.map { taskId ->
- tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found")
- }
- )
- }
-
- // Create the entry iterator
- iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) }
- .sortedBy { it.start }
- .iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<Job> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
deleted file mode 100644
index bda392a9..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.swf
-
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.io.BufferedReader
-import java.io.File
-import java.io.FileReader
-import java.util.*
-
-/**
- * A [TraceReader] for reading SWF traces into VM-modeled workloads.
- *
- * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html
- *
- * @param file The trace file.
- */
-public class SwfTraceReader(
- file: File,
- maxNumCores: Int = -1
-) : TraceReader<SimWorkload> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>>
-
- /**
- * Initialize the reader.
- */
- init {
- val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>()
-
- val jobNumberCol = 0
- val submitTimeCol = 1 // seconds (begin of trace is 0)
- val waitTimeCol = 2 // seconds
- val runTimeCol = 3 // seconds
- val numAllocatedCoresCol = 4 // We assume that single-core processors were used at the time
- val requestedMemoryCol = 9 // KB per processor/core (-1 if not specified)
-
- val sliceDuration = 5 * 60L
-
- var jobNumber: Long
- var submitTime: Long
- var waitTime: Long
- var runTime: Long
- var cores: Int
- var memory: Long
- var slicedWaitTime: Long
- var runtimePartialSliceRemainder: Long
-
- BufferedReader(FileReader(file)).use { reader ->
- reader.lineSequence()
- .filter { line ->
- // Ignore comments in the trace
- !line.startsWith(";") && line.isNotBlank()
- }
- .forEach { line ->
- val values = line.trim().split("\\s+".toRegex())
-
- jobNumber = values[jobNumberCol].trim().toLong()
- submitTime = values[submitTimeCol].trim().toLong()
- waitTime = values[waitTimeCol].trim().toLong()
- runTime = values[runTimeCol].trim().toLong()
- cores = values[numAllocatedCoresCol].trim().toInt()
- memory = values[requestedMemoryCol].trim().toLong()
-
- if (maxNumCores != -1 && cores > maxNumCores) {
- println("Skipped a task due to processor count ($cores > $maxNumCores).")
- return@forEach
- }
-
- if (memory == -1L) {
- memory = 1000L * cores // assume 1GB of memory per processor if not specified
- } else {
- memory /= 1000 // convert KB to MB
- }
-
- val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>()
-
- // Insert waiting time slices
-
- // We ignore wait time remainders under one
- slicedWaitTime = 0L
- if (waitTime >= sliceDuration) {
- for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) {
- flopsHistory.add(
- SimTraceWorkload.Fragment(
- tick,
- sliceDuration * 1000,
- 0.0,
- cores
- )
- )
- slicedWaitTime += sliceDuration
- }
- }
-
- // Insert run time slices
-
- runtimePartialSliceRemainder = runTime % sliceDuration
-
- for (
- tick in (submitTime + slicedWaitTime)
- until (submitTime + slicedWaitTime + runTime - sliceDuration)
- step sliceDuration
- ) {
- flopsHistory.add(
- SimTraceWorkload.Fragment(
- tick,
- sliceDuration * 1000L,
- 1.0,
- cores
- )
- )
- }
-
- if (runtimePartialSliceRemainder > 0) {
- flopsHistory.add(
- SimTraceWorkload.Fragment(
- submitTime + slicedWaitTime + runTime,
- sliceDuration,
- runtimePartialSliceRemainder / sliceDuration.toDouble(),
- cores
- )
- )
- }
-
- val uuid = UUID(0L, jobNumber)
- val workload = SimTraceWorkload(flopsHistory.asSequence())
- entries[jobNumber] = TraceEntry(
- uuid,
- jobNumber.toString(),
- submitTime,
- workload,
- mapOf(
- "cores" to cores,
- "required-memory" to memory,
- "workload" to workload
- )
- )
- }
- }
-
- // Create the entry iterator
- iterator = entries.values.sortedBy { it.start }.iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
deleted file mode 100644
index dde1b340..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.wtf
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.format.util.LocalParquetReader
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.workflow.api.Job
-import org.opendc.workflow.api.Task
-import org.opendc.workflow.api.WORKFLOW_TASK_CORES
-import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
-import java.io.File
-import java.nio.file.Path
-import java.util.UUID
-import kotlin.math.min
-
-/**
- * A [TraceReader] for the Workflow Trace Format (WTF). See the Workflow Trace Archive
- * (https://wta.atlarge-research.com/) for more information about the format.
- *
- * @param path The path to the trace.
- */
-public class WtfTraceReader(path: Path) : TraceReader<Job> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<Job>>
-
- /**
- * Construct a [TraceReader] from the specified [path].
- *
- * @param path The path to the trace.
- */
- public constructor(path: File) : this(path.toPath())
-
- /**
- * Initialize the reader.
- */
- init {
- val workflows = mutableMapOf<Long, Job>()
- val starts = mutableMapOf<Long, Long>()
- val tasks = mutableMapOf<Long, Task>()
- val taskDependencies = mutableMapOf<Task, List<Long>>()
-
- LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")).use { reader ->
- while (true) {
- val nextRecord = reader.read() ?: break
-
- val workflowId = nextRecord.get("workflow_id") as Long
- val taskId = nextRecord.get("id") as Long
- val submitTime = nextRecord.get("ts_submit") as Long
- val runtime = nextRecord.get("runtime") as Long
- val cores = (nextRecord.get("resource_amount_requested") as Double).toInt()
-
- @Suppress("UNCHECKED_CAST")
- val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map {
- it.get("item") as Long
- }
-
- val flops: Long = 4100 * (runtime / 1000) * cores
-
- val workflow = workflows.getOrPut(workflowId) {
- Job(UUID(0L, workflowId), "<unnamed>", HashSet())
- }
- val workload = SimFlopsWorkload(flops)
- val task = Task(
- UUID(0L, taskId),
- "<unnamed>",
- HashSet(),
- mapOf(
- "workload" to workload,
- WORKFLOW_TASK_CORES to cores,
- WORKFLOW_TASK_DEADLINE to runtime
- )
- )
-
- starts.merge(workflowId, submitTime, ::min)
- (workflow.tasks as MutableSet<Task>).add(task)
- tasks[taskId] = task
- taskDependencies[task] = dependencies
- }
- }
-
- // Fix dependencies and dependents for all tasks
- taskDependencies.forEach { (task, dependencies) ->
- (task.dependencies as MutableSet<Task>).addAll(
- dependencies.map { taskId ->
- tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found")
- }
- )
- }
-
- // Create the entry iterator
- iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) }
- .sortedBy { it.start }
- .iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<Job> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt
deleted file mode 100644
index 92319ace..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.util
-
-import org.apache.parquet.io.InputFile
-import org.apache.parquet.io.SeekableInputStream
-import java.io.EOFException
-import java.io.File
-import java.nio.ByteBuffer
-import java.nio.channels.FileChannel
-import java.nio.file.Path
-import java.nio.file.StandardOpenOption
-
-/**
- * An [InputFile] on the local filesystem.
- */
-public class LocalInputFile(private val path: Path) : InputFile {
- /**
- * The [FileChannel] used for accessing the input path.
- */
- private val channel = FileChannel.open(path, StandardOpenOption.READ)
-
- /**
- * Construct a [LocalInputFile] for the specified [file].
- */
- public constructor(file: File) : this(file.toPath())
-
- override fun getLength(): Long = channel.size()
-
- override fun newStream(): SeekableInputStream = object : SeekableInputStream() {
- override fun read(buf: ByteBuffer): Int {
- return channel.read(buf)
- }
-
- override fun read(): Int {
- val single = ByteBuffer.allocate(1)
- var read: Int
-
- // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte
- do {
- read = channel.read(single)
- } while (read == 0)
-
- return if (read == -1) {
- read
- } else {
- single.get(0).toInt() and 0xff
- }
- }
-
- override fun getPos(): Long {
- return channel.position()
- }
-
- override fun seek(newPos: Long) {
- channel.position(newPos)
- }
-
- override fun readFully(bytes: ByteArray) {
- readFully(ByteBuffer.wrap(bytes))
- }
-
- override fun readFully(bytes: ByteArray, start: Int, len: Int) {
- readFully(ByteBuffer.wrap(bytes, start, len))
- }
-
- override fun readFully(buf: ByteBuffer) {
- var remainder = buf.remaining()
- while (remainder > 0) {
- val read = channel.read(buf)
- remainder -= read
-
- if (read == -1 && remainder > 0) {
- throw EOFException()
- }
- }
- }
-
- override fun close() {
- channel.close()
- }
-
- override fun toString(): String = "NioSeekableInputStream"
- }
-
- override fun toString(): String = "LocalInputFile[path=$path]"
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt
deleted file mode 100644
index 657bca5a..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.util
-
-import org.apache.parquet.io.OutputFile
-import org.apache.parquet.io.PositionOutputStream
-import java.io.File
-import java.io.OutputStream
-import java.nio.file.Files
-import java.nio.file.Path
-import java.nio.file.StandardOpenOption
-
-/**
- * An [OutputFile] on the local filesystem.
- */
-public class LocalOutputFile(private val path: Path) : OutputFile {
- /**
- * Construct a [LocalOutputFile] from the specified [file]
- */
- public constructor(file: File) : this(file.toPath())
-
- override fun create(blockSizeHint: Long): PositionOutputStream {
- val output = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)
- return NioPositionOutputStream(output)
- }
-
- override fun createOrOverwrite(blockSizeHint: Long): PositionOutputStream {
- val output = Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)
- return NioPositionOutputStream(output)
- }
-
- override fun supportsBlockSize(): Boolean = false
-
- override fun defaultBlockSize(): Long =
- throw UnsupportedOperationException("Local filesystem does not have default block size")
-
- override fun getPath(): String = path.toString()
-
- /**
- * Implementation of [PositionOutputStream] for an [OutputStream].
- */
- private class NioPositionOutputStream(private val output: OutputStream) : PositionOutputStream() {
- /**
- * The current position in the file.
- */
- private var _pos = 0L
-
- override fun getPos(): Long = _pos
-
- override fun write(b: Int) {
- output.write(b)
- _pos++
- }
-
- override fun write(b: ByteArray) {
- output.write(b)
- _pos += b.size
- }
-
- override fun write(b: ByteArray, off: Int, len: Int) {
- output.write(b, off, len)
- _pos += len
- }
-
- override fun flush() {
- output.flush()
- }
-
- override fun close() {
- output.close()
- }
-
- override fun toString(): String = "NioPositionOutputStream[output=$output]"
- }
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt
deleted file mode 100644
index 5083f3e1..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.util
-
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.hadoop.ParquetReader
-import org.apache.parquet.io.InputFile
-import java.io.File
-import java.io.IOException
-import java.nio.file.Files
-import java.nio.file.Path
-import kotlin.io.path.isDirectory
-
-/**
- * A helper class to read Parquet files.
- *
- * @param path The path to the Parquet file or directory to read.
- */
-public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
- /**
- * The input files to process.
- */
- private val filesIterator = if (path.isDirectory())
- Files.list(path)
- .filter { !it.isDirectory() }
- .sorted()
- .map { LocalInputFile(it) }
- .iterator()
- else
- listOf(LocalInputFile(path)).iterator()
-
- /**
- * The Parquet reader to use.
- */
- private var reader: ParquetReader<T>? = null
-
- /**
- * Construct a [LocalParquetReader] for the specified [file].
- */
- public constructor(file: File) : this(file.toPath())
-
- /**
- * Read a single entry in the Parquet file.
- */
- public fun read(): T? {
- return try {
- val next = reader?.read()
- if (next != null) {
- next
- } else {
- initReader()
-
- if (reader == null)
- null
- else
- read()
- }
- } catch (e: InterruptedException) {
- throw IOException(e)
- }
- }
-
- /**
- * Close the Parquet reader.
- */
- override fun close() {
- reader?.close()
- }
-
- /**
- * Initialize the next reader.
- */
- private fun initReader() {
- reader?.close()
-
- this.reader = if (filesIterator.hasNext()) {
- createReader(filesIterator.next())
- } else {
- null
- }
- }
-
- /**
- * Create a Parquet reader for the specified file.
- */
- private fun createReader(input: InputFile): ParquetReader<T> {
- return AvroParquetReader
- .builder<T>(input)
- .disableCompatibility()
- .build()
- }
-}