diff options
Diffstat (limited to 'opendc-format/src/main')
14 files changed, 0 insertions, 1320 deletions
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt deleted file mode 100644 index 97d6f239..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment - -import java.io.Closeable - -/** - * An interface for reading descriptions of topology environments into memory. - */ -public interface EnvironmentReader : Closeable { - /** - * Read the environment into a list. - */ - public fun read(): List<MachineDef> -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt deleted file mode 100644 index f65c4880..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment - -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.power.PowerModel -import java.util.* - -public data class MachineDef( - val uid: UUID, - val name: String, - val meta: Map<String, Any>, - val model: MachineModel, - val powerModel: PowerModel -) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt deleted file mode 100644 index c313467f..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc18 - -import com.fasterxml.jackson.annotation.JsonSubTypes -import com.fasterxml.jackson.annotation.JsonTypeInfo - -/** - * A topology setup. - * - * @property name The name of the setup. - * @property rooms The rooms in the topology. - */ -internal data class Setup(val name: String, val rooms: List<Room>) - -/** - * A room in a topology. - * - * @property type The type of room in the topology. - * @property objects The objects in the room. - */ -internal data class Room(val type: String, val objects: List<RoomObject>) - -/** - * An object in a [Room]. - * - * @property type The type of the room object. - */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") -@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) -internal sealed class RoomObject(val type: String) { - /** - * A rack in a server room. - * - * @property machines The machines in the rack. - */ - internal data class Rack(val machines: List<Machine>) : RoomObject("RACK") -} - -/** - * A machine in the setup that consists of the specified CPU's represented as - * integer identifiers and ethernet speed. - * - * @property cpus The CPUs in the machine represented as integer identifiers. - */ -internal data class Machine(val cpus: List<Int>) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt deleted file mode 100644 index 7780a98e..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc18 - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.ConstantPowerModel -import java.io.InputStream -import java.util.* - -/** - * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Topology - * Schedulers". - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. - */ -public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { - /** - * The environment that was read from the file. - */ - private val setup: Setup = mapper.readValue(input) - - /** - * Read the environment. - */ - public override fun read(): List<MachineDef> { - var counter = 0 - return setup.rooms.flatMap { room -> - room.objects.flatMap { roomObject -> - when (roomObject) { - is RoomObject.Rack -> { - roomObject.machines.map { machine -> - val cores = machine.cpus.flatMap { id -> - when (id) { - 1 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } - } - 2 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } - } - else -> throw IllegalArgumentException("The cpu id $id is not recognized") - } - } - MachineDef( - UUID(0L, counter++.toLong()), - "node-$counter", - emptyMap(), - MachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))), - ConstantPowerModel(0.0) - ) - } - } - } - } - } - } - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt deleted file mode 100644 index 3ce79d69..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -import java.util.UUID - -/** - * An entry in a workload trace. - * - * @param uid The unique identifier of the entry. - * @param name The name of the entry. - * @param start The start time of the workload. - * @param workload The workload of the entry. - * @param meta The meta-data associated with the workload. - */ -public data class TraceEntry<out T>( - val uid: UUID, - val name: String, - val start: Long, - val workload: T, - val meta: Map<String, Any> -) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt deleted file mode 100644 index 797a88d5..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -/** - * An interface for reading workloads into memory. - * - * This interface must guarantee that the entries are delivered in order of submission time. - * - * @param T The shape of the workloads supported by this reader. - */ -public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt deleted file mode 100644 index ff6cdd02..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.bitbrains - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.databind.MappingIterator -import com.fasterxml.jackson.dataformat.csv.CsvMapper -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import java.io.InputStream - -/** - * A trace reader that enables the user to read Bitbrains specific trace data. - */ -public class BitbrainsRawTraceReader(input: InputStream) : Iterator<BitbrainsRawTraceReader.Entry>, AutoCloseable { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = CsvSchema.builder() - .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .setUseHeader(true) - .setColumnSeparator(';') - .build() - - /** - * The mapping iterator to use. - */ - private val iterator: MappingIterator<Entry> = CsvMapper().readerFor(Entry::class.java).with(schema) - .readValues(input) - - override fun hasNext(): Boolean { - return iterator.hasNext() - } - - override fun next(): Entry { - return iterator.next() - } - - override fun close() { - iterator.close() - } - - /** - * A single entry in the trace. - */ - public data class Entry( - @JsonProperty("Timestamp [ms]") - val timestamp: Long, - @JsonProperty("CPU cores") - val cpuCores: Int, - @JsonProperty("CPU capacity provisioned [MHZ]") - val cpuCapacity: Double, - @JsonProperty("CPU usage [MHZ]") - val cpuUsage: Double, - @JsonProperty("CPU usage [%]") - val cpuUsagePct: Double, - @JsonProperty("Memory capacity provisioned [KB]") - val memCapacity: Double, - @JsonProperty("Memory usage [KB]") - val memUsage: Double, - @JsonProperty("Disk read throughput [KB/s]") - val diskRead: Double, - @JsonProperty("Disk write throughput [KB/s]") - val diskWrite: Double, - @JsonProperty("Network received throughput [KB/s]") - val netReceived: Double, - @JsonProperty("Network transmitted throughput [KB/s]") - val netTransmitted: Double - ) -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt deleted file mode 100644 index 9e4876df..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.bitbrains - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.File -import java.io.FileInputStream -import java.util.* -import kotlin.math.max -import kotlin.math.min - -/** - * A [TraceReader] for the public VM workload trace format. - * - * @param traceDirectory The directory of the traces. - */ -public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkload> { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> - - /** - * Initialize the reader. - */ - init { - val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>() - val traceInterval = 5 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" } - .forEach { vmFile -> - val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>() - var vmId = -1L - var maxCores = Int.MIN_VALUE - var requiredMemory = Long.MIN_VALUE - var startTime = Long.MAX_VALUE - var lastTimestamp = Long.MIN_VALUE - - BitbrainsRawTraceReader(FileInputStream(vmFile)).use { reader -> - reader.forEach { entry -> - val timestamp = entry.timestamp * 1000L - val cpuUsage = entry.cpuUsage - vmId = vmFile.nameWithoutExtension.trim().toLong() - val cores = entry.cpuCores - maxCores = max(maxCores, cores) - requiredMemory = max(requiredMemory, (entry.memCapacity / 1000).toLong()) - - if (lastTimestamp < 0) { - lastTimestamp = timestamp - 5 * 60 * 1000L - startTime = min(startTime, lastTimestamp) - } - - if (flopsHistory.isEmpty()) { - flopsHistory.add(SimTraceWorkload.Fragment(lastTimestamp, traceInterval, cpuUsage, cores)) - } else { - val last = flopsHistory.last() - val duration = timestamp - lastTimestamp - // Perform run-length encoding - if (duration == 0L || last.usage == cpuUsage) { - flopsHistory[flopsHistory.size - 1] = last.copy(duration = last.duration + duration) - } else { - flopsHistory.add( - SimTraceWorkload.Fragment( - lastTimestamp, - duration, - cpuUsage, - cores - ) - ) - } - } - - lastTimestamp = timestamp - } - } - - val uuid = UUID(0L, vmId) - - val workload = SimTraceWorkload(flopsHistory.asSequence()) - entries[vmId] = TraceEntry( - uuid, - vmId.toString(), - startTime, - workload, - mapOf( - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.start }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt deleted file mode 100644 index e68afeb7..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.gwf - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task -import org.opendc.workflow.api.WORKFLOW_TASK_CORES -import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE -import java.io.BufferedReader -import java.io.File -import java.io.InputStream -import java.util.* -import kotlin.collections.HashSet -import kotlin.collections.Iterator -import kotlin.collections.List -import kotlin.collections.MutableSet -import kotlin.collections.component1 -import kotlin.collections.component2 -import kotlin.collections.filter -import kotlin.collections.forEach -import kotlin.collections.getOrPut -import kotlin.collections.map -import kotlin.collections.mapIndexed -import kotlin.collections.mapOf -import kotlin.collections.mutableMapOf -import kotlin.collections.set -import kotlin.collections.sortedBy -import kotlin.collections.toMap -import kotlin.math.max -import kotlin.math.min - -/** - * A [TraceReader] for the Grid Workload Format. See the Grid Workloads Archive (http://gwa.ewi.tudelft.nl/) for more - * information about the format. - * - * Be aware that in the Grid Workload Format, workflows are not required to be ordered by submission time and therefore - * this reader needs to read the whole trace into memory before an entry can be read. Consider converting the trace to a - * different format for better performance. - * - * @param reader The buffered reader to read the trace with. - */ -public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<Job>> - - /** - * Create a [GwfTraceReader] instance from the specified [File]. - * - * @param file The file to read from. - */ - public constructor(file: File) : this(file.bufferedReader()) - - /** - * Create a [GwfTraceReader] instance from the specified [InputStream]. - * - * @param input The input stream to read from. - */ - public constructor(input: InputStream) : this(input.bufferedReader()) - - /** - * Initialize the reader. - */ - init { - val workflows = mutableMapOf<Long, Job>() - val starts = mutableMapOf<Long, Long>() - val tasks = mutableMapOf<Long, Task>() - val taskDependencies = mutableMapOf<Task, List<Long>>() - - var workflowIdCol = 0 - var taskIdCol = 0 - var submitTimeCol = 0 - var runtimeCol = 0 - var coreCol = 0 - var dependencyCol = 0 - - try { - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith("#") && line.isNotBlank() - } - .forEachIndexed { idx, line -> - val values = line.split(",") - - // Parse GWF header - if (idx == 0) { - val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() - workflowIdCol = header["WorkflowID"]!! - taskIdCol = header["JobID"]!! - submitTimeCol = header["SubmitTime"]!! - runtimeCol = header["RunTime"]!! - coreCol = header["NProcs"]!! - dependencyCol = header["Dependencies"]!! - return@forEachIndexed - } - - val workflowId = values[workflowIdCol].trim().toLong() - val taskId = values[taskIdCol].trim().toLong() - val submitTime = values[submitTimeCol].trim().toLong() * 1000 // ms - val runtime = max(0, values[runtimeCol].trim().toLong()) // s - val cores = values[coreCol].trim().toInt() - val dependencies = values[dependencyCol].split(" ") - .filter { it.isNotEmpty() } - .map { it.trim().toLong() } - - val flops: Long = 4000 * runtime * cores - - val workflow = workflows.getOrPut(workflowId) { - Job(UUID(0L, workflowId), "<unnamed>", HashSet()) - } - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, taskId), - "<unnamed>", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to cores, - WORKFLOW_TASK_DEADLINE to (runtime * 1000) - ), - ) - starts.merge(workflowId, submitTime, ::min) - (workflow.tasks as MutableSet<Task>).add(task) - tasks[taskId] = task - taskDependencies[task] = dependencies - } - } finally { - reader.close() - } - - // Fix dependencies and dependents for all tasks - taskDependencies.forEach { (task, dependencies) -> - (task.dependencies as MutableSet<Task>).addAll( - dependencies.map { taskId -> - tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") - } - ) - } - - // Create the entry iterator - iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } - .sortedBy { it.start } - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<Job> = iterator.next() - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt deleted file mode 100644 index bda392a9..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.swf - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.* - -/** - * A [TraceReader] for reading SWF traces into VM-modeled workloads. - * - * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html - * - * @param file The trace file. - */ -public class SwfTraceReader( - file: File, - maxNumCores: Int = -1 -) : TraceReader<SimWorkload> { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> - - /** - * Initialize the reader. - */ - init { - val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>() - - val jobNumberCol = 0 - val submitTimeCol = 1 // seconds (begin of trace is 0) - val waitTimeCol = 2 // seconds - val runTimeCol = 3 // seconds - val numAllocatedCoresCol = 4 // We assume that single-core processors were used at the time - val requestedMemoryCol = 9 // KB per processor/core (-1 if not specified) - - val sliceDuration = 5 * 60L - - var jobNumber: Long - var submitTime: Long - var waitTime: Long - var runTime: Long - var cores: Int - var memory: Long - var slicedWaitTime: Long - var runtimePartialSliceRemainder: Long - - BufferedReader(FileReader(file)).use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith(";") && line.isNotBlank() - } - .forEach { line -> - val values = line.trim().split("\\s+".toRegex()) - - jobNumber = values[jobNumberCol].trim().toLong() - submitTime = values[submitTimeCol].trim().toLong() - waitTime = values[waitTimeCol].trim().toLong() - runTime = values[runTimeCol].trim().toLong() - cores = values[numAllocatedCoresCol].trim().toInt() - memory = values[requestedMemoryCol].trim().toLong() - - if (maxNumCores != -1 && cores > maxNumCores) { - println("Skipped a task due to processor count ($cores > $maxNumCores).") - return@forEach - } - - if (memory == -1L) { - memory = 1000L * cores // assume 1GB of memory per processor if not specified - } else { - memory /= 1000 // convert KB to MB - } - - val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>() - - // Insert waiting time slices - - // We ignore wait time remainders under one - slicedWaitTime = 0L - if (waitTime >= sliceDuration) { - for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) { - flopsHistory.add( - SimTraceWorkload.Fragment( - tick, - sliceDuration * 1000, - 0.0, - cores - ) - ) - slicedWaitTime += sliceDuration - } - } - - // Insert run time slices - - runtimePartialSliceRemainder = runTime % sliceDuration - - for ( - tick in (submitTime + slicedWaitTime) - until (submitTime + slicedWaitTime + runTime - sliceDuration) - step sliceDuration - ) { - flopsHistory.add( - SimTraceWorkload.Fragment( - tick, - sliceDuration * 1000L, - 1.0, - cores - ) - ) - } - - if (runtimePartialSliceRemainder > 0) { - flopsHistory.add( - SimTraceWorkload.Fragment( - submitTime + slicedWaitTime + runTime, - sliceDuration, - runtimePartialSliceRemainder / sliceDuration.toDouble(), - cores - ) - ) - } - - val uuid = UUID(0L, jobNumber) - val workload = SimTraceWorkload(flopsHistory.asSequence()) - entries[jobNumber] = TraceEntry( - uuid, - jobNumber.toString(), - submitTime, - workload, - mapOf( - "cores" to cores, - "required-memory" to memory, - "workload" to workload - ) - ) - } - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.start }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt deleted file mode 100644 index dde1b340..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.wtf - -import org.apache.avro.generic.GenericRecord -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.format.util.LocalParquetReader -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task -import org.opendc.workflow.api.WORKFLOW_TASK_CORES -import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE -import java.io.File -import java.nio.file.Path -import java.util.UUID -import kotlin.math.min - -/** - * A [TraceReader] for the Workflow Trace Format (WTF). See the Workflow Trace Archive - * (https://wta.atlarge-research.com/) for more information about the format. - * - * @param path The path to the trace. - */ -public class WtfTraceReader(path: Path) : TraceReader<Job> { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<Job>> - - /** - * Construct a [TraceReader] from the specified [path]. - * - * @param path The path to the trace. - */ - public constructor(path: File) : this(path.toPath()) - - /** - * Initialize the reader. - */ - init { - val workflows = mutableMapOf<Long, Job>() - val starts = mutableMapOf<Long, Long>() - val tasks = mutableMapOf<Long, Task>() - val taskDependencies = mutableMapOf<Task, List<Long>>() - - LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")).use { reader -> - while (true) { - val nextRecord = reader.read() ?: break - - val workflowId = nextRecord.get("workflow_id") as Long - val taskId = nextRecord.get("id") as Long - val submitTime = nextRecord.get("ts_submit") as Long - val runtime = nextRecord.get("runtime") as Long - val cores = (nextRecord.get("resource_amount_requested") as Double).toInt() - - @Suppress("UNCHECKED_CAST") - val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map { - it.get("item") as Long - } - - val flops: Long = 4100 * (runtime / 1000) * cores - - val workflow = workflows.getOrPut(workflowId) { - Job(UUID(0L, workflowId), "<unnamed>", HashSet()) - } - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, taskId), - "<unnamed>", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to cores, - WORKFLOW_TASK_DEADLINE to runtime - ) - ) - - starts.merge(workflowId, submitTime, ::min) - (workflow.tasks as MutableSet<Task>).add(task) - tasks[taskId] = task - taskDependencies[task] = dependencies - } - } - - // Fix dependencies and dependents for all tasks - taskDependencies.forEach { (task, dependencies) -> - (task.dependencies as MutableSet<Task>).addAll( - dependencies.map { taskId -> - tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") - } - ) - } - - // Create the entry iterator - iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } - .sortedBy { it.start } - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<Job> = iterator.next() - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt deleted file mode 100644 index 92319ace..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.io.InputFile -import org.apache.parquet.io.SeekableInputStream -import java.io.EOFException -import java.io.File -import java.nio.ByteBuffer -import java.nio.channels.FileChannel -import java.nio.file.Path -import java.nio.file.StandardOpenOption - -/** - * An [InputFile] on the local filesystem. - */ -public class LocalInputFile(private val path: Path) : InputFile { - /** - * The [FileChannel] used for accessing the input path. - */ - private val channel = FileChannel.open(path, StandardOpenOption.READ) - - /** - * Construct a [LocalInputFile] for the specified [file]. - */ - public constructor(file: File) : this(file.toPath()) - - override fun getLength(): Long = channel.size() - - override fun newStream(): SeekableInputStream = object : SeekableInputStream() { - override fun read(buf: ByteBuffer): Int { - return channel.read(buf) - } - - override fun read(): Int { - val single = ByteBuffer.allocate(1) - var read: Int - - // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte - do { - read = channel.read(single) - } while (read == 0) - - return if (read == -1) { - read - } else { - single.get(0).toInt() and 0xff - } - } - - override fun getPos(): Long { - return channel.position() - } - - override fun seek(newPos: Long) { - channel.position(newPos) - } - - override fun readFully(bytes: ByteArray) { - readFully(ByteBuffer.wrap(bytes)) - } - - override fun readFully(bytes: ByteArray, start: Int, len: Int) { - readFully(ByteBuffer.wrap(bytes, start, len)) - } - - override fun readFully(buf: ByteBuffer) { - var remainder = buf.remaining() - while (remainder > 0) { - val read = channel.read(buf) - remainder -= read - - if (read == -1 && remainder > 0) { - throw EOFException() - } - } - } - - override fun close() { - channel.close() - } - - override fun toString(): String = "NioSeekableInputStream" - } - - override fun toString(): String = "LocalInputFile[path=$path]" -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt deleted file mode 100644 index 657bca5a..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.io.OutputFile -import org.apache.parquet.io.PositionOutputStream -import java.io.File -import java.io.OutputStream -import java.nio.file.Files -import java.nio.file.Path -import java.nio.file.StandardOpenOption - -/** - * An [OutputFile] on the local filesystem. - */ -public class LocalOutputFile(private val path: Path) : OutputFile { - /** - * Construct a [LocalOutputFile] from the specified [file] - */ - public constructor(file: File) : this(file.toPath()) - - override fun create(blockSizeHint: Long): PositionOutputStream { - val output = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE) - return NioPositionOutputStream(output) - } - - override fun createOrOverwrite(blockSizeHint: Long): PositionOutputStream { - val output = Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING) - return NioPositionOutputStream(output) - } - - override fun supportsBlockSize(): Boolean = false - - override fun defaultBlockSize(): Long = - throw UnsupportedOperationException("Local filesystem does not have default block size") - - override fun getPath(): String = path.toString() - - /** - * Implementation of [PositionOutputStream] for an [OutputStream]. - */ - private class NioPositionOutputStream(private val output: OutputStream) : PositionOutputStream() { - /** - * The current position in the file. - */ - private var _pos = 0L - - override fun getPos(): Long = _pos - - override fun write(b: Int) { - output.write(b) - _pos++ - } - - override fun write(b: ByteArray) { - output.write(b) - _pos += b.size - } - - override fun write(b: ByteArray, off: Int, len: Int) { - output.write(b, off, len) - _pos += len - } - - override fun flush() { - output.flush() - } - - override fun close() { - output.close() - } - - override fun toString(): String = "NioPositionOutputStream[output=$output]" - } -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt deleted file mode 100644 index 5083f3e1..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.hadoop.ParquetReader -import org.apache.parquet.io.InputFile -import java.io.File -import java.io.IOException -import java.nio.file.Files -import java.nio.file.Path -import kotlin.io.path.isDirectory - -/** - * A helper class to read Parquet files. - * - * @param path The path to the Parquet file or directory to read. - */ -public class LocalParquetReader<out T>(path: Path) : AutoCloseable { - /** - * The input files to process. - */ - private val filesIterator = if (path.isDirectory()) - Files.list(path) - .filter { !it.isDirectory() } - .sorted() - .map { LocalInputFile(it) } - .iterator() - else - listOf(LocalInputFile(path)).iterator() - - /** - * The Parquet reader to use. - */ - private var reader: ParquetReader<T>? = null - - /** - * Construct a [LocalParquetReader] for the specified [file]. - */ - public constructor(file: File) : this(file.toPath()) - - /** - * Read a single entry in the Parquet file. - */ - public fun read(): T? { - return try { - val next = reader?.read() - if (next != null) { - next - } else { - initReader() - - if (reader == null) - null - else - read() - } - } catch (e: InterruptedException) { - throw IOException(e) - } - } - - /** - * Close the Parquet reader. - */ - override fun close() { - reader?.close() - } - - /** - * Initialize the next reader. - */ - private fun initReader() { - reader?.close() - - this.reader = if (filesIterator.hasNext()) { - createReader(filesIterator.next()) - } else { - null - } - } - - /** - * Create a Parquet reader for the specified file. - */ - private fun createReader(input: InputFile): ParquetReader<T> { - return AvroParquetReader - .builder<T>(input) - .disableCompatibility() - .build() - } -} |
