diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-02 11:50:43 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-02 11:50:43 +0200 |
| commit | 05f80bd9fb7caf765e3ebbb70d48d0d5e185bd42 (patch) | |
| tree | 5fa9501621ad327028c2f2e12c9c367f44f6aebe | |
| parent | 99f391d11db57c3db3f326958de8f66502969cdb (diff) | |
| parent | 5935531137a22fdb920921580d491f86adec65c9 (diff) | |
merge: Add generic trace reading library
This pull request adds a generic trace reading library to OpenDC.
The library has been designed to support a wide range of trace formats
and uses a streaming approach to improve performance of reading large traces.
* Add trace reading API
* Implement API for GWF format
* Implement API for SWF format
* Implement API for WTF format
* Implement API for Bitbrains format
* Implement API for Bitbrains Parquet format
**Breaking API Changes**
* `opendc-format` has been removed in favour of `opendc-trace-*`
93 files changed, 4039 insertions, 1268 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 53643aba..65cebe1b 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -31,7 +31,8 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) - implementation(projects.opendcFormat) + implementation(projects.opendcTrace.opendcTraceParquet) + implementation(projects.opendcTrace.opendcTraceBitbrains) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcSimulator.opendcSimulatorFailures) @@ -43,6 +44,9 @@ dependencies { implementation(libs.config) implementation(libs.progressbar) implementation(libs.clikt) + implementation(libs.jackson.module.kotlin) { + exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") + } implementation(libs.parquet) testImplementation(libs.log4j.slf4j) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 7f428b2a..46e11056 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -45,10 +45,10 @@ import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher import org.opendc.compute.service.scheduler.weights.RamWeigher import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.simulator.SimHost +import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.trace.TraceReader +import org.opendc.experiments.capelin.trace.TraceReader import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.power.SimplePowerDriver diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt index d73d14f5..babd8ada 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt @@ -22,8 +22,6 @@ package org.opendc.experiments.capelin.env -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt index 97d6f239..a968b043 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.format.environment +package org.opendc.experiments.capelin.env import java.io.Closeable diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt new file mode 100644 index 00000000..b0c0318f --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.env + +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.power.PowerModel +import java.util.* + +/** + * A definition of a machine in a cluster. + */ +public data class MachineDef( + val uid: UUID, + val name: String, + val meta: Map<String, Any>, + val model: MachineModel, + val powerModel: PowerModel +) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt index d8f7ff75..897a6692 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt @@ -28,7 +28,7 @@ import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.experiments.capelin.telemetry.Event -import org.opendc.format.util.LocalOutputFile +import org.opendc.trace.util.parquet.LocalOutputFile import java.io.Closeable import java.io.File import java.util.concurrent.ArrayBlockingQueue diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt index 0f49ecd2..0bf4ada6 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt @@ -24,8 +24,6 @@ package org.opendc.experiments.capelin.trace import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.Workload -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload /** diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index 16ad6816..fa4e9ed8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -22,12 +22,10 @@ package org.opendc.experiments.capelin.trace -import org.apache.avro.generic.GenericData -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.format.util.LocalParquetReader +import org.opendc.experiments.capelin.trace.bp.BPTraceFormat import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.trace.* import java.io.File import java.util.UUID @@ -38,26 +36,29 @@ import java.util.UUID */ class RawParquetTraceReader(private val path: File) { /** + * The [Trace] that represents this trace. + */ + private val trace = BPTraceFormat().open(path.toURI().toURL()) + + /** * Read the fragments into memory. */ - private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> { - val reader = LocalParquetReader<GenericData.Record>(File(path, "trace.parquet")) + private fun parseFragments(): Map<String, List<SimTraceWorkload.Fragment>> { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>() return try { - while (true) { - val record = reader.read() ?: break - - val id = record["id"].toString() - val time = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double + while (reader.nextRow()) { + val id = reader.get(RESOURCE_STATE_ID) + val time = reader.get(RESOURCE_STATE_TIMESTAMP) + val duration = reader.get(RESOURCE_STATE_DURATION) + val cores = reader.getInt(RESOURCE_STATE_NCPUS) + val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) val fragment = SimTraceWorkload.Fragment( - time, - duration, + time.toEpochMilli(), + duration.toMillis(), cpuUsage, cores ) @@ -74,25 +75,24 @@ class RawParquetTraceReader(private val path: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> { - val metaReader = LocalParquetReader<GenericData.Record>(File(path, "meta.parquet")) + private fun parseMeta(fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() var counter = 0 val entries = mutableListOf<TraceEntry<SimWorkload>>() return try { - while (true) { - val record = metaReader.read() ?: break + while (reader.nextRow()) { - val id = record["id"].toString() + val id = reader.get(RESOURCE_ID) if (!fragments.containsKey(id)) { continue } - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long + val submissionTime = reader.get(RESOURCE_START_TIME) + val endTime = reader.get(RESOURCE_END_TIME) + val maxCores = reader.getInt(RESOURCE_NCPUS) + val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) val vmFragments = fragments.getValue(id).asSequence() @@ -100,13 +100,13 @@ class RawParquetTraceReader(private val path: File) { val workload = SimTraceWorkload(vmFragments) entries.add( TraceEntry( - uid, id, submissionTime, workload, + uid, id, submissionTime.toEpochMilli(), workload, mapOf( - "submit-time" to submissionTime, - "end-time" to endTime, + "submit-time" to submissionTime.toEpochMilli(), + "end-time" to endTime.toEpochMilli(), "total-load" to totalLoad, "cores" to maxCores, - "required-memory" to requiredMemory, + "required-memory" to requiredMemory.toLong(), "workload" to workload ) ) @@ -118,7 +118,7 @@ class RawParquetTraceReader(private val path: File) { e.printStackTrace() throw e } finally { - metaReader.close() + reader.close() } } @@ -128,8 +128,8 @@ class RawParquetTraceReader(private val path: File) { private val entries: List<TraceEntry<SimWorkload>> init { - val fragments = parseFragments(path) - entries = parseMeta(path, fragments) + val fragments = parseFragments() + entries = parseMeta(fragments) } /** diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt index 35f4c5b8..61e4cab5 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt @@ -30,11 +30,9 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.format.util.LocalInputFile import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.trace.util.parquet.LocalInputFile import java.io.File import java.io.Serializable import java.util.SortedSet @@ -187,7 +185,7 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = e assert(uid !in takenIds) takenIds += uid - logger.info("Processing VM $id") + logger.info { "Processing VM $id" } val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>() val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt index d7daa35b..a021de8d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -36,9 +36,11 @@ import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.format.trace.bitbrains.BitbrainsTraceReader -import org.opendc.format.util.LocalOutputFile -import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.experiments.capelin.trace.sv.SvTraceFormat +import org.opendc.trace.* +import org.opendc.trace.bitbrains.BitbrainsTraceFormat +import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.LocalOutputFile import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -156,220 +158,101 @@ sealed class TraceConversion(name: String) : OptionGroup(name) { ): MutableList<Fragment> } -class SolvinityConversion : TraceConversion("Solvinity") { - private val clusters by option() - .split(",") - - private val vmPlacements by option("--vm-placements", help = "file containing the VM placements") - .file(canBeDir = false) - .convert { VmPlacementReader(it.inputStream()).use { reader -> reader.read() } } - .required() +/** + * A [TraceConversion] that uses the Trace API to perform the conversion. + */ +abstract class AbstractConversion(name: String) : TraceConversion(name) { + abstract val format: TraceFormat override fun read( traceDirectory: File, metaSchema: Schema, metaWriter: ParquetWriter<GenericData.Record> ): MutableList<Fragment> { - val clusters = clusters?.toSet() ?: emptySet() - val timestampCol = 0 - val cpuUsageCol = 1 - val coreCol = 12 - val provisionedMemoryCol = 20 - val traceInterval = 5 * 60 * 1000L - - // Identify start time of the entire trace - var minTimestamp = Long.MAX_VALUE - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach file@{ vmFile -> - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val vmId = vmFile.name + val fragments = mutableListOf<Fragment>() + val trace = format.open(traceDirectory.toURI().toURL()) + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null || !clusters.contains(clusterName)) { - continue - } + var lastId: String? = null + var maxCores = Int.MIN_VALUE + var requiredMemory = Long.MIN_VALUE + var minTime = Long.MAX_VALUE + var maxTime = Long.MIN_VALUE + var lastTimestamp = Long.MIN_VALUE - val values = line.split("\t") - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + while (reader.nextRow()) { + val id = reader.get(RESOURCE_STATE_ID) - if (timestamp < minTimestamp) { - minTimestamp = timestamp - } - return@file - } - } - } + if (lastId != null && lastId != id) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", lastId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) } + lastId = id - println("Start of trace at $minTimestamp") + val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP) + val timestampMs = timestamp.toEpochMilli() + val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) + val cores = reader.getInt(RESOURCE_STATE_NCPUS) + val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY) - val allFragments = mutableListOf<Fragment>() + maxCores = max(maxCores, cores) + requiredMemory = max(requiredMemory, (memCapacity / 1000).toLong()) - val begin = 15 * 24 * 60 * 60 * 1000L - val end = 45 * 24 * 60 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach { vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores: Int - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split("\t") - - vmId = vmFile.name - - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null || !clusters.contains(clusterName)) { - continue - } - - val timestamp = - (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp - if (begin > timestamp || timestamp > end) { - continue - } - - cores = values[coreCol].trim().toInt() - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - - val flops: Long = (cpuUsage * 5 * 60).toLong() - - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! - Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - cores - ) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - } - - if (last != null) { - yield(last!!) - } - } - - var maxTime = Long.MIN_VALUE - flopsFragments.filter { it.tick in begin until end }.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) - } - - if (minTime in begin until end) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) - } + if (lastTimestamp < 0) { + lastTimestamp = timestampMs - 5 * 60 * 1000L + minTime = min(minTime, lastTimestamp) } - return allFragments - } -} - -/** - * Conversion of the Bitbrains public trace. - */ -class BitbrainsConversion : TraceConversion("Bitbrains") { - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter<GenericData.Record> - ): MutableList<Fragment> { - val fragments = mutableListOf<Fragment>() - BitbrainsTraceReader(traceDirectory).use { reader -> - reader.forEach { entry -> - val trace = (entry.workload as SimTraceWorkload).trace - var maxTime = Long.MIN_VALUE - trace.forEach { fragment -> - val flops: Long = (fragment.usage * fragment.duration / 1000).toLong() + if (fragments.isEmpty()) { + val duration = 5 * 60 * 1000L + val flops: Long = (cpuUsage * duration / 1000).toLong() + fragments.add(Fragment(id, lastTimestamp, flops, duration, cpuUsage, cores)) + } else { + val last = fragments.last() + val duration = timestampMs - lastTimestamp + val flops: Long = (cpuUsage * duration / 1000).toLong() + + // Perform run-length encoding + if (last.id == id && (duration == 0L || last.usage == cpuUsage)) { + fragments[fragments.size - 1] = last.copy(duration = last.duration + duration) + } else { fragments.add( Fragment( - entry.name, - fragment.timestamp, + id, + lastTimestamp, flops, - fragment.duration, - fragment.usage, - fragment.cores + duration, + cpuUsage, + cores ) ) - maxTime = max(maxTime, fragment.timestamp + fragment.duration) } - - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", entry.name) - metaRecord.put("submissionTime", entry.start) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", entry.meta["cores"]) - metaRecord.put("requiredMemory", entry.meta["required-memory"]) - metaWriter.write(metaRecord) } + + val last = fragments.last() + maxTime = max(maxTime, last.tick + last.duration) + lastTimestamp = timestampMs } return fragments } } +class SolvinityConversion : AbstractConversion("Solvinity") { + override val format: TraceFormat = SvTraceFormat() +} + +/** + * Conversion of the Bitbrains public trace. + */ +class BitbrainsConversion : AbstractConversion("Bitbrains") { + override val format: TraceFormat = BitbrainsTraceFormat() +} + /** * Conversion of the Azure public VM trace. */ diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt index 3ce79d69..303a6a8c 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package org.opendc.format.trace +package org.opendc.experiments.capelin.trace import java.util.UUID diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt index 797a88d5..08304edc 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.format.trace +package org.opendc.experiments.capelin.trace /** * An interface for reading workloads into memory. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt index 6de3f265..cb32ce88 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt @@ -26,7 +26,6 @@ import mu.KotlinLogging import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.SamplingStrategy import org.opendc.experiments.capelin.model.Workload -import org.opendc.format.trace.TraceEntry import org.opendc.simulator.compute.workload.SimWorkload import java.util.* import kotlin.random.Random diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt new file mode 100644 index 00000000..35bfd5ef --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader +import java.nio.file.Path + +/** + * The resource state [Table] in the Bitbrains Parquet format. + */ +internal class BPResourceStateTable(private val path: Path) : Table { + override val name: String = TABLE_RESOURCE_STATES + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_DURATION -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_USAGE -> true + else -> false + } + } + + override fun newReader(): TableReader { + val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) + return BPResourceStateTableReader(reader) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Unknown partition $partition") + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt new file mode 100644 index 00000000..0e7ee555 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant + +/** + * A [TableReader] implementation for the Bitbrains Parquet format. + */ +internal class BPResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader { + /** + * The current record. + */ + private var record: GenericRecord? = null + + override fun nextRow(): Boolean { + record = reader.read() + return record != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_DURATION -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_USAGE -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val record = checkNotNull(record) { "Reader in invalid state" } + + @Suppress("UNCHECKED_CAST") + val res: Any = when (column) { + RESOURCE_STATE_ID -> record["id"].toString() + RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long) + RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long) + RESOURCE_STATE_NCPUS -> record["cores"] + RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (column) { + RESOURCE_STATE_NCPUS -> record["cores"] as Int + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn<Long>): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn<Double>): Double { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (column) { + RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + reader.close() + } + + override fun toString(): String = "BPResourceStateTableReader" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt new file mode 100644 index 00000000..74d1e574 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader +import java.nio.file.Path + +/** + * The resource [Table] in the Bitbrains Parquet format. + */ +internal class BPResourceTable(private val path: Path) : Table { + override val name: String = TABLE_RESOURCES + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_ID -> true + RESOURCE_START_TIME -> true + RESOURCE_END_TIME -> true + RESOURCE_NCPUS -> true + RESOURCE_MEM_CAPACITY -> true + else -> false + } + } + + override fun newReader(): TableReader { + val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")) + return BPResourceTableReader(reader) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Unknown partition $partition") + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt new file mode 100644 index 00000000..0a105783 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Instant + +/** + * A [TableReader] implementation for the Bitbrains Parquet format. + */ +internal class BPResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader { + /** + * The current record. + */ + private var record: GenericRecord? = null + + override fun nextRow(): Boolean { + record = reader.read() + return record != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_ID -> true + RESOURCE_START_TIME -> true + RESOURCE_END_TIME -> true + RESOURCE_NCPUS -> true + RESOURCE_MEM_CAPACITY -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val record = checkNotNull(record) { "Reader in invalid state" } + + @Suppress("UNCHECKED_CAST") + val res: Any = when (column) { + RESOURCE_ID -> record["id"].toString() + RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long) + RESOURCE_END_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) + RESOURCE_NCPUS -> record["maxCores"] + RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (column) { + RESOURCE_NCPUS -> record["maxCores"] as Int + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn<Long>): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn<Double>): Double { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (column) { + RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + reader.close() + } + + override fun toString(): String = "BPResourceTableReader" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt new file mode 100644 index 00000000..486587b1 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.opendc.trace.TABLE_RESOURCES +import org.opendc.trace.TABLE_RESOURCE_STATES +import org.opendc.trace.Table +import org.opendc.trace.Trace +import java.nio.file.Path + +/** + * A [Trace] in the Bitbrains Parquet format. + */ +public class BPTrace internal constructor(private val path: Path) : Trace { + override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun containsTable(name: String): Boolean = + name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES + + override fun getTable(name: String): Table? { + return when (name) { + TABLE_RESOURCES -> BPResourceTable(path) + TABLE_RESOURCE_STATES -> BPResourceStateTable(path) + else -> null + } + } + + override fun toString(): String = "BPTrace[$path]" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt new file mode 100644 index 00000000..49d5b4c5 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A format implementation for the GWF trace format. + */ +public class BPTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "bitbrains-parquet" + + /** + * Open a Bitbrains Parquet trace. + */ + override fun open(url: URL): BPTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return BPTrace(path) + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt new file mode 100644 index 00000000..71c9d52e --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.sv + +import org.opendc.trace.* +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.bufferedReader +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension + +/** + * The resource state [Table] in the Bitbrains format. + */ +internal class SvResourceStateTable(path: Path) : Table { + /** + * The partitions that belong to the table. + */ + private val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "txt" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + + override val name: String = TABLE_RESOURCE_STATES + + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_CLUSTER_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_CAPACITY -> true + RESOURCE_STATE_CPU_USAGE -> true + RESOURCE_STATE_CPU_USAGE_PCT -> true + RESOURCE_STATE_CPU_DEMAND -> true + RESOURCE_STATE_CPU_READY_PCT -> true + RESOURCE_STATE_MEM_CAPACITY -> true + RESOURCE_STATE_DISK_READ -> true + RESOURCE_STATE_DISK_WRITE -> true + else -> false + } + } + + override fun newReader(): TableReader { + val it = partitions.iterator() + + return object : TableReader { + var delegate: TableReader? = nextDelegate() + + override fun nextRow(): Boolean { + var delegate = delegate + + while (delegate != null) { + if (delegate.nextRow()) { + break + } + + delegate.close() + delegate = nextDelegate() + } + + this.delegate = delegate + return delegate != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false + + override fun <T> get(column: TableColumn<T>): T { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.get(column) + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getBoolean(column) + } + + override fun getInt(column: TableColumn<Int>): Int { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInt(column) + } + + override fun getLong(column: TableColumn<Long>): Long { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getLong(column) + } + + override fun getDouble(column: TableColumn<Double>): Double { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDouble(column) + } + + override fun close() { + delegate?.close() + } + + private fun nextDelegate(): TableReader? { + return if (it.hasNext()) { + val (partition, path) = it.next() + val reader = path.bufferedReader() + return SvResourceStateTableReader(partition, reader) + } else { + null + } + } + + override fun toString(): String = "BitbrainsCompositeTableReader" + } + } + + override fun newReader(partition: String): TableReader { + val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } + val reader = path.bufferedReader() + return SvResourceStateTableReader(partition, reader) + } + + override fun toString(): String = "SvResourceStateTable" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt new file mode 100644 index 00000000..adcdb2ea --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt @@ -0,0 +1,230 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.sv + +import org.opendc.trace.* +import java.io.BufferedReader +import java.time.Instant + +/** + * A [TableReader] for the Bitbrains resource state table. + */ +internal class SvResourceStateTableReader(partition: String, private val reader: BufferedReader) : TableReader { + /** + * The current parser state. + */ + private val state = RowState() + + override fun nextRow(): Boolean { + state.reset() + + var line: String + var num = 0 + + while (true) { + line = reader.readLine() ?: return false + num++ + + if (line[0] == '#' || line.isBlank()) { + // Ignore empty lines or comments + continue + } + + break + } + + line = line.trim() + + val length = line.length + var col = 0 + var start = 0 + var end = 0 + + while (end < length) { + // Trim all whitespace before the field + start = end + while (start < length && line[start].isWhitespace()) { + start++ + } + + end = line.indexOf(' ', start) + + if (end < 0) { + break + } + + val field = line.subSequence(start, end) as String + when (col++) { + COL_TIMESTAMP -> state.timestamp = Instant.ofEpochSecond(field.toLong(10)) + COL_CPU_USAGE -> state.cpuUsage = field.toDouble() + COL_CPU_DEMAND -> state.cpuDemand = field.toDouble() + COL_DISK_READ -> state.diskRead = field.toDouble() + COL_DISK_WRITE -> state.diskWrite = field.toDouble() + COL_CLUSTER_ID -> state.cluster = field.trim() + COL_NCPUS -> state.cpuCores = field.toInt(10) + COL_CPU_READY_PCT -> state.cpuReadyPct = field.toDouble() + COL_POWERED_ON -> state.poweredOn = field.toInt(10) == 1 + COL_CPU_CAPACITY -> state.cpuCapacity = field.toDouble() + COL_ID -> state.id = field.trim() + COL_MEM_CAPACITY -> state.memCapacity = field.toDouble() + } + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_CLUSTER_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_CAPACITY -> true + RESOURCE_STATE_CPU_USAGE -> true + RESOURCE_STATE_CPU_USAGE_PCT -> true + RESOURCE_STATE_CPU_DEMAND -> true + RESOURCE_STATE_CPU_READY_PCT -> true + RESOURCE_STATE_MEM_CAPACITY -> true + RESOURCE_STATE_DISK_READ -> true + RESOURCE_STATE_DISK_WRITE -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val res: Any? = when (column) { + RESOURCE_STATE_ID -> state.id + RESOURCE_STATE_CLUSTER_ID -> state.cluster + RESOURCE_STATE_TIMESTAMP -> state.timestamp + RESOURCE_STATE_NCPUS -> state.cpuCores + RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity + RESOURCE_STATE_CPU_USAGE -> state.cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity + RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity + RESOURCE_STATE_DISK_READ -> state.diskRead + RESOURCE_STATE_DISK_WRITE -> state.diskWrite + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + return when (column) { + RESOURCE_STATE_POWERED_ON -> state.poweredOn + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getInt(column: TableColumn<Int>): Int { + return when (column) { + RESOURCE_STATE_NCPUS -> state.cpuCores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn<Long>): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn<Double>): Double { + return when (column) { + RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity + RESOURCE_STATE_CPU_USAGE -> state.cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity + RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity + RESOURCE_STATE_DISK_READ -> state.diskRead + RESOURCE_STATE_DISK_WRITE -> state.diskWrite + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + reader.close() + } + + /** + * The current row state. + */ + private class RowState { + @JvmField + var id: String? = null + @JvmField + var cluster: String? = null + @JvmField + var timestamp: Instant? = null + @JvmField + var cpuCores = -1 + @JvmField + var cpuCapacity = Double.NaN + @JvmField + var cpuUsage = Double.NaN + @JvmField + var cpuDemand = Double.NaN + @JvmField + var cpuReadyPct = Double.NaN + @JvmField + var memCapacity = Double.NaN + @JvmField + var diskRead = Double.NaN + @JvmField + var diskWrite = Double.NaN + @JvmField + var poweredOn: Boolean = false + + /** + * Reset the state. + */ + fun reset() { + id = null + timestamp = null + cluster = null + cpuCores = -1 + cpuCapacity = Double.NaN + cpuUsage = Double.NaN + cpuDemand = Double.NaN + cpuReadyPct = Double.NaN + memCapacity = Double.NaN + diskRead = Double.NaN + diskWrite = Double.NaN + poweredOn = false + } + } + + /** + * Default column indices for the extended Bitbrains format. + */ + private val COL_TIMESTAMP = 0 + private val COL_CPU_USAGE = 1 + private val COL_CPU_DEMAND = 2 + private val COL_DISK_READ = 4 + private val COL_DISK_WRITE = 6 + private val COL_CLUSTER_ID = 10 + private val COL_NCPUS = 12 + private val COL_CPU_READY_PCT = 13 + private val COL_POWERED_ON = 14 + private val COL_CPU_CAPACITY = 18 + private val COL_ID = 19 + private val COL_MEM_CAPACITY = 20 +} diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt index 48b4a2e3..dbd63de5 100644 --- a/opendc-format/src/test/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReaderTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt @@ -20,26 +20,26 @@ * SOFTWARE. */ -package org.opendc.format.trace.bitbrains +package org.opendc.experiments.capelin.trace.sv -import org.junit.jupiter.api.Assertions.assertAll -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test +import org.opendc.trace.* +import java.nio.file.Path /** - * Test suite for the [BitbrainsTraceReader] class. + * [Trace] implementation for the extended Bitbrains format. */ -class BitbrainsTraceReaderTest { - @Test - fun testSmoke() { - val file = BitbrainsTraceReaderTest::class.java.getResourceAsStream("/bitbrains.csv")!! - BitbrainsRawTraceReader(file).use { reader -> - val entry = reader.next() +public class SvTrace internal constructor(private val path: Path) : Trace { + override val tables: List<String> = listOf(TABLE_RESOURCE_STATES) - assertAll( - { assertEquals(1376314846, entry.timestamp) }, - { assertEquals(19.066, entry.cpuUsage, 0.01) } - ) + override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name + + override fun getTable(name: String): Table? { + if (!containsTable(name)) { + return null } + + return SvResourceStateTable(path) } + + override fun toString(): String = "SvTrace[$path]" } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt new file mode 100644 index 00000000..0cce8559 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.sv + +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A format implementation for the extended Bitbrains trace format. + */ +public class SvTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "sv" + + /** + * Open the trace file. + */ + override fun open(url: URL): SvTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return SvTrace(path) + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index e4d3fed3..2934bbe6 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -36,13 +36,13 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.experiments.capelin.env.ClusterEnvironmentReader +import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.trace.TraceReader +import org.opendc.experiments.capelin.trace.TraceReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts index bc05f09b..7d34d098 100644 --- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts @@ -31,7 +31,6 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) - implementation(projects.opendcFormat) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcSimulator.opendcSimulatorFailures) diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts index b088045b..882c4894 100644 --- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts @@ -34,8 +34,11 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcFormat) implementation(projects.opendcUtils) implementation(libs.kotlin.logging) + implementation(libs.jackson.module.kotlin) { + exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") + } + implementation("org.jetbrains.kotlin:kotlin-reflect:1.5.30") } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt index 9a48aced..2153a862 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt @@ -55,7 +55,8 @@ public class TensorFlowExperiment : Experiment(name = "tf20") { .build() val meter = meterProvider.get("opendc-tf20") - val def = MLEnvironmentReader(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile)).read().first() + val envInput = checkNotNull(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile)) + val def = MLEnvironmentReader().readEnvironment(envInput).first() val device = SimTFDevice( def.uid, def.meta["gpu"] as Boolean, coroutineContext, clock, meter, def.model.cpus[0], def.model.memory[0], LinearPowerModel(250.0, 60.0) diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt index 3e61f508..3cdf28fd 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt @@ -25,8 +25,6 @@ package org.opendc.experiments.tf20.util import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -36,13 +34,16 @@ import java.io.InputStream import java.util.* /** - * An [EnvironmentReader] for the TensorFlow experiments. + * An environment reader for the TensorFlow experiments. */ -public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { +public class MLEnvironmentReader { + /** + * The [ObjectMapper] to convert the format. + */ + private val mapper = jacksonObjectMapper() - private val setup: Setup = mapper.readValue(input) - - override fun read(): List<MachineDef> { + public fun readEnvironment(input: InputStream): List<MachineDef> { + val setup: Setup = mapper.readValue(input) var counter = 0 return setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> @@ -109,6 +110,4 @@ public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jack } } } - - override fun close() {} } diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt index f65c4880..271f5923 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt @@ -20,12 +20,15 @@ * SOFTWARE. */ -package org.opendc.format.environment +package org.opendc.experiments.tf20.util import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.power.PowerModel import java.util.* +/** + * A definition of a machine in a cluster. + */ public data class MachineDef( val uid: UUID, val name: String, diff --git a/opendc-format/build.gradle.kts b/opendc-format/build.gradle.kts index c8e30846..0c7f2a51 100644 --- a/opendc-format/build.gradle.kts +++ b/opendc-format/build.gradle.kts @@ -39,31 +39,9 @@ dependencies { exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") } implementation(libs.jackson.dataformat.csv) - implementation(kotlin("reflect")) + implementation("org.jetbrains.kotlin:kotlin-reflect:1.5.30") - /* This configuration is necessary for a slim dependency on Apache Parquet */ - implementation(libs.parquet) { - exclude(group = "org.apache.hadoop") - } - runtimeOnly(libs.hadoop.common) { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - exclude(group = "log4j") - exclude(group = "org.apache.hadoop") - exclude(group = "org.apache.curator") - exclude(group = "org.apache.zookeeper") - exclude(group = "org.apache.kerby") - exclude(group = "org.apache.httpcomponents") - exclude(group = "org.apache.htrace") - exclude(group = "commons-cli") - exclude(group = "javax.servlet") - exclude(group = "org.eclipse.jetty") - exclude(group = "com.sun.jersey") - exclude(group = "com.jcraft") - exclude(group = "dnsjava") - } - runtimeOnly(libs.hadoop.mapreduce.client.core) { - isTransitive = false - } + implementation(projects.opendcTrace.opendcTraceParquet) testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt deleted file mode 100644 index c313467f..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc18 - -import com.fasterxml.jackson.annotation.JsonSubTypes -import com.fasterxml.jackson.annotation.JsonTypeInfo - -/** - * A topology setup. - * - * @property name The name of the setup. - * @property rooms The rooms in the topology. - */ -internal data class Setup(val name: String, val rooms: List<Room>) - -/** - * A room in a topology. - * - * @property type The type of room in the topology. - * @property objects The objects in the room. - */ -internal data class Room(val type: String, val objects: List<RoomObject>) - -/** - * An object in a [Room]. - * - * @property type The type of the room object. - */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") -@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) -internal sealed class RoomObject(val type: String) { - /** - * A rack in a server room. - * - * @property machines The machines in the rack. - */ - internal data class Rack(val machines: List<Machine>) : RoomObject("RACK") -} - -/** - * A machine in the setup that consists of the specified CPU's represented as - * integer identifiers and ethernet speed. - * - * @property cpus The CPUs in the machine represented as integer identifiers. - */ -internal data class Machine(val cpus: List<Int>) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt deleted file mode 100644 index 7780a98e..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc18 - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.ConstantPowerModel -import java.io.InputStream -import java.util.* - -/** - * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Topology - * Schedulers". - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. - */ -public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { - /** - * The environment that was read from the file. - */ - private val setup: Setup = mapper.readValue(input) - - /** - * Read the environment. - */ - public override fun read(): List<MachineDef> { - var counter = 0 - return setup.rooms.flatMap { room -> - room.objects.flatMap { roomObject -> - when (roomObject) { - is RoomObject.Rack -> { - roomObject.machines.map { machine -> - val cores = machine.cpus.flatMap { id -> - when (id) { - 1 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } - } - 2 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } - } - else -> throw IllegalArgumentException("The cpu id $id is not recognized") - } - } - MachineDef( - UUID(0L, counter++.toLong()), - "node-$counter", - emptyMap(), - MachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))), - ConstantPowerModel(0.0) - ) - } - } - } - } - } - } - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt deleted file mode 100644 index ff6cdd02..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.bitbrains - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.databind.MappingIterator -import com.fasterxml.jackson.dataformat.csv.CsvMapper -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import java.io.InputStream - -/** - * A trace reader that enables the user to read Bitbrains specific trace data. - */ -public class BitbrainsRawTraceReader(input: InputStream) : Iterator<BitbrainsRawTraceReader.Entry>, AutoCloseable { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = CsvSchema.builder() - .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .setUseHeader(true) - .setColumnSeparator(';') - .build() - - /** - * The mapping iterator to use. - */ - private val iterator: MappingIterator<Entry> = CsvMapper().readerFor(Entry::class.java).with(schema) - .readValues(input) - - override fun hasNext(): Boolean { - return iterator.hasNext() - } - - override fun next(): Entry { - return iterator.next() - } - - override fun close() { - iterator.close() - } - - /** - * A single entry in the trace. - */ - public data class Entry( - @JsonProperty("Timestamp [ms]") - val timestamp: Long, - @JsonProperty("CPU cores") - val cpuCores: Int, - @JsonProperty("CPU capacity provisioned [MHZ]") - val cpuCapacity: Double, - @JsonProperty("CPU usage [MHZ]") - val cpuUsage: Double, - @JsonProperty("CPU usage [%]") - val cpuUsagePct: Double, - @JsonProperty("Memory capacity provisioned [KB]") - val memCapacity: Double, - @JsonProperty("Memory usage [KB]") - val memUsage: Double, - @JsonProperty("Disk read throughput [KB/s]") - val diskRead: Double, - @JsonProperty("Disk write throughput [KB/s]") - val diskWrite: Double, - @JsonProperty("Network received throughput [KB/s]") - val netReceived: Double, - @JsonProperty("Network transmitted throughput [KB/s]") - val netTransmitted: Double - ) -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt deleted file mode 100644 index 9e4876df..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.bitbrains - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.File -import java.io.FileInputStream -import java.util.* -import kotlin.math.max -import kotlin.math.min - -/** - * A [TraceReader] for the public VM workload trace format. - * - * @param traceDirectory The directory of the traces. - */ -public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkload> { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> - - /** - * Initialize the reader. - */ - init { - val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>() - val traceInterval = 5 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" } - .forEach { vmFile -> - val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>() - var vmId = -1L - var maxCores = Int.MIN_VALUE - var requiredMemory = Long.MIN_VALUE - var startTime = Long.MAX_VALUE - var lastTimestamp = Long.MIN_VALUE - - BitbrainsRawTraceReader(FileInputStream(vmFile)).use { reader -> - reader.forEach { entry -> - val timestamp = entry.timestamp * 1000L - val cpuUsage = entry.cpuUsage - vmId = vmFile.nameWithoutExtension.trim().toLong() - val cores = entry.cpuCores - maxCores = max(maxCores, cores) - requiredMemory = max(requiredMemory, (entry.memCapacity / 1000).toLong()) - - if (lastTimestamp < 0) { - lastTimestamp = timestamp - 5 * 60 * 1000L - startTime = min(startTime, lastTimestamp) - } - - if (flopsHistory.isEmpty()) { - flopsHistory.add(SimTraceWorkload.Fragment(lastTimestamp, traceInterval, cpuUsage, cores)) - } else { - val last = flopsHistory.last() - val duration = timestamp - lastTimestamp - // Perform run-length encoding - if (duration == 0L || last.usage == cpuUsage) { - flopsHistory[flopsHistory.size - 1] = last.copy(duration = last.duration + duration) - } else { - flopsHistory.add( - SimTraceWorkload.Fragment( - lastTimestamp, - duration, - cpuUsage, - cores - ) - ) - } - } - - lastTimestamp = timestamp - } - } - - val uuid = UUID(0L, vmId) - - val workload = SimTraceWorkload(flopsHistory.asSequence()) - entries[vmId] = TraceEntry( - uuid, - vmId.toString(), - startTime, - workload, - mapOf( - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.start }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt deleted file mode 100644 index e68afeb7..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.gwf - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task -import org.opendc.workflow.api.WORKFLOW_TASK_CORES -import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE -import java.io.BufferedReader -import java.io.File -import java.io.InputStream -import java.util.* -import kotlin.collections.HashSet -import kotlin.collections.Iterator -import kotlin.collections.List -import kotlin.collections.MutableSet -import kotlin.collections.component1 -import kotlin.collections.component2 -import kotlin.collections.filter -import kotlin.collections.forEach -import kotlin.collections.getOrPut -import kotlin.collections.map -import kotlin.collections.mapIndexed -import kotlin.collections.mapOf -import kotlin.collections.mutableMapOf -import kotlin.collections.set -import kotlin.collections.sortedBy -import kotlin.collections.toMap -import kotlin.math.max -import kotlin.math.min - -/** - * A [TraceReader] for the Grid Workload Format. See the Grid Workloads Archive (http://gwa.ewi.tudelft.nl/) for more - * information about the format. - * - * Be aware that in the Grid Workload Format, workflows are not required to be ordered by submission time and therefore - * this reader needs to read the whole trace into memory before an entry can be read. Consider converting the trace to a - * different format for better performance. - * - * @param reader The buffered reader to read the trace with. - */ -public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<Job>> - - /** - * Create a [GwfTraceReader] instance from the specified [File]. - * - * @param file The file to read from. - */ - public constructor(file: File) : this(file.bufferedReader()) - - /** - * Create a [GwfTraceReader] instance from the specified [InputStream]. - * - * @param input The input stream to read from. - */ - public constructor(input: InputStream) : this(input.bufferedReader()) - - /** - * Initialize the reader. - */ - init { - val workflows = mutableMapOf<Long, Job>() - val starts = mutableMapOf<Long, Long>() - val tasks = mutableMapOf<Long, Task>() - val taskDependencies = mutableMapOf<Task, List<Long>>() - - var workflowIdCol = 0 - var taskIdCol = 0 - var submitTimeCol = 0 - var runtimeCol = 0 - var coreCol = 0 - var dependencyCol = 0 - - try { - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith("#") && line.isNotBlank() - } - .forEachIndexed { idx, line -> - val values = line.split(",") - - // Parse GWF header - if (idx == 0) { - val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() - workflowIdCol = header["WorkflowID"]!! - taskIdCol = header["JobID"]!! - submitTimeCol = header["SubmitTime"]!! - runtimeCol = header["RunTime"]!! - coreCol = header["NProcs"]!! - dependencyCol = header["Dependencies"]!! - return@forEachIndexed - } - - val workflowId = values[workflowIdCol].trim().toLong() - val taskId = values[taskIdCol].trim().toLong() - val submitTime = values[submitTimeCol].trim().toLong() * 1000 // ms - val runtime = max(0, values[runtimeCol].trim().toLong()) // s - val cores = values[coreCol].trim().toInt() - val dependencies = values[dependencyCol].split(" ") - .filter { it.isNotEmpty() } - .map { it.trim().toLong() } - - val flops: Long = 4000 * runtime * cores - - val workflow = workflows.getOrPut(workflowId) { - Job(UUID(0L, workflowId), "<unnamed>", HashSet()) - } - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, taskId), - "<unnamed>", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to cores, - WORKFLOW_TASK_DEADLINE to (runtime * 1000) - ), - ) - starts.merge(workflowId, submitTime, ::min) - (workflow.tasks as MutableSet<Task>).add(task) - tasks[taskId] = task - taskDependencies[task] = dependencies - } - } finally { - reader.close() - } - - // Fix dependencies and dependents for all tasks - taskDependencies.forEach { (task, dependencies) -> - (task.dependencies as MutableSet<Task>).addAll( - dependencies.map { taskId -> - tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") - } - ) - } - - // Create the entry iterator - iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } - .sortedBy { it.start } - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<Job> = iterator.next() - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt deleted file mode 100644 index bda392a9..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.swf - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.* - -/** - * A [TraceReader] for reading SWF traces into VM-modeled workloads. - * - * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html - * - * @param file The trace file. - */ -public class SwfTraceReader( - file: File, - maxNumCores: Int = -1 -) : TraceReader<SimWorkload> { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> - - /** - * Initialize the reader. - */ - init { - val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>() - - val jobNumberCol = 0 - val submitTimeCol = 1 // seconds (begin of trace is 0) - val waitTimeCol = 2 // seconds - val runTimeCol = 3 // seconds - val numAllocatedCoresCol = 4 // We assume that single-core processors were used at the time - val requestedMemoryCol = 9 // KB per processor/core (-1 if not specified) - - val sliceDuration = 5 * 60L - - var jobNumber: Long - var submitTime: Long - var waitTime: Long - var runTime: Long - var cores: Int - var memory: Long - var slicedWaitTime: Long - var runtimePartialSliceRemainder: Long - - BufferedReader(FileReader(file)).use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith(";") && line.isNotBlank() - } - .forEach { line -> - val values = line.trim().split("\\s+".toRegex()) - - jobNumber = values[jobNumberCol].trim().toLong() - submitTime = values[submitTimeCol].trim().toLong() - waitTime = values[waitTimeCol].trim().toLong() - runTime = values[runTimeCol].trim().toLong() - cores = values[numAllocatedCoresCol].trim().toInt() - memory = values[requestedMemoryCol].trim().toLong() - - if (maxNumCores != -1 && cores > maxNumCores) { - println("Skipped a task due to processor count ($cores > $maxNumCores).") - return@forEach - } - - if (memory == -1L) { - memory = 1000L * cores // assume 1GB of memory per processor if not specified - } else { - memory /= 1000 // convert KB to MB - } - - val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>() - - // Insert waiting time slices - - // We ignore wait time remainders under one - slicedWaitTime = 0L - if (waitTime >= sliceDuration) { - for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) { - flopsHistory.add( - SimTraceWorkload.Fragment( - tick, - sliceDuration * 1000, - 0.0, - cores - ) - ) - slicedWaitTime += sliceDuration - } - } - - // Insert run time slices - - runtimePartialSliceRemainder = runTime % sliceDuration - - for ( - tick in (submitTime + slicedWaitTime) - until (submitTime + slicedWaitTime + runTime - sliceDuration) - step sliceDuration - ) { - flopsHistory.add( - SimTraceWorkload.Fragment( - tick, - sliceDuration * 1000L, - 1.0, - cores - ) - ) - } - - if (runtimePartialSliceRemainder > 0) { - flopsHistory.add( - SimTraceWorkload.Fragment( - submitTime + slicedWaitTime + runTime, - sliceDuration, - runtimePartialSliceRemainder / sliceDuration.toDouble(), - cores - ) - ) - } - - val uuid = UUID(0L, jobNumber) - val workload = SimTraceWorkload(flopsHistory.asSequence()) - entries[jobNumber] = TraceEntry( - uuid, - jobNumber.toString(), - submitTime, - workload, - mapOf( - "cores" to cores, - "required-memory" to memory, - "workload" to workload - ) - ) - } - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.start }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt deleted file mode 100644 index dde1b340..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.wtf - -import org.apache.avro.generic.GenericRecord -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.format.util.LocalParquetReader -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task -import org.opendc.workflow.api.WORKFLOW_TASK_CORES -import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE -import java.io.File -import java.nio.file.Path -import java.util.UUID -import kotlin.math.min - -/** - * A [TraceReader] for the Workflow Trace Format (WTF). See the Workflow Trace Archive - * (https://wta.atlarge-research.com/) for more information about the format. - * - * @param path The path to the trace. - */ -public class WtfTraceReader(path: Path) : TraceReader<Job> { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<Job>> - - /** - * Construct a [TraceReader] from the specified [path]. - * - * @param path The path to the trace. - */ - public constructor(path: File) : this(path.toPath()) - - /** - * Initialize the reader. - */ - init { - val workflows = mutableMapOf<Long, Job>() - val starts = mutableMapOf<Long, Long>() - val tasks = mutableMapOf<Long, Task>() - val taskDependencies = mutableMapOf<Task, List<Long>>() - - LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")).use { reader -> - while (true) { - val nextRecord = reader.read() ?: break - - val workflowId = nextRecord.get("workflow_id") as Long - val taskId = nextRecord.get("id") as Long - val submitTime = nextRecord.get("ts_submit") as Long - val runtime = nextRecord.get("runtime") as Long - val cores = (nextRecord.get("resource_amount_requested") as Double).toInt() - - @Suppress("UNCHECKED_CAST") - val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map { - it.get("item") as Long - } - - val flops: Long = 4100 * (runtime / 1000) * cores - - val workflow = workflows.getOrPut(workflowId) { - Job(UUID(0L, workflowId), "<unnamed>", HashSet()) - } - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, taskId), - "<unnamed>", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to cores, - WORKFLOW_TASK_DEADLINE to runtime - ) - ) - - starts.merge(workflowId, submitTime, ::min) - (workflow.tasks as MutableSet<Task>).add(task) - tasks[taskId] = task - taskDependencies[task] = dependencies - } - } - - // Fix dependencies and dependents for all tasks - taskDependencies.forEach { (task, dependencies) -> - (task.dependencies as MutableSet<Task>).addAll( - dependencies.map { taskId -> - tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") - } - ) - } - - // Create the entry iterator - iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } - .sortedBy { it.start } - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<Job> = iterator.next() - - override fun close() {} -} diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt b/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt deleted file mode 100644 index e0e049cf..00000000 --- a/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.swf - -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.opendc.simulator.compute.workload.SimTraceWorkload -import java.io.File - -class SwfTraceReaderTest { - @Test - internal fun testParseSwf() { - val reader = SwfTraceReader(File(SwfTraceReaderTest::class.java.getResource("/swf_trace.txt").toURI())) - var entry = reader.next() - assertEquals(0, entry.start) - // 1961 slices for waiting, 3 full and 1 partial running slices - assertEquals(1965, (entry.workload as SimTraceWorkload).trace.toList().size) - - entry = reader.next() - assertEquals(164472, entry.start) - // 1188 slices for waiting, 0 full and 1 partial running slices - assertEquals(1189, (entry.workload as SimTraceWorkload).trace.toList().size) - assertEquals(0.25, (entry.workload as SimTraceWorkload).trace.toList().last().usage) - } -} diff --git a/opendc-trace/build.gradle.kts b/opendc-trace/build.gradle.kts new file mode 100644 index 00000000..7edfd134 --- /dev/null +++ b/opendc-trace/build.gradle.kts @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ diff --git a/opendc-trace/opendc-trace-api/build.gradle.kts b/opendc-trace/opendc-trace-api/build.gradle.kts new file mode 100644 index 00000000..b2f91593 --- /dev/null +++ b/opendc-trace/opendc-trace-api/build.gradle.kts @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Workload trace library for OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt new file mode 100644 index 00000000..44dec95b --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ResourceColumns") +package org.opendc.trace + +import java.time.Instant + +/** + * Identifier of the resource. + */ +@JvmField +public val RESOURCE_ID: TableColumn<String> = stringColumn("resource:id") + +/** + * Start time for the resource. + */ +@JvmField +public val RESOURCE_START_TIME: TableColumn<Instant> = TableColumn("resource:start_time", Instant::class.java) + +/** + * End time for the resource. + */ +@JvmField +public val RESOURCE_END_TIME: TableColumn<Instant> = TableColumn("resource:end_time", Instant::class.java) + +/** + * Number of CPUs for the resource. + */ +@JvmField +public val RESOURCE_NCPUS: TableColumn<Int> = intColumn("resource:num_cpus") + +/** + * Memory capacity for the resource. + */ +@JvmField +public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource:mem_capacity") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt new file mode 100644 index 00000000..1933967e --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ResourceStateColumns") +package org.opendc.trace + +import java.time.Duration +import java.time.Instant + +/** + * Identifier of the resource. + */ +@JvmField +public val RESOURCE_STATE_ID: TableColumn<String> = stringColumn("resource_state:id") + +/** + * The cluster to which the resource belongs. + */ +@JvmField +public val RESOURCE_STATE_CLUSTER_ID: TableColumn<String> = stringColumn("resource_state:cluster_id") + +/** + * Timestamp for the state. + */ +@JvmField +public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = TableColumn("resource_state:timestamp", Instant::class.java) + +/** + * Duration for the state. + */ +@JvmField +public val RESOURCE_STATE_DURATION: TableColumn<Duration> = TableColumn("resource_state:duration", Duration::class.java) + +/** + * A flag to indicate that the resource is powered on. + */ +@JvmField +public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = booleanColumn("resource_state:powered_on") + +/** + * Number of CPUs for the resource. + */ +@JvmField +public val RESOURCE_STATE_NCPUS: TableColumn<Int> = intColumn("resource_state:ncpus") + +/** + * Total CPU capacity of the resource in MHz. + */ +@JvmField +public val RESOURCE_STATE_CPU_CAPACITY: TableColumn<Double> = doubleColumn("resource_state:cpu_capacity") + +/** + * Total CPU usage of the resource in MHz. + */ +@JvmField +public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = doubleColumn("resource_state:cpu_usage") + +/** + * Total CPU usage of the resource in percentage. + */ +@JvmField +public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = doubleColumn("resource_state:cpu_usage_pct") + +/** + * Total CPU demand of the resource in MHz. + */ +@JvmField +public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = doubleColumn("resource_state:cpu_demand") + +/** + * CPU ready percentage. + */ +@JvmField +public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = doubleColumn("resource_state:cpu_ready_pct") + +/** + * Memory capacity of the resource in KB. + */ +@JvmField +public val RESOURCE_STATE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource_state:mem_capacity") + +/** + * Memory usage of the resource in KB. + */ +@JvmField +public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = doubleColumn("resource_state:mem_usage") + +/** + * Disk read throughput of the resource in KB/s. + */ +@JvmField +public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = doubleColumn("resource_state:disk_read") + +/** + * Disk write throughput of the resource in KB/s. + */ +@JvmField +public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = doubleColumn("resource_state:disk_write") + +/** + * Network receive throughput of the resource in KB/s. + */ +@JvmField +public val RESOURCE_STATE_NET_RX: TableColumn<Double> = doubleColumn("resource_state:net_rx") + +/** + * Network transmit throughput of the resource in KB/s. + */ +@JvmField +public val RESOURCE_STATE_NET_TX: TableColumn<Double> = doubleColumn("resource_state:net_tx") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt new file mode 100644 index 00000000..11e5d6b7 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace + +/** + * A table is collection of rows consisting of typed columns. + */ +public interface Table { + /** + * The name of the table. + */ + public val name: String + + /** + * A flag to indicate that the table is synthetic (derived from another table). + */ + public val isSynthetic: Boolean + + /** + * Determine whether the specified [column] is supported by this table. + */ + public fun isSupported(column: TableColumn<*>): Boolean + + /** + * Open a [TableReader] for this table. + */ + public fun newReader(): TableReader + + /** + * Open a [TableReader] for [partition] of the table. + */ + public fun newReader(partition: String): TableReader +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt new file mode 100644 index 00000000..776c40c0 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace + +import java.util.* + +/** + * A column in a trace table. + * + * @param name The universal name of this column. + */ +public class TableColumn<out T>(public val name: String, type: Class<T>) { + /** + * The type of the column. + */ + private val type: Class<*> = type + + /** + * Determine whether the type of the column is a subtype of [column]. + */ + public fun isAssignableTo(column: TableColumn<*>): Boolean { + return name == column.name && type.isAssignableFrom(column.type) + } + + /** + * Compute a hash code for this column. + */ + public override fun hashCode(): Int = Objects.hash(name, type) + + /** + * Determine whether this column is equal to [other]. + */ + public override fun equals(other: Any?): Boolean { + // Fast-path: reference equality + if (this === other) { + return true + } else if (other == null || other !is TableColumn<*>) { + return false + } + + return name == other.name && type == other.type + } + + /** + * Return a string representation of this column. + */ + public override fun toString(): String = "TableColumn[$name,$type]" +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt new file mode 100644 index 00000000..64920498 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TableColumns") +package org.opendc.trace + +/** + * Construct a [TableColumn] with [Any] type. + */ +public fun objectColumn(name: String): TableColumn<Any> = TableColumn(name, Any::class.java) + +/** + * Construct a [TableColumn] with a [String] type. + */ +public fun stringColumn(name: String): TableColumn<String> = TableColumn(name, String::class.java) + +/** + * Construct a [TableColumn] with a [Number] type. + */ +public fun numberColumn(name: String): TableColumn<Number> = TableColumn(name, Number::class.java) + +/** + * Construct a [TableColumn] with an [Int] type. + */ +public fun intColumn(name: String): TableColumn<Int> = TableColumn(name, Int::class.java) + +/** + * Construct a [TableColumn] with a [Long] type. + */ +public fun longColumn(name: String): TableColumn<Long> = TableColumn(name, Long::class.java) + +/** + * Construct a [TableColumn] with a [Double] type. + */ +public fun doubleColumn(name: String): TableColumn<Double> = TableColumn(name, Double::class.java) + +/** + * Construct a [TableColumn] with a [Boolean] type. + */ +public fun booleanColumn(name: String): TableColumn<Boolean> = TableColumn(name, Boolean::class.java) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt new file mode 100644 index 00000000..b5e7669f --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace + +/** + * Base class for reading entities from a workload trace table in streaming fashion. + */ +public interface TableReader : AutoCloseable { + /** + * Advance the stream until the next row is reached. + * + * @return `true` if the row is valid, `false` if there are no more rows. + */ + public fun nextRow(): Boolean + + /** + * Determine whether the [TableReader] supports the specified [column]. + */ + public fun hasColumn(column: TableColumn<*>): Boolean + + /** + * Obtain the value of the current column with type [T]. + */ + public fun <T> get(column: TableColumn<T>): T + + /** + * Read the specified [column] as boolean. + */ + public fun getBoolean(column: TableColumn<Boolean>): Boolean + + /** + * Read the specified [column] as integer. + */ + public fun getInt(column: TableColumn<Int>): Int + + /** + * Read the specified [column] as long. + */ + public fun getLong(column: TableColumn<Long>): Long + + /** + * Read the specified [column] as double. + */ + public fun getDouble(column: TableColumn<Double>): Double + + /** + * Closes the reader so that no further iteration or data access can be made. + */ + public override fun close() +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt new file mode 100644 index 00000000..bb9d93e2 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("Tables") +package org.opendc.trace + +/** + * A table containing all workflows in a workload. + */ +public const val TABLE_WORKFLOWS: String = "workflows" + +/** + * A table containing all tasks in a workload. + */ +public const val TABLE_TASKS: String = "tasks" + +/** + * A table containing all resources in a workload. + */ +public const val TABLE_RESOURCES: String = "resources" + +/** + * A table containing all resource states in a workload. + */ +public const val TABLE_RESOURCE_STATES: String = "resource_states" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt new file mode 100644 index 00000000..88bbc623 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TaskColumns") +package org.opendc.trace + +/** + * A column containing the task identifier. + */ +@JvmField +public val TASK_ID: TableColumn<Long> = longColumn("task:id") + +/** + * A column containing the identifier of the workflow. + */ +@JvmField +public val TASK_WORKFLOW_ID: TableColumn<Long> = longColumn("task:workflow_id") + +/** + * A column containing the submit time of the task. + */ +@JvmField +public val TASK_SUBMIT_TIME: TableColumn<Long> = longColumn("task:submit_time") + +/** + * A column containing the wait time of the task. + */ +@JvmField +public val TASK_WAIT_TIME: TableColumn<Long> = longColumn("task:wait_time") + +/** + * A column containing the runtime time of the task. + */ +@JvmField +public val TASK_RUNTIME: TableColumn<Long> = longColumn("task:runtime") + +/** + * A column containing the parents of a task. + */ +@Suppress("UNCHECKED_CAST") +@JvmField +public val TASK_PARENTS: TableColumn<Set<Long>> = TableColumn("task:parents", type = Set::class.java as Class<Set<Long>>) + +/** + * A column containing the children of a task. + */ +@Suppress("UNCHECKED_CAST") +@JvmField +public val TASK_CHILDREN: TableColumn<Set<Long>> = TableColumn("task:children", type = Set::class.java as Class<Set<Long>>) + +/** + * A column containing the requested CPUs of a task. + */ +@JvmField +public val TASK_REQ_NCPUS: TableColumn<Int> = intColumn("task:req_ncpus") + +/** + * A column containing the allocated CPUs of a task. + */ +@JvmField +public val TASK_ALLOC_NCPUS: TableColumn<Int> = intColumn("task:alloc_ncpus") + +/** + * A column containing the status of a task. + */ +@JvmField +public val TASK_STATUS: TableColumn<Int> = intColumn("task:status") + +/** + * A column containing the group id of a task. + */ +@JvmField +public val TASK_GROUP_ID: TableColumn<Int> = intColumn("task:group_id") + +/** + * A column containing the user id of a task. + */ +@JvmField +public val TASK_USER_ID: TableColumn<Int> = intColumn("task:user_id") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt new file mode 100644 index 00000000..36e93b52 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace + +/** + * A trace is a collection of related tables that characterize a workload. + */ +public interface Trace { + /** + * The list of table names in the workload trace. + */ + public val tables: List<String> + + /** + * Determine if the trace contains a table with the specified [name]. + */ + public fun containsTable(name: String): Boolean + + /** + * Obtain a [Table] with the specified [name]. + */ + public fun getTable(name: String): Table? +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt new file mode 100644 index 00000000..54029fcf --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.spi + +import org.opendc.trace.Trace +import java.net.URL +import java.util.* + +/** + * A service-provider class for parsing trace formats. + */ +public interface TraceFormat { + /** + * The name of the trace format. + */ + public val name: String + + /** + * Open a new [Trace] with this provider. + * + * @param url A reference to the trace. + */ + public fun open(url: URL): Trace + + /** + * A helper object for resolving providers. + */ + public companion object { + /** + * A list of [TraceFormat] that are available on this system. + */ + public val installedProviders: List<TraceFormat> by lazy { + val loader = ServiceLoader.load(TraceFormat::class.java) + loader.toList() + } + + /** + * Obtain a [TraceFormat] implementation by [name]. + */ + public fun byName(name: String): TraceFormat? = installedProviders.find { it.name == name } + } +} diff --git a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts new file mode 100644 index 00000000..d195cbbb --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support for GWF traces in OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTrace.opendcTraceApi) + implementation(libs.jackson.dataformat.csv) +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt new file mode 100644 index 00000000..767ef919 --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension + +/** + * The resource state [Table] in the Bitbrains format. + */ +internal class BitbrainsResourceStateTable(private val factory: CsvFactory, private val path: Path) : Table { + /** + * The partitions that belong to the table. + */ + private val partitions = + Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + + override val name: String = TABLE_RESOURCE_STATES + + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_CAPACITY -> true + RESOURCE_STATE_CPU_USAGE -> true + RESOURCE_STATE_CPU_USAGE_PCT -> true + RESOURCE_STATE_MEM_CAPACITY -> true + RESOURCE_STATE_MEM_USAGE -> true + RESOURCE_STATE_DISK_READ -> true + RESOURCE_STATE_DISK_WRITE -> true + RESOURCE_STATE_NET_RX -> true + RESOURCE_STATE_NET_TX -> true + else -> false + } + } + + override fun newReader(): TableReader { + val it = partitions.iterator() + + return object : TableReader { + var delegate: TableReader? = nextDelegate() + + override fun nextRow(): Boolean { + var delegate = delegate + + while (delegate != null) { + if (delegate.nextRow()) { + break + } + + delegate.close() + delegate = nextDelegate() + } + + this.delegate = delegate + return delegate != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false + + override fun <T> get(column: TableColumn<T>): T { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.get(column) + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getBoolean(column) + } + + override fun getInt(column: TableColumn<Int>): Int { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInt(column) + } + + override fun getLong(column: TableColumn<Long>): Long { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getLong(column) + } + + override fun getDouble(column: TableColumn<Double>): Double { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDouble(column) + } + + override fun close() { + delegate?.close() + } + + private fun nextDelegate(): TableReader? { + return if (it.hasNext()) { + val (partition, path) = it.next() + return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile())) + } else { + null + } + } + + override fun toString(): String = "BitbrainsCompositeTableReader" + } + } + + override fun newReader(partition: String): TableReader { + val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } + return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile())) + } + + override fun toString(): String = "BitbrainsResourceStateTable" +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt new file mode 100644 index 00000000..5687ac7f --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.dataformat.csv.CsvParser +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import org.opendc.trace.* +import java.time.Instant + +/** + * A [TableReader] for the Bitbrains resource state table. + */ +internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader { + /** + * The current parser state. + */ + private val state = RowState() + + init { + parser.schema = schema + } + + override fun nextRow(): Boolean { + // Reset the row state + state.reset() + + if (!nextStart()) { + return false + } + + while (true) { + val token = parser.nextValue() + + if (token == null || token == JsonToken.END_OBJECT) { + break + } + + when (parser.currentName) { + "Timestamp [ms]" -> state.timestamp = Instant.ofEpochSecond(parser.longValue) + "CPU cores" -> state.cpuCores = parser.intValue + "CPU capacity provisioned [MHZ]" -> state.cpuCapacity = parser.doubleValue + "CPU usage [MHZ]" -> state.cpuUsage = parser.doubleValue + "CPU usage [%]" -> state.cpuUsagePct = parser.doubleValue + "Memory capacity provisioned [KB]" -> state.memCapacity = parser.doubleValue + "Memory usage [KB]" -> state.memUsage = parser.doubleValue + "Disk read throughput [KB/s]" -> state.diskRead = parser.doubleValue + "Disk write throughput [KB/s]" -> state.diskWrite = parser.doubleValue + "Network received throughput [KB/s]" -> state.netReceived = parser.doubleValue + "Network transmitted throughput [KB/s]" -> state.netTransmitted = parser.doubleValue + } + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_CAPACITY -> true + RESOURCE_STATE_CPU_USAGE -> true + RESOURCE_STATE_CPU_USAGE_PCT -> true + RESOURCE_STATE_MEM_CAPACITY -> true + RESOURCE_STATE_MEM_USAGE -> true + RESOURCE_STATE_DISK_READ -> true + RESOURCE_STATE_DISK_WRITE -> true + RESOURCE_STATE_NET_RX -> true + RESOURCE_STATE_NET_TX -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val res: Any? = when (column) { + RESOURCE_STATE_ID -> partition + RESOURCE_STATE_TIMESTAMP -> state.timestamp + RESOURCE_STATE_NCPUS -> state.cpuCores + RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity + RESOURCE_STATE_CPU_USAGE -> state.cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct + RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity + RESOURCE_STATE_MEM_USAGE -> state.memUsage + RESOURCE_STATE_DISK_READ -> state.diskRead + RESOURCE_STATE_DISK_WRITE -> state.diskWrite + RESOURCE_STATE_NET_RX -> state.netReceived + RESOURCE_STATE_NET_TX -> state.netTransmitted + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + return when (column) { + RESOURCE_STATE_NCPUS -> state.cpuCores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn<Long>): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn<Double>): Double { + return when (column) { + RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity + RESOURCE_STATE_CPU_USAGE -> state.cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct + RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity + RESOURCE_STATE_MEM_USAGE -> state.memUsage + RESOURCE_STATE_DISK_READ -> state.diskRead + RESOURCE_STATE_DISK_WRITE -> state.diskWrite + RESOURCE_STATE_NET_RX -> state.netReceived + RESOURCE_STATE_NET_TX -> state.netTransmitted + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + parser.close() + } + + /** + * Advance the parser until the next object start. + */ + private fun nextStart(): Boolean { + var token = parser.nextValue() + + while (token != null && token != JsonToken.START_OBJECT) { + token = parser.nextValue() + } + + return token != null + } + + /** + * The current row state. + */ + private class RowState { + var timestamp: Instant? = null + var cpuCores = -1 + var cpuCapacity = Double.NaN + var cpuUsage = Double.NaN + var cpuUsagePct = Double.NaN + var memCapacity = Double.NaN + var memUsage = Double.NaN + var diskRead = Double.NaN + var diskWrite = Double.NaN + var netReceived = Double.NaN + var netTransmitted = Double.NaN + + /** + * Reset the state. + */ + fun reset() { + timestamp = null + cpuCores = -1 + cpuCapacity = Double.NaN + cpuUsage = Double.NaN + cpuUsagePct = Double.NaN + memCapacity = Double.NaN + memUsage = Double.NaN + diskRead = Double.NaN + diskWrite = Double.NaN + netReceived = Double.NaN + netTransmitted = Double.NaN + } + } + + companion object { + /** + * The [CsvSchema] that is used to parse the trace. + */ + private val schema = CsvSchema.builder() + .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) + .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .setUseHeader(true) + .setColumnSeparator(';') + .build() + } +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt new file mode 100644 index 00000000..5a2d4243 --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * [Trace] implementation for the Bitbrains format. + */ +public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { + override val tables: List<String> = listOf(TABLE_RESOURCE_STATES) + + override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name + + override fun getTable(name: String): Table? { + if (!containsTable(name)) { + return null + } + + return BitbrainsResourceStateTable(factory, path) + } + + override fun toString(): String = "BitbrainsTrace[$path]" +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt new file mode 100644 index 00000000..55b11fe3 --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A format implementation for the GWF trace format. + */ +public class BitbrainsTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "bitbrains" + + /** + * The [CsvFactory] used to create the parser. + */ + private val factory = CsvFactory() + .enable(CsvParser.Feature.ALLOW_COMMENTS) + .enable(CsvParser.Feature.TRIM_SPACES) + + /** + * Open a Bitbrains trace. + */ + override fun open(url: URL): BitbrainsTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return BitbrainsTrace(factory, path) + } +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..f18135d0 --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.bitbrains.BitbrainsTraceFormat diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt new file mode 100644 index 00000000..550805d3 --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.RESOURCE_STATE_CPU_USAGE +import org.opendc.trace.RESOURCE_STATE_TIMESTAMP +import org.opendc.trace.TABLE_RESOURCE_STATES +import java.net.URL + +/** + * Test suite for the [BitbrainsTraceFormat] class. + */ +class BitbrainsTraceFormatTest { + @Test + fun testTraceExists() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + assertDoesNotThrow { + format.open(url) + } + } + + @Test + fun testTraceDoesNotExists() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + assertThrows<IllegalArgumentException> { + format.open(URL(url.toString() + "help")) + } + } + + @Test + fun testTables() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + val trace = format.open(url) + + assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables) + } + + @Test + fun testTableExists() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + + assertNotNull(table) + assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + val trace = format.open(url) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + @Test + fun testSmoke() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + val trace = format.open(url) + + val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, + { assertEquals(19.066, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } + ) + + reader.close() + } +} diff --git a/opendc-format/src/test/resources/bitbrains.csv b/opendc-trace/opendc-trace-bitbrains/src/test/resources/bitbrains.csv index f5e300e8..f5e300e8 100644 --- a/opendc-format/src/test/resources/bitbrains.csv +++ b/opendc-trace/opendc-trace-bitbrains/src/test/resources/bitbrains.csv diff --git a/opendc-trace/opendc-trace-gwf/build.gradle.kts b/opendc-trace/opendc-trace-gwf/build.gradle.kts new file mode 100644 index 00000000..f3dfd6ef --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support for GWF traces in OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTrace.opendcTraceApi) + + implementation(libs.jackson.dataformat.csv) +} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt new file mode 100644 index 00000000..80a99d10 --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.gwf + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.net.URL + +/** + * A [Table] containing the tasks in a GWF trace. + */ +internal class GwfTaskTable(private val factory: CsvFactory, private val url: URL) : Table { + override val name: String = TABLE_TASKS + + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + TASK_WORKFLOW_ID -> true + TASK_ID -> true + TASK_SUBMIT_TIME -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_ALLOC_NCPUS -> true + TASK_PARENTS -> true + else -> false + } + } + + override fun newReader(): TableReader { + return GwfTaskTableReader(factory.createParser(url)) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Invalid partition $partition") + } + + override fun toString(): String = "GwfTaskTable" +} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt new file mode 100644 index 00000000..64b7d465 --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -0,0 +1,211 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.gwf + +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.dataformat.csv.CsvParser +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import org.opendc.trace.* +import java.util.regex.Pattern + +/** + * A [TableReader] implementation for the GWF format. + */ +internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { + /** + * The current parser state. + */ + private val state = RowState() + + init { + parser.schema = schema + } + + override fun nextRow(): Boolean { + // Reset the row state + state.reset() + + if (!nextStart()) { + return false + } + + while (true) { + val token = parser.nextValue() + + if (token == null || token == JsonToken.END_OBJECT) { + break + } + + when (parser.currentName) { + "WorkflowID" -> state.workflowId = parser.longValue + "JobID" -> state.jobId = parser.longValue + "SubmitTime" -> state.submitTime = parser.longValue + "RunTime" -> state.runtime = parser.longValue + "NProcs" -> state.nProcs = parser.intValue + "ReqNProcs" -> state.reqNProcs = parser.intValue + "Dependencies" -> parseParents(parser.valueAsString) + } + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + TASK_WORKFLOW_ID -> true + TASK_ID -> true + TASK_SUBMIT_TIME -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_ALLOC_NCPUS -> true + TASK_PARENTS -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val res: Any = when (column) { + TASK_WORKFLOW_ID -> state.workflowId + TASK_ID -> state.jobId + TASK_SUBMIT_TIME -> state.submitTime + TASK_RUNTIME -> state.runtime + TASK_REQ_NCPUS -> state.nProcs + TASK_ALLOC_NCPUS -> state.reqNProcs + TASK_PARENTS -> state.dependencies + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + return when (column) { + TASK_REQ_NCPUS -> state.nProcs + TASK_ALLOC_NCPUS -> state.reqNProcs + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn<Long>): Long { + return when (column) { + TASK_WORKFLOW_ID -> state.workflowId + TASK_ID -> state.jobId + TASK_SUBMIT_TIME -> state.submitTime + TASK_RUNTIME -> state.runtime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDouble(column: TableColumn<Double>): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + parser.close() + } + + /** + * The pattern used to parse the parents. + */ + private val pattern = Pattern.compile("\\s+") + + /** + * Parse the parents into a set of longs. + */ + private fun parseParents(value: String): Set<Long> { + val result = mutableSetOf<Long>() + val deps = value.split(pattern) + + for (dep in deps) { + if (dep.isBlank()) { + continue + } + + result.add(dep.toLong(10)) + } + + return result + } + + /** + * Advance the parser until the next object start. + */ + private fun nextStart(): Boolean { + var token = parser.nextValue() + + while (token != null && token != JsonToken.START_OBJECT) { + token = parser.nextValue() + } + + return token != null + } + + /** + * The current row state. + */ + private class RowState { + var workflowId = -1L + var jobId = -1L + var submitTime = -1L + var runtime = -1L + var nProcs = -1 + var reqNProcs = -1 + var dependencies = emptySet<Long>() + + /** + * Reset the state. + */ + fun reset() { + workflowId = -1 + jobId = -1 + submitTime = -1 + runtime = -1 + nProcs = -1 + reqNProcs = -1 + dependencies = emptySet() + } + } + + companion object { + /** + * The [CsvSchema] that is used to parse the trace. + */ + private val schema = CsvSchema.builder() + .addColumn("WorkflowID", CsvSchema.ColumnType.NUMBER) + .addColumn("JobID", CsvSchema.ColumnType.NUMBER) + .addColumn("SubmitTime", CsvSchema.ColumnType.NUMBER) + .addColumn("RunTime", CsvSchema.ColumnType.NUMBER) + .addColumn("NProcs", CsvSchema.ColumnType.NUMBER) + .addColumn("ReqNProcs", CsvSchema.ColumnType.NUMBER) + .addColumn("Dependencies", CsvSchema.ColumnType.STRING) + .setAllowComments(true) + .setUseHeader(true) + .setColumnSeparator(',') + .build() + } +} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt new file mode 100644 index 00000000..166c1e56 --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.gwf + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.net.URL + +/** + * [Trace] implementation for the GWF format. + */ +public class GwfTrace internal constructor(private val factory: CsvFactory, private val url: URL) : Trace { + override val tables: List<String> = listOf(TABLE_TASKS) + + override fun containsTable(name: String): Boolean = TABLE_TASKS == name + + override fun getTable(name: String): Table? { + if (!containsTable(name)) { + return null + } + + return GwfTaskTable(factory, url) + } + + override fun toString(): String = "GwfTrace[$url]" +} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt new file mode 100644 index 00000000..6d542503 --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.gwf + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A [TraceFormat] implementation for the GWF trace format. + */ +public class GwfTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "gwf" + + /** + * The [CsvFactory] used to create the parser. + */ + private val factory = CsvFactory() + .enable(CsvParser.Feature.ALLOW_COMMENTS) + .enable(CsvParser.Feature.TRIM_SPACES) + + /** + * Read the tasks in the GWF trace. + */ + public override fun open(url: URL): GwfTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return GwfTrace(factory, url) + } +} diff --git a/opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..99a874c8 --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.gwf.GwfTraceFormat diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt new file mode 100644 index 00000000..6b0568fe --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.gwf + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.* +import java.net.URL + +/** + * Test suite for the [GwfTraceFormat] class. + */ +internal class GwfTraceFormatTest { + @Test + fun testTraceExists() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + assertDoesNotThrow { + format.open(input) + } + } + + @Test + fun testTraceDoesNotExists() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + assertThrows<IllegalArgumentException> { + format.open(URL(input.toString() + "help")) + } + } + + @Test + fun testTables() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + val trace = format.open(input) + + assertEquals(listOf(TABLE_TASKS), trace.tables) + } + + @Test + fun testTableExists() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS) + + assertNotNull(table) + assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + val trace = format.open(input) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + @Test + fun testTableReader() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS)!! + val reader = table.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals(0L, reader.getLong(TASK_WORKFLOW_ID)) }, + { assertEquals(1L, reader.getLong(TASK_ID)) }, + { assertEquals(16, reader.getLong(TASK_SUBMIT_TIME)) }, + { assertEquals(11, reader.getLong(TASK_RUNTIME)) }, + { assertEquals(setOf<Long>(), reader.get(TASK_PARENTS)) }, + ) + } + + @Test + fun testTableReaderPartition() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS)!! + + assertThrows<IllegalArgumentException> { table.newReader("test") } + } +} diff --git a/opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf b/opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf new file mode 100644 index 00000000..2f99616d --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf @@ -0,0 +1,71 @@ +WorkflowID, JobID , SubmitTime, RunTime , NProcs , ReqNProcs , Dependencies +0 , 1 , 16 , 11 , 1 , 1 , +0 , 2 , 40 , 11 , 1 , 1 , 1 +0 , 3 , 40 , 11 , 1 , 1 , 1 +0 , 4 , 64 , 11 , 1 , 1 , 2 +0 , 5 , 63 , 11 , 1 , 1 , 3 +0 , 6 , 64 , 11 , 1 , 1 , 3 +0 , 7 , 87 , 11 , 1 , 1 , 4 5 6 +1 , 8 , 4 , 11 , 1 , 1 , +1 , 9 , 15 , 11 , 1 , 1 , 8 +1 , 10 , 15 , 11 , 1 , 1 , 8 +1 , 11 , 27 , 11 , 1 , 1 , 9 +1 , 12 , 27 , 11 , 1 , 1 , 10 +1 , 13 , 27 , 11 , 1 , 1 , 10 +1 , 14 , 38 , 11 , 1 , 1 , 12 11 13 +2 , 15 , 3 , 11 , 1 , 1 , +2 , 16 , 27 , 11 , 1 , 1 , 15 +2 , 17 , 27 , 11 , 1 , 1 , 15 +2 , 18 , 52 , 11 , 1 , 1 , 16 +2 , 19 , 51 , 11 , 1 , 1 , 17 +2 , 20 , 51 , 11 , 1 , 1 , 17 +2 , 21 , 75 , 11 , 1 , 1 , 20 18 19 +3 , 22 , 3 , 11 , 1 , 1 , +3 , 23 , 27 , 11 , 1 , 1 , 22 +3 , 24 , 27 , 11 , 1 , 1 , 22 +3 , 25 , 51 , 11 , 1 , 1 , 23 +3 , 26 , 50 , 11 , 1 , 1 , 24 +3 , 27 , 51 , 11 , 1 , 1 , 24 +3 , 28 , 75 , 11 , 1 , 1 , 25 27 26 +4 , 29 , 3 , 11 , 1 , 1 , +4 , 30 , 27 , 11 , 1 , 1 , 29 +4 , 31 , 27 , 11 , 1 , 1 , 29 +4 , 32 , 50 , 11 , 1 , 1 , 30 +4 , 33 , 50 , 11 , 1 , 1 , 31 +4 , 34 , 51 , 11 , 1 , 1 , 31 +4 , 35 , 74 , 11 , 1 , 1 , 33 32 34 +5 , 36 , 3 , 11 , 1 , 1 , +5 , 37 , 27 , 11 , 1 , 1 , 36 +5 , 38 , 26 , 11 , 1 , 1 , 36 +5 , 39 , 51 , 11 , 1 , 1 , 37 +5 , 40 , 50 , 11 , 1 , 1 , 38 +5 , 41 , 50 , 11 , 1 , 1 , 38 +5 , 42 , 74 , 11 , 1 , 1 , 39 40 41 +6 , 43 , 4 , 11 , 1 , 1 , +6 , 44 , 27 , 11 , 1 , 1 , 43 +6 , 45 , 27 , 11 , 1 , 1 , 43 +6 , 46 , 51 , 11 , 1 , 1 , 44 +6 , 47 , 51 , 11 , 1 , 1 , 45 +6 , 48 , 51 , 11 , 1 , 1 , 45 +6 , 49 , 75 , 11 , 1 , 1 , 46 47 48 +7 , 50 , 3 , 0 , 1 , 1 , +7 , 51 , 17 , 0 , 1 , 1 , 50 +7 , 52 , 17 , 0 , 1 , 1 , 50 +7 , 53 , 30 , 0 , 1 , 1 , 51 +7 , 54 , 30 , 0 , 1 , 1 , 52 +7 , 55 , 31 , 0 , 1 , 1 , 52 +7 , 56 , 44 , 0 , 1 , 1 , 55 54 53 +8 , 57 , 3 , 11 , 1 , 1 , +8 , 58 , 26 , 11 , 1 , 1 , 57 +8 , 59 , 27 , 11 , 1 , 1 , 57 +8 , 60 , 50 , 11 , 1 , 1 , 58 +8 , 61 , 51 , 11 , 1 , 1 , 59 +8 , 62 , 50 , 11 , 1 , 1 , 59 +8 , 63 , 74 , 11 , 1 , 1 , 62 61 60 +9 , 64 , 3 , 11 , 1 , 1 , +9 , 65 , 27 , 11 , 1 , 1 , 64 +9 , 66 , 27 , 11 , 1 , 1 , 64 +9 , 67 , 51 , 11 , 1 , 1 , 65 +9 , 68 , 50 , 11 , 1 , 1 , 66 +9 , 69 , 51 , 11 , 1 , 1 , 66 +9 , 70 , 74 , 11 , 1 , 1 , 68 69 67 diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts new file mode 100644 index 00000000..75378509 --- /dev/null +++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Parquet helpers for traces in OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + + /* This configuration is necessary for a slim dependency on Apache Parquet */ + api(libs.parquet) { + exclude(group = "org.apache.hadoop") + } + runtimeOnly(libs.hadoop.common) { + exclude(group = "org.slf4j", module = "slf4j-log4j12") + exclude(group = "log4j") + exclude(group = "org.apache.hadoop") + exclude(group = "org.apache.curator") + exclude(group = "org.apache.zookeeper") + exclude(group = "org.apache.kerby") + exclude(group = "org.apache.httpcomponents") + exclude(group = "org.apache.htrace") + exclude(group = "commons-cli") + exclude(group = "javax.servlet") + exclude(group = "org.eclipse.jetty") + exclude(group = "com.sun.jersey") + exclude(group = "com.jcraft") + exclude(group = "dnsjava") + } + runtimeOnly(libs.hadoop.mapreduce.client.core) { + isTransitive = false + } + + testRuntimeOnly(libs.slf4j.simple) +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt index 92319ace..fd2e00cd 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.format.util +package org.opendc.trace.util.parquet import org.apache.parquet.io.InputFile import org.apache.parquet.io.SeekableInputStream diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt index 657bca5a..1b17ae5d 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.format.util +package org.opendc.trace.util.parquet import org.apache.parquet.io.OutputFile import org.apache.parquet.io.PositionOutputStream diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt index 5083f3e1..ef9eaeb3 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.format.util +package org.opendc.trace.util.parquet import org.apache.parquet.avro.AvroParquetReader import org.apache.parquet.hadoop.ParquetReader diff --git a/opendc-format/src/test/kotlin/org/opendc/format/util/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt index e496dd96..8ef4d1fb 100644 --- a/opendc-format/src/test/kotlin/org/opendc/format/util/ParquetTest.kt +++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.format.util +package org.opendc.trace.util.parquet import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData diff --git a/opendc-trace/opendc-trace-swf/build.gradle.kts b/opendc-trace/opendc-trace-swf/build.gradle.kts new file mode 100644 index 00000000..c9eaa78d --- /dev/null +++ b/opendc-trace/opendc-trace-swf/build.gradle.kts @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support for Standard Workload Format (SWF) traces in OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTrace.opendcTraceApi) +} diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt new file mode 100644 index 00000000..12a51a2f --- /dev/null +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.swf + +import org.opendc.trace.* +import java.nio.file.Path +import kotlin.io.path.bufferedReader + +/** + * A [Table] containing the tasks in a SWF trace. + */ +internal class SwfTaskTable(private val path: Path) : Table { + override val name: String = TABLE_TASKS + + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + TASK_ID -> true + TASK_SUBMIT_TIME -> true + TASK_WAIT_TIME -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_ALLOC_NCPUS -> true + TASK_PARENTS -> true + TASK_STATUS -> true + TASK_GROUP_ID -> true + TASK_USER_ID -> true + else -> false + } + } + + override fun newReader(): TableReader { + val reader = path.bufferedReader() + return SwfTaskTableReader(reader) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Invalid partition $partition") + } + + override fun toString(): String = "SwfTaskTable" +} diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt new file mode 100644 index 00000000..5f879a54 --- /dev/null +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.swf + +import org.opendc.trace.* +import java.io.BufferedReader + +/** + * A [TableReader] implementation for the SWF format. + */ +internal class SwfTaskTableReader(private val reader: BufferedReader) : TableReader { + /** + * The current row. + */ + private var fields = emptyList<String>() + + /** + * A [Regex] object to match whitespace. + */ + private val whitespace = "\\s+".toRegex() + + override fun nextRow(): Boolean { + var line: String + var num = 0 + + while (true) { + line = reader.readLine() ?: return false + num++ + + if (line.isBlank()) { + // Ignore empty lines + continue + } else if (line.startsWith(";")) { + // Ignore comments for now + continue + } + + break + } + + fields = line.trim().split(whitespace) + + if (fields.size < 18) { + throw IllegalArgumentException("Invalid format at line $line") + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + TASK_ID -> true + TASK_SUBMIT_TIME -> true + TASK_WAIT_TIME -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_ALLOC_NCPUS -> true + TASK_PARENTS -> true + TASK_STATUS -> true + TASK_GROUP_ID -> true + TASK_USER_ID -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val res: Any = when (column) { + TASK_ID -> getLong(TASK_ID) + TASK_SUBMIT_TIME -> getLong(TASK_SUBMIT_TIME) + TASK_WAIT_TIME -> getLong(TASK_WAIT_TIME) + TASK_RUNTIME -> getLong(TASK_RUNTIME) + TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) + TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS) + TASK_PARENTS -> { + val parent = fields[COL_PARENT_JOB].toLong(10) + if (parent < 0) emptySet() else setOf(parent) + } + TASK_STATUS -> getInt(TASK_STATUS) + TASK_GROUP_ID -> getInt(TASK_GROUP_ID) + TASK_USER_ID -> getInt(TASK_USER_ID) + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + return when (column) { + TASK_REQ_NCPUS -> fields[COL_REQ_NCPUS].toInt(10) + TASK_ALLOC_NCPUS -> fields[COL_ALLOC_NCPUS].toInt(10) + TASK_STATUS -> fields[COL_STATUS].toInt(10) + TASK_GROUP_ID -> fields[COL_GROUP_ID].toInt(10) + TASK_USER_ID -> fields[COL_USER_ID].toInt(10) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn<Long>): Long { + return when (column) { + TASK_ID -> fields[COL_JOB_ID].toLong(10) + TASK_SUBMIT_TIME -> fields[COL_SUBMIT_TIME].toLong(10) + TASK_WAIT_TIME -> fields[COL_WAIT_TIME].toLong(10) + TASK_RUNTIME -> fields[COL_RUN_TIME].toLong(10) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDouble(column: TableColumn<Double>): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + reader.close() + } + + /** + * Default column indices for the SWF format. + */ + private val COL_JOB_ID = 0 + private val COL_SUBMIT_TIME = 1 + private val COL_WAIT_TIME = 2 + private val COL_RUN_TIME = 3 + private val COL_ALLOC_NCPUS = 4 + private val COL_AVG_CPU_TIME = 5 + private val COL_USED_MEM = 6 + private val COL_REQ_NCPUS = 7 + private val COL_REQ_TIME = 8 + private val COL_REQ_MEM = 9 + private val COL_STATUS = 10 + private val COL_USER_ID = 11 + private val COL_GROUP_ID = 12 + private val COL_EXEC_NUM = 13 + private val COL_QUEUE_NUM = 14 + private val COL_PART_NUM = 15 + private val COL_PARENT_JOB = 16 + private val COL_PARENT_THINK_TIME = 17 +} diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt new file mode 100644 index 00000000..d4da735e --- /dev/null +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.swf + +import org.opendc.trace.TABLE_TASKS +import org.opendc.trace.Table +import org.opendc.trace.Trace +import java.nio.file.Path + +/** + * [Trace] implementation for the SWF format. + */ +public class SwfTrace internal constructor(private val path: Path) : Trace { + override val tables: List<String> = listOf(TABLE_TASKS) + + override fun containsTable(name: String): Boolean = TABLE_TASKS == name + + override fun getTable(name: String): Table? { + if (!containsTable(name)) { + return null + } + return SwfTaskTable(path) + } + + override fun toString(): String = "SwfTrace[$path]" +} diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index 31ae03e0..36c3122e 100644 --- a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -20,28 +20,24 @@ * SOFTWARE. */ -package org.opendc.format.trace.wtf +package org.opendc.trace.swf -import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import java.io.File +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists /** - * Test suite for the [WtfTraceReader] class. + * Support for the Standard Workload Format (SWF) in OpenDC. + * + * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html */ -class WtfTraceReaderTest { - /** - * Smoke test for parsing WTF traces. - */ - @Test - fun testParseWtf() { - val reader = WtfTraceReader(File("src/test/resources/wtf-trace")) - var entry = reader.next() - assertEquals(0, entry.start) - assertEquals(23, entry.workload.tasks.size) +public class SwfTraceFormat : TraceFormat { + override val name: String = "swf" - entry = reader.next() - assertEquals(333387, entry.start) - assertEquals(23, entry.workload.tasks.size) + override fun open(url: URL): SwfTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return SwfTrace(path) } } diff --git a/opendc-trace/opendc-trace-swf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-swf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..6c6b0eb2 --- /dev/null +++ b/opendc-trace/opendc-trace-swf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.swf.SwfTraceFormat diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt new file mode 100644 index 00000000..9686891b --- /dev/null +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.swf + +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.* +import org.opendc.trace.TABLE_TASKS +import org.opendc.trace.TASK_ALLOC_NCPUS +import org.opendc.trace.TASK_ID +import java.net.URL + +/** + * Test suite for the [SwfTraceFormat] class. + */ +internal class SwfTraceFormatTest { + @Test + fun testTraceExists() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val format = SwfTraceFormat() + assertDoesNotThrow { + format.open(input) + } + } + + @Test + fun testTraceDoesNotExists() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val format = SwfTraceFormat() + assertThrows<IllegalArgumentException> { + format.open(URL(input.toString() + "help")) + } + } + + @Test + fun testTables() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val trace = SwfTraceFormat().open(input) + + assertEquals(listOf(TABLE_TASKS), trace.tables) + } + + @Test + fun testTableExists() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val table = SwfTraceFormat().open(input).getTable(TABLE_TASKS) + + assertNotNull(table) + assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val trace = SwfTraceFormat().open(input) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + @Test + fun testReader() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val trace = SwfTraceFormat().open(input) + val reader = trace.getTable(TABLE_TASKS)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals(1, reader.getLong(TASK_ID)) }, + { assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) }, + { assertTrue(reader.nextRow()) }, + { assertEquals(2, reader.getLong(TASK_ID)) }, + { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) }, + ) + + reader.close() + } + + @Test + fun testReaderPartition() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val trace = SwfTraceFormat().open(input) + + assertThrows<IllegalArgumentException> { + trace.getTable(TABLE_TASKS)!!.newReader("test") + } + } +} diff --git a/opendc-format/src/test/resources/swf_trace.txt b/opendc-trace/opendc-trace-swf/src/test/resources/trace.swf index c3ecf890..c3ecf890 100644 --- a/opendc-format/src/test/resources/swf_trace.txt +++ b/opendc-trace/opendc-trace-swf/src/test/resources/trace.swf diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts new file mode 100644 index 00000000..5051c7b0 --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support for Workflow Trace Format (WTF) traces in OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTrace.opendcTraceApi) + + implementation(projects.opendcTrace.opendcTraceParquet) +} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt new file mode 100644 index 00000000..be26f540 --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wtf + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader +import java.nio.file.Path + +/** + * A [Table] containing the tasks in a GWF trace. + */ +internal class WtfTaskTable(private val path: Path) : Table { + override val name: String = TABLE_TASKS + + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + TASK_ID -> true + TASK_WORKFLOW_ID -> true + TASK_SUBMIT_TIME -> true + TASK_WAIT_TIME -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_PARENTS -> true + TASK_CHILDREN -> true + TASK_GROUP_ID -> true + TASK_USER_ID -> true + else -> false + } + } + + override fun newReader(): TableReader { + val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")) + return WtfTaskTableReader(reader) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Invalid partition $partition") + } + + override fun toString(): String = "WtfTaskTable" +} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt new file mode 100644 index 00000000..b6789542 --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wtf + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader + +/** + * A [TableReader] implementation for the WTF format. + */ +internal class WtfTaskTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader { + /** + * The current record. + */ + private var record: GenericRecord? = null + + override fun nextRow(): Boolean { + record = reader.read() + return record != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + TASK_ID -> true + TASK_WORKFLOW_ID -> true + TASK_SUBMIT_TIME -> true + TASK_WAIT_TIME -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_PARENTS -> true + TASK_CHILDREN -> true + TASK_GROUP_ID -> true + TASK_USER_ID -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val record = checkNotNull(record) { "Reader in invalid state" } + + @Suppress("UNCHECKED_CAST") + val res: Any = when (column) { + TASK_ID -> record["id"] + TASK_WORKFLOW_ID -> record["workflow_id"] + TASK_SUBMIT_TIME -> record["ts_submit"] + TASK_WAIT_TIME -> record["wait_time"] + TASK_RUNTIME -> record["runtime"] + TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt() + TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet() + TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet() + TASK_GROUP_ID -> record["group_id"] + TASK_USER_ID -> record["user_id"] + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (column) { + TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt() + TASK_GROUP_ID -> record["group_id"] as Int + TASK_USER_ID -> record["user_id"] as Int + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn<Long>): Long { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (column) { + TASK_ID -> record["id"] as Long + TASK_WORKFLOW_ID -> record["workflow_id"] as Long + TASK_SUBMIT_TIME -> record["ts_submit"] as Long + TASK_WAIT_TIME -> record["wait_time"] as Long + TASK_RUNTIME -> record["runtime"] as Long + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDouble(column: TableColumn<Double>): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + reader.close() + } +} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt new file mode 100644 index 00000000..7eff0f5a --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wtf + +import org.opendc.trace.TABLE_TASKS +import org.opendc.trace.Table +import org.opendc.trace.Trace +import java.nio.file.Path + +/** + * [Trace] implementation for the WTF format. + */ +public class WtfTrace internal constructor(private val path: Path) : Trace { + override val tables: List<String> = listOf(TABLE_TASKS) + + override fun containsTable(name: String): Boolean = TABLE_TASKS == name + + override fun getTable(name: String): Table? { + if (!containsTable(name)) { + return null + } + + return WtfTaskTable(path) + } + + override fun toString(): String = "SwfTrace[$path]" +} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt new file mode 100644 index 00000000..781cb335 --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wtf + +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A [TraceFormat] implementation for the Workflow Trace Format (WTF). + */ +public class WtfTraceFormat : TraceFormat { + override val name: String = "wtf" + + override fun open(url: URL): WtfTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return WtfTrace(path) + } +} diff --git a/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..32da52ff --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.wtf.WtfTraceFormat diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt new file mode 100644 index 00000000..a05a523e --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wtf + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.* +import java.io.File +import java.net.URL + +/** + * Test suite for the [WtfTraceFormat] class. + */ +class WtfTraceFormatTest { + @Test + fun testTraceExists() { + val input = File("src/test/resources/wtf-trace").toURI().toURL() + val format = WtfTraceFormat() + org.junit.jupiter.api.assertDoesNotThrow { + format.open(input) + } + } + + @Test + fun testTraceDoesNotExists() { + val input = File("src/test/resources/wtf-trace").toURI().toURL() + val format = WtfTraceFormat() + assertThrows<IllegalArgumentException> { + format.open(URL(input.toString() + "help")) + } + } + + @Test + fun testTables() { + val input = File("src/test/resources/wtf-trace").toURI().toURL() + val format = WtfTraceFormat() + val trace = format.open(input) + + assertEquals(listOf(TABLE_TASKS), trace.tables) + } + + @Test + fun testTableExists() { + val input = File("src/test/resources/wtf-trace").toURI().toURL() + val format = WtfTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS) + + assertNotNull(table) + org.junit.jupiter.api.assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val input = File("src/test/resources/wtf-trace").toURI().toURL() + val format = WtfTraceFormat() + val trace = format.open(input) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + /** + * Smoke test for parsing WTF traces. + */ + @Test + fun testTableReader() { + val input = File("src/test/resources/wtf-trace") + val trace = WtfTraceFormat().open(input.toURI().toURL()) + val reader = trace.getTable(TABLE_TASKS)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals(362334516345962206, reader.getLong(TASK_ID)) }, + { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) }, + { assertEquals(245604, reader.getLong(TASK_SUBMIT_TIME)) }, + { assertEquals(8163, reader.getLong(TASK_RUNTIME)) }, + { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) }, + ) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals(502010169100446658, reader.getLong(TASK_ID)) }, + { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) }, + { assertEquals(251325, reader.getLong(TASK_SUBMIT_TIME)) }, + { assertEquals(8216, reader.getLong(TASK_RUNTIME)) }, + { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) }, + ) + + reader.close() + } + + @Test + fun testTableReaderPartition() { + val input = File("src/test/resources/wtf-trace").toURI().toURL() + val format = WtfTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS)!! + + assertThrows<IllegalArgumentException> { table.newReader("test") } + } +} diff --git a/opendc-format/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet b/opendc-trace/opendc-trace-wtf/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet Binary files differindex d2044038..d2044038 100644 --- a/opendc-format/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet +++ b/opendc-trace/opendc-trace-wtf/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts index 1f705b79..ec4a4673 100644 --- a/opendc-web/opendc-web-runner/build.gradle.kts +++ b/opendc-web/opendc-web-runner/build.gradle.kts @@ -36,7 +36,6 @@ application { dependencies { api(platform(projects.opendcPlatform)) implementation(projects.opendcCompute.opendcComputeSimulator) - implementation(projects.opendcFormat) implementation(projects.opendcExperiments.opendcExperimentsCapelin) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcTelemetry.opendcTelemetrySdk) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index c5f5cd03..53d50357 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -34,12 +34,12 @@ import kotlinx.coroutines.channels.Channel import mu.KotlinLogging import org.opendc.compute.service.scheduler.weights.* import org.opendc.experiments.capelin.* +import org.opendc.experiments.capelin.env.EnvironmentReader +import org.opendc.experiments.capelin.env.MachineDef import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index bc082dbc..941202d2 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -39,11 +39,7 @@ dependencies { testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcCompute.opendcComputeSimulator) - testImplementation(projects.opendcFormat) + testImplementation(projects.opendcTrace.opendcTraceGwf) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) - testImplementation(libs.jackson.module.kotlin) { - exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") - } - testImplementation(kotlin("reflect")) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt new file mode 100644 index 00000000..a390fe08 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.trace.* +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.time.Clock +import java.util.* +import kotlin.collections.HashMap +import kotlin.collections.HashSet +import kotlin.math.max +import kotlin.math.min + +/** + * Helper tool to replay workflow trace. + */ +internal class TraceReplayer(private val trace: Trace) { + /** + * Replay the workload. + */ + public suspend fun replay(clock: Clock, service: WorkflowService) { + val jobs = parseTrace(trace) + + // Sort jobs by their arrival time + (jobs as MutableList<Job>).sortBy { it.metadata["WORKFLOW_SUBMIT_TIME"] as Long } + + // Wait until the trace is started + val startTime = jobs[0].metadata["WORKFLOW_SUBMIT_TIME"] as Long + delay(min(0L, startTime - clock.millis())) + + val offset = startTime - clock.millis() + + coroutineScope { + for (job in jobs) { + val submitTime = job.metadata["WORKFLOW_SUBMIT_TIME"] as Long + delay(max(0, (submitTime - offset) - clock.millis())) + + launch { service.run(job) } + } + } + } + + /** + * Convert [trace] into a list of [Job]s that can be submitted to the workflow service. + */ + public fun parseTrace(trace: Trace): List<Job> { + val table = checkNotNull(trace.getTable(TABLE_TASKS)) + val reader = table.newReader() + + val jobs = mutableMapOf<Long, Job>() + val tasks = mutableMapOf<Long, Task>() + val taskDependencies = mutableMapOf<Task, Set<Long>>() + + try { + while (reader.nextRow()) { + // Bag of tasks without workflow ID all share the same workflow + val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.getLong(TASK_WORKFLOW_ID) else 0L + val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } + + val id = reader.getLong(TASK_ID) + val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) + reader.getInt(TASK_ALLOC_NCPUS) + else + reader.getInt(TASK_REQ_NCPUS) + val submitTime = reader.getLong(TASK_SUBMIT_TIME) + val runtime = reader.getLong(TASK_RUNTIME) + val flops: Long = 4000 * runtime * grantedCpus + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, id), + "<unnamed>", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to grantedCpus, + WORKFLOW_TASK_DEADLINE to (runtime * 1000) + ), + ) + + tasks[id] = task + taskDependencies[task] = reader.get(TASK_PARENTS) + + (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime) { a, b -> min(a as Long, b as Long) } + (workflow.tasks as MutableSet<Task>).add(task) + } + + // Resolve dependencies for all tasks + for ((task, deps) in taskDependencies) { + for (dep in deps) { + val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" } + (task.dependencies as MutableSet<Task>).add(parent) + } + } + } finally { + reader.close() + } + + return jobs.values.toList() + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index d82959e7..07433d1f 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -25,9 +25,6 @@ package org.opendc.workflow.service import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test @@ -39,25 +36,28 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.simulator.SimHost -import org.opendc.format.environment.sc18.Sc18EnvironmentReader -import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.toOtelClock +import org.opendc.trace.gwf.GwfTraceFormat import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import kotlin.math.max +import java.util.* /** * Integration test suite for the [WorkflowServiceImpl]. */ @DisplayName("WorkflowService") -internal class WorkflowServiceIntegrationTest { +internal class WorkflowServiceTest { /** * A large integration test where we check whether all tasks in some trace are executed correctly. */ @@ -69,20 +69,20 @@ internal class WorkflowServiceIntegrationTest { .build() val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = Sc18EnvironmentReader(checkNotNull(object {}.javaClass.getResourceAsStream("/environment.json"))) - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - coroutineContext, - interpreter, - MeterProvider.noop().get("opendc-compute-simulator"), - SimSpaceSharedHypervisorProvider() - ) - } + val machineModel = createMachineModel() + val hvProvider = SimSpaceSharedHypervisorProvider() + val hosts = List(4) { id -> + SimHost( + UUID(0, id.toLong()), + "node-$id", + machineModel, + emptyMap(), + coroutineContext, + interpreter, + meterProvider.get("opendc-compute-simulator"), + hvProvider, + ) + } val meter = MeterProvider.noop().get("opendc-compute") val computeScheduler = FilterScheduler( @@ -105,23 +105,10 @@ internal class WorkflowServiceIntegrationTest { taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), ) - val reader = GwfTraceReader(checkNotNull(object {}.javaClass.getResourceAsStream("/trace.gwf"))) - var offset = Long.MIN_VALUE - - coroutineScope { - while (reader.hasNext()) { - val entry = reader.next() + val trace = GwfTraceFormat().open(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf"))) + val replayer = TraceReplayer(trace) - if (offset < 0) { - offset = entry.start - clock.millis() - } - - delay(max(0, (entry.start - offset) - clock.millis())) - launch { - scheduler.run(entry.workload) - } - } - } + replayer.replay(clock, scheduler) hosts.forEach(SimHost::close) scheduler.close() @@ -134,10 +121,22 @@ internal class WorkflowServiceIntegrationTest { { assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") }, { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, - { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") } + { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, + { assertEquals(33213237L, clock.millis()) } ) } + /** + * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html + */ + private fun createMachineModel(): MachineModel { + val node = ProcessingNode("AMD", "am64", "EPYC 7742", 32) + val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) } + val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) } + + return MachineModel(cpus, memory) + } + class WorkflowMetrics { var jobsSubmitted = 0L var jobsActive = 0L diff --git a/settings.gradle.kts b/settings.gradle.kts index d85c08f8..5cae0a31 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -30,7 +30,6 @@ include(":opendc-workflow:opendc-workflow-service") include(":opendc-faas:opendc-faas-api") include(":opendc-faas:opendc-faas-service") include(":opendc-faas:opendc-faas-simulator") -include(":opendc-format") include(":opendc-experiments:opendc-experiments-capelin") include(":opendc-experiments:opendc-experiments-energy21") include(":opendc-experiments:opendc-experiments-serverless20") @@ -46,6 +45,12 @@ include(":opendc-simulator:opendc-simulator-compute") include(":opendc-simulator:opendc-simulator-failures") include(":opendc-telemetry:opendc-telemetry-api") include(":opendc-telemetry:opendc-telemetry-sdk") +include(":opendc-trace:opendc-trace-api") +include(":opendc-trace:opendc-trace-gwf") +include(":opendc-trace:opendc-trace-swf") +include(":opendc-trace:opendc-trace-wtf") +include(":opendc-trace:opendc-trace-bitbrains") +include(":opendc-trace:opendc-trace-parquet") include(":opendc-harness:opendc-harness-api") include(":opendc-harness:opendc-harness-engine") include(":opendc-harness:opendc-harness-cli") |
