| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-12 12:08:55 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-12 12:08:55 +0200 |
| commit | 2cd3bd18e548a72d64afe0e7f59487f4747d722f | |
| tree | dc9e2fba5ca4d19a90934a8b68dbb8110ee34bb7 | |
| parent | cae193284570d6ee9dbacdde57b3e4e367aa9d9f | |
| parent | 992b65396f55c0e12b36823d191dea8e03dd45ba | |
merge: Add support for new trace formats
This pull request extends the trace API with several new trace formats and related API improvements:
- Add support for Materna traces from GWA
- Keep reader state in the reader class itself
- Parse last column in Solvinity trace format
- Add support for Azure VM traces
- Add support for WfCommons (WorkflowHub) traces
- Add API for accessing available table columns
- Add synthetic resource table for Bitbrains format
- Support dynamic resolving of trace formats
**Breaking API Changes**
- Replace `isSupported` with a list of `TableColumn`s (see the usage sketch below)
47 files changed, 3667 insertions, 705 deletions
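Before the diff, here is a minimal sketch of how the reworked API fits together. This example is not part of the commit: it only uses signatures visible in the diff below (`Trace.open`, the new `Table.columns` property, and the `TableReader` cursor), the trace path is hypothetical, and it assumes the `azure-v1` format is registered for dynamic format lookup.

```kotlin
import org.opendc.trace.*
import java.io.File

fun main() {
    // Resolve the format by name and open the trace (hypothetical path).
    val trace = Trace.open(File("traces/azure"), format = "azure-v1")

    // Obtain the resources table of the trace.
    val resources = checkNotNull(trace.getTable(TABLE_RESOURCES))

    // Breaking change: instead of probing support per column via
    // isSupported(column), a table now exposes its columns as a list.
    println("Available columns: ${resources.columns}")

    // Iterate over the rows of the table with a TableReader cursor.
    val reader = resources.newReader()
    while (reader.nextRow()) {
        val id = reader.get(RESOURCE_ID)
        val stopTime = reader.get(RESOURCE_STOP_TIME)
        println("resource $id stops at $stopTime")
    }
    reader.close()
}
```

Exposing the columns as data rather than behind a predicate is what enables both the new column-discovery API and the dynamic resolving of trace formats listed above: callers can inspect at runtime what a format offers instead of testing one column at a time.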
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4e2fc777..ddede2e8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -52,6 +52,7 @@ clikt = { module = "com.github.ajalt.clikt:clikt", version.ref = "clikt" } progressbar = { module = "me.tongfei:progressbar", version.ref = "progressbar" } # Format +jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson" } jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" } jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" } jackson-dataformat-csv = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-csv", version.ref = "jackson" } diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 036d0638..7dadd14d 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -45,6 +45,7 @@ dependencies { implementation(libs.progressbar) implementation(libs.clikt) implementation(libs.jackson.module.kotlin) + implementation(libs.jackson.dataformat.csv) implementation(kotlin("reflect")) implementation(libs.parquet) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index fa4e9ed8..ca937328 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -90,9 +90,9 @@ class RawParquetTraceReader(private val path: File) { } val submissionTime = reader.get(RESOURCE_START_TIME) - val endTime = reader.get(RESOURCE_END_TIME) + val endTime = reader.get(RESOURCE_STOP_TIME) val maxCores = reader.getInt(RESOURCE_NCPUS) - val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) + val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) val vmFragments = fragments.getValue(id).asSequence() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt index a021de8d..1f3878eb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -25,80 +25,74 @@ package org.opendc.experiments.capelin.trace import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.arguments.argument import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.groupChoice +import com.github.ajalt.clikt.parameters.groups.cooccurring import com.github.ajalt.clikt.parameters.options.* -import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.long -import me.tongfei.progressbar.ProgressBar -import 
org.apache.avro.Schema -import org.apache.avro.SchemaBuilder +import com.github.ajalt.clikt.parameters.types.* +import mu.KotlinLogging import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.experiments.capelin.trace.azure.AzureTraceFormat +import org.opendc.experiments.capelin.trace.bp.BP_RESOURCES_SCHEMA +import org.opendc.experiments.capelin.trace.bp.BP_RESOURCE_STATES_SCHEMA import org.opendc.experiments.capelin.trace.sv.SvTraceFormat import org.opendc.trace.* import org.opendc.trace.bitbrains.BitbrainsTraceFormat -import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.parquet.LocalOutputFile -import java.io.BufferedReader import java.io.File -import java.io.FileReader import java.util.* import kotlin.math.max import kotlin.math.min +import kotlin.math.roundToLong + +/** + * A script to convert a trace in text format into a Parquet trace. + */ +fun main(args: Array<String>): Unit = TraceConverterCli().main(args) /** * Represents the command for converting traces */ class TraceConverterCli : CliktCommand(name = "trace-converter") { /** + * The logger instance for the converter. + */ + private val logger = KotlinLogging.logger {} + + /** * The directory where the trace should be stored. */ - private val outputPath by option("-O", "--output", help = "path to store the trace") + private val output by option("-O", "--output", help = "path to store the trace") .file(canBeFile = false, mustExist = false) .defaultLazy { File("output") } /** * The directory where the input trace is located. */ - private val inputPath by argument("input", help = "path to the input trace") + private val input by argument("input", help = "path to the input trace") .file(canBeFile = false) /** - * The input type of the trace. + * The input format of the trace. */ - private val type by option("-t", "--type", help = "input type of trace").groupChoice( - "solvinity" to SolvinityConversion(), - "bitbrains" to BitbrainsConversion(), - "azure" to AzureConversion() - ) + private val format by option("-f", "--format", help = "input format of trace") + .choice( + "solvinity" to SvTraceFormat(), + "bitbrains" to BitbrainsTraceFormat(), + "azure" to AzureTraceFormat() + ) + .required() + + /** + * The sampling options. 
+ */ + private val samplingOptions by SamplingOptions().cooccurring() override fun run() { - val metaSchema = SchemaBuilder - .record("meta") - .namespace("org.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("submissionTime").type().longType().noDefault() - .name("endTime").type().longType().noDefault() - .name("maxCores").type().intType().noDefault() - .name("requiredMemory").type().longType().noDefault() - .endRecord() - val schema = SchemaBuilder - .record("trace") - .namespace("org.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("cores").type().intType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("flops").type().longType().noDefault() - .endRecord() - - val metaParquet = File(outputPath, "meta.parquet") - val traceParquet = File(outputPath, "trace.parquet") + val metaParquet = File(output, "meta.parquet") + val traceParquet = File(output, "trace.parquet") if (metaParquet.exists()) { metaParquet.delete() @@ -107,324 +101,160 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") { traceParquet.delete() } + val trace = format.open(input.toURI().toURL()) + + logger.info { "Building resources table" } + val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet)) - .withSchema(metaSchema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .withSchema(BP_RESOURCES_SCHEMA) + .withCompressionCodec(CompressionCodecName.ZSTD) + .enablePageWriteChecksum() .build() + val selectedVms = metaWriter.use { convertResources(trace, it) } + + logger.info { "Wrote ${selectedVms.size} rows" } + logger.info { "Building resource states table" } + val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .withSchema(BP_RESOURCE_STATES_SCHEMA) + .withCompressionCodec(CompressionCodecName.ZSTD) + .enableDictionaryEncoding() + .enablePageWriteChecksum() + .withBloomFilterEnabled("id", true) + .withBloomFilterNDV("id", selectedVms.size.toLong()) .build() - try { - val type = type ?: throw IllegalArgumentException("Invalid trace conversion") - val allFragments = type.read(inputPath, metaSchema, metaWriter) - allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id }) - - for (fragment in allFragments) { - val record = GenericData.Record(schema) - record.put("id", fragment.id) - record.put("time", fragment.tick) - record.put("duration", fragment.duration) - record.put("cores", fragment.cores) - record.put("cpuUsage", fragment.usage) - record.put("flops", fragment.flops) - - writer.write(record) - } - } finally { - writer.close() - metaWriter.close() - } + val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) } + logger.info { "Wrote $statesCount rows" } } -} -/** - * The supported trace conversions. - */ -sealed class TraceConversion(name: String) : OptionGroup(name) { /** - * Read the fragments of the trace. + * Convert the resources table for the trace. 
*/ - abstract fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter<GenericData.Record> - ): MutableList<Fragment> -} - -/** - * A [TraceConversion] that uses the Trace API to perform the conversion. - */ -abstract class AbstractConversion(name: String) : TraceConversion(name) { - abstract val format: TraceFormat - - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter<GenericData.Record> - ): MutableList<Fragment> { - val fragments = mutableListOf<Fragment>() - val trace = format.open(traceDirectory.toURI().toURL()) + private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> { + val random = samplingOptions?.let { Random(it.seed) } + val samplingFraction = samplingOptions?.fraction ?: 1.0 val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - var lastId: String? = null - var maxCores = Int.MIN_VALUE - var requiredMemory = Long.MIN_VALUE - var minTime = Long.MAX_VALUE - var maxTime = Long.MIN_VALUE - var lastTimestamp = Long.MIN_VALUE - - while (reader.nextRow()) { - val id = reader.get(RESOURCE_STATE_ID) - - if (lastId != null && lastId != id) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", lastId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) - } - lastId = id + var hasNextRow = reader.nextRow() + val selectedVms = mutableSetOf<String>() - val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP) - val timestampMs = timestamp.toEpochMilli() - val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) - val cores = reader.getInt(RESOURCE_STATE_NCPUS) - val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY) + while (hasNextRow) { + var id: String + var numCpus = Int.MIN_VALUE + var memCapacity = Double.MIN_VALUE + var memUsage = Double.MIN_VALUE + var startTime = Long.MAX_VALUE + var stopTime = Long.MIN_VALUE - maxCores = max(maxCores, cores) - requiredMemory = max(requiredMemory, (memCapacity / 1000).toLong()) + do { + id = reader.get(RESOURCE_STATE_ID) - if (lastTimestamp < 0) { - lastTimestamp = timestampMs - 5 * 60 * 1000L - minTime = min(minTime, lastTimestamp) - } + val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() + startTime = min(startTime, timestamp) + stopTime = max(stopTime, timestamp) + + numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS)) - if (fragments.isEmpty()) { - val duration = 5 * 60 * 1000L - val flops: Long = (cpuUsage * duration / 1000).toLong() - fragments.add(Fragment(id, lastTimestamp, flops, duration, cpuUsage, cores)) - } else { - val last = fragments.last() - val duration = timestampMs - lastTimestamp - val flops: Long = (cpuUsage * duration / 1000).toLong() - - // Perform run-length encoding - if (last.id == id && (duration == 0L || last.usage == cpuUsage)) { - fragments[fragments.size - 1] = last.copy(duration = last.duration + duration) - } else { - fragments.add( - Fragment( - id, - lastTimestamp, - flops, - duration, - cpuUsage, - cores - ) - ) + memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)) + if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) { + memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE)) } + + hasNextRow = reader.nextRow() + } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) + + // Sample only a fraction of the VMs + if (random != null && 
random.nextDouble() > samplingFraction) { + continue } - val last = fragments.last() - maxTime = max(maxTime, last.tick + last.duration) - lastTimestamp = timestampMs + val builder = GenericRecordBuilder(BP_RESOURCES_SCHEMA) + + builder["id"] = id + builder["submissionTime"] = startTime + builder["endTime"] = stopTime + builder["maxCores"] = numCpus + builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong() + + logger.info { "Selecting VM $id" } + + writer.write(builder.build()) + selectedVms.add(id) } - return fragments + + return selectedVms } -} -class SolvinityConversion : AbstractConversion("Solvinity") { - override val format: TraceFormat = SvTraceFormat() -} + /** + * Convert the resource states table for the trace. + */ + private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() -/** - * Conversion of the Bitbrains public trace. - */ -class BitbrainsConversion : AbstractConversion("Bitbrains") { - override val format: TraceFormat = BitbrainsTraceFormat() -} + var hasNextRow = reader.nextRow() + var count = 0 -/** - * Conversion of the Azure public VM trace. - */ -class AzureConversion : TraceConversion("Azure") { - private val seed by option(help = "seed for trace sampling") - .long() - .default(0) - - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter<GenericData.Record> - ): MutableList<Fragment> { - val random = Random(seed) - val fraction = 0.01 - - // Read VM table - val vmIdTableCol = 0 - val coreTableCol = 9 - val provisionedMemoryTableCol = 10 - - var vmId: String - var cores: Int - var requiredMemory: Long - - val vmIds = mutableSetOf<String>() - val vmIdToMetadata = mutableMapOf<String, VmInfo>() - - BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader -> - reader.lineSequence() - .chunked(1024) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - // Sample only a fraction of the VMs - if (random.nextDouble() > fraction) { - continue - } - - val values = line.split(",") - - // Exclude VMs with a large number of cores (not specified exactly) - if (values[coreTableCol].contains(">")) { - continue - } - - vmId = values[vmIdTableCol].trim() - cores = values[coreTableCol].trim().toInt() - requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB - - vmIds.add(vmId) - vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L) - } + while (hasNextRow) { + var lastTimestamp = Long.MIN_VALUE + + do { + val id = reader.get(RESOURCE_STATE_ID) + + if (id !in selectedVms) { + hasNextRow = reader.nextRow() + continue } - } - // Read VM metric reading files - val timestampCol = 0 - val vmIdCol = 1 - val cpuUsageCol = 4 - val traceInterval = 5 * 60 * 1000L - - val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>() - val vmIdToLastFragment = mutableMapOf<String, Fragment?>() - val allFragments = mutableListOf<Fragment>() - - for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) { - val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv") - var timestamp: Long - var cpuUsage: Double - - BufferedReader(FileReader(readingsFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || 
line.isBlank()) { - continue - } - - val values = line.split(",") - vmId = values[vmIdCol].trim() - - // Ignore readings for VMs not in the sample - if (!vmIds.contains(vmId)) { - continue - } - - timestamp = values[timestampCol].trim().toLong() * 1000L - vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) - cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz - vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) - - val flops: Long = (cpuUsage * 5 * 60).toLong() - val lastFragment = vmIdToLastFragment[vmId] - - vmIdToLastFragment[vmId] = - if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) { - Fragment( - vmId, - lastFragment.tick, - lastFragment.flops + flops, - lastFragment.duration + traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - if (lastFragment != null) { - if (vmIdToFragments[vmId] == null) { - vmIdToFragments[vmId] = mutableListOf() - } - vmIdToFragments[vmId]!!.add(lastFragment) - allFragments.add(lastFragment) - } - fragment - } - } - } - } - } + val builder = GenericRecordBuilder(BP_RESOURCE_STATES_SCHEMA) + builder["id"] = id - for (entry in vmIdToLastFragment) { - if (entry.value != null) { - if (vmIdToFragments[entry.key] == null) { - vmIdToFragments[entry.key] = mutableListOf() + val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() + if (lastTimestamp < 0) { + lastTimestamp = timestamp - 5 * 60 * 1000L } - vmIdToFragments[entry.key]!!.add(entry.value!!) - } - } - println("Read ${vmIdToLastFragment.size} VMs") - - for (entry in vmIdToMetadata) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", entry.key) - metaRecord.put("submissionTime", entry.value.minTime) - metaRecord.put("endTime", entry.value.maxTime) - println("${entry.value.minTime} - ${entry.value.maxTime}") - metaRecord.put("maxCores", entry.value.cores) - metaRecord.put("requiredMemory", entry.value.requiredMemory) - metaWriter.write(metaRecord) - } + val duration = timestamp - lastTimestamp + val cores = reader.getInt(RESOURCE_STATE_NCPUS) + val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) + val flops = (cpuUsage * duration / 1000.0).roundToLong() - return allFragments - } -} + builder["time"] = timestamp + builder["duration"] = duration + builder["cores"] = cores + builder["cpuUsage"] = cpuUsage + builder["flops"] = flops -data class Fragment( - val id: String, - val tick: Long, - val flops: Long, - val duration: Long, - val usage: Double, - val cores: Int -) + writer.write(builder.build()) -class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long) + lastTimestamp = timestamp + hasNextRow = reader.nextRow() + } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) -/** - * A script to convert a trace in text format into a Parquet trace. - */ -fun main(args: Array<String>): Unit = TraceConverterCli().main(args) + count++ + } + + return count + } + + /** + * Options for sampling the workload trace. + */ + private class SamplingOptions : OptionGroup() { + /** + * The fraction of VMs to sample + */ + val fraction by option("--sampling-fraction", help = "fraction of the workload to sample") + .double() + .restrictTo(0.0001, 1.0) + .required() + + /** + * The seed for sampling the trace. 
+ */ + val seed by option("--sampling-seed", help = "seed for sampling the workload") + .long() + .default(0) + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt new file mode 100644 index 00000000..f98f4b2c --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.azure + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension + +/** + * The resource state [Table] for the Azure v1 VM traces. + */ +internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table { + /** + * The partitions that belong to the table. + */ + private val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + + override val name: String = TABLE_RESOURCE_STATES + + override val isSynthetic: Boolean = false + + override val columns: List<TableColumn<*>> = listOf( + RESOURCE_STATE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_CPU_USAGE_PCT + ) + + override fun newReader(): TableReader { + val it = partitions.iterator() + + return object : TableReader { + var delegate: TableReader? 
= nextDelegate() + + override fun nextRow(): Boolean { + var delegate = delegate + + while (delegate != null) { + if (delegate.nextRow()) { + break + } + + delegate.close() + delegate = nextDelegate() + } + + this.delegate = delegate + return delegate != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false + + override fun <T> get(column: TableColumn<T>): T { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.get(column) + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getBoolean(column) + } + + override fun getInt(column: TableColumn<Int>): Int { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInt(column) + } + + override fun getLong(column: TableColumn<Long>): Long { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getLong(column) + } + + override fun getDouble(column: TableColumn<Double>): Double { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDouble(column) + } + + override fun close() { + delegate?.close() + } + + private fun nextDelegate(): TableReader? { + return if (it.hasNext()) { + val (_, path) = it.next() + return AzureResourceStateTableReader(factory.createParser(path.toFile())) + } else { + null + } + } + + override fun toString(): String = "AzureCompositeTableReader" + } + } + + override fun newReader(partition: String): TableReader { + val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } + return AzureResourceStateTableReader(factory.createParser(path.toFile())) + } + + override fun toString(): String = "AzureResourceStateTable" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt new file mode 100644 index 00000000..f80c0e82 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.experiments.capelin.trace.azure + +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.dataformat.csv.CsvParser +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import org.opendc.trace.* +import java.time.Instant + +/** + * A [TableReader] for the Azure v1 VM resource state table. + */ +internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader { + init { + parser.schema = schema + } + + override fun nextRow(): Boolean { + reset() + + if (!nextStart()) { + return false + } + + while (true) { + val token = parser.nextValue() + + if (token == null || token == JsonToken.END_OBJECT) { + break + } + + when (parser.currentName) { + "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue) + "vm id" -> id = parser.text + "avg cpu" -> cpuUsagePct = parser.doubleValue + } + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_CPU_USAGE_PCT -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val res: Any? = when (column) { + RESOURCE_STATE_ID -> id + RESOURCE_STATE_TIMESTAMP -> timestamp + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + throw IllegalArgumentException("Invalid column") + } + + override fun getLong(column: TableColumn<Long>): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn<Double>): Double { + return when (column) { + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + parser.close() + } + + /** + * Advance the parser until the next object start. + */ + private fun nextStart(): Boolean { + var token = parser.nextValue() + + while (token != null && token != JsonToken.START_OBJECT) { + token = parser.nextValue() + } + + return token != null + } + + /** + * State fields of the reader. + */ + private var id: String? = null + private var timestamp: Instant? = null + private var cpuUsagePct = Double.NaN + + /** + * Reset the state. + */ + private fun reset() { + id = null + timestamp = null + cpuUsagePct = Double.NaN + } + + companion object { + /** + * The [CsvSchema] that is used to parse the trace. 
+ */ + private val schema = CsvSchema.builder() + .addColumn("timestamp", CsvSchema.ColumnType.NUMBER) + .addColumn("vm id", CsvSchema.ColumnType.STRING) + .addColumn("min cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("max cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .build() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt new file mode 100644 index 00000000..c9d4f7eb --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.azure + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * The resource [Table] for the Azure v1 VM traces.
+ */ +internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table { + override val name: String = TABLE_RESOURCES + + override val isSynthetic: Boolean = false + + override val columns: List<TableColumn<*>> = listOf( + RESOURCE_ID, + RESOURCE_START_TIME, + RESOURCE_STOP_TIME, + RESOURCE_NCPUS, + RESOURCE_MEM_CAPACITY + ) + + override fun newReader(): TableReader { + return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("No partition $partition") + } + + override fun toString(): String = "AzureResourceTable" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt new file mode 100644 index 00000000..b712b854 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.azure + +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.dataformat.csv.CsvParser +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import org.opendc.trace.* +import java.time.Instant + +/** + * A [TableReader] for the Azure v1 VM resources table.
+ */ +internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader { + init { + parser.schema = schema + } + + override fun nextRow(): Boolean { + reset() + + if (!nextStart()) { + return false + } + + while (true) { + val token = parser.nextValue() + + if (token == null || token == JsonToken.END_OBJECT) { + break + } + + when (parser.currentName) { + "vm id" -> id = parser.text + "timestamp vm created" -> startTime = Instant.ofEpochSecond(parser.longValue) + "timestamp vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue) + "vm virtual core count" -> cpuCores = parser.intValue + "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB + } + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_ID -> true + RESOURCE_START_TIME -> true + RESOURCE_STOP_TIME -> true + RESOURCE_NCPUS -> true + RESOURCE_MEM_CAPACITY -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val res: Any? = when (column) { + RESOURCE_ID -> id + RESOURCE_START_TIME -> startTime + RESOURCE_STOP_TIME -> stopTime + RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) + RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + return when (column) { + RESOURCE_NCPUS -> cpuCores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn<Long>): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn<Double>): Double { + return when (column) { + RESOURCE_MEM_CAPACITY -> memCapacity + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + parser.close() + } + + /** + * Advance the parser until the next object start. + */ + private fun nextStart(): Boolean { + var token = parser.nextValue() + + while (token != null && token != JsonToken.START_OBJECT) { + token = parser.nextValue() + } + + return token != null + } + + /** + * State fields of the reader. + */ + private var id: String? = null + private var startTime: Instant? = null + private var stopTime: Instant? = null + private var cpuCores = -1 + private var memCapacity = Double.NaN + + /** + * Reset the state. + */ + private fun reset() { + id = null + startTime = null + stopTime = null + cpuCores = -1 + memCapacity = Double.NaN + } + + companion object { + /** + * The [CsvSchema] that is used to parse the trace.
+ */ + private val schema = CsvSchema.builder() + .addColumn("vm id", CsvSchema.ColumnType.NUMBER) + .addColumn("subscription id", CsvSchema.ColumnType.STRING) + .addColumn("deployment id", CsvSchema.ColumnType.NUMBER) + .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER) + .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER) + .addColumn("max cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("vm category", CsvSchema.ColumnType.NUMBER) + .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER) + .addColumn("vm memory", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .build() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt new file mode 100644 index 00000000..24c60bab --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.azure + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * [Trace] implementation for the Azure v1 VM traces. + */ +class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { + override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun containsTable(name: String): Boolean = name in tables + + override fun getTable(name: String): Table? 
{ + return when (name) { + TABLE_RESOURCES -> AzureResourceTable(factory, path) + TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path) + else -> null + } + } + + override fun toString(): String = "AzureTrace[$path]" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt new file mode 100644 index 00000000..744e43a0 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.azure + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A format implementation for the Azure v1 format. + */ +class AzureTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "azure-v1" + + /** + * The [CsvFactory] used to create the parser. + */ + private val factory = CsvFactory() + .enable(CsvParser.Feature.ALLOW_COMMENTS) + .enable(CsvParser.Feature.TRIM_SPACES) + + /** + * Open the trace file. 
+ */ + override fun open(url: URL): AzureTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return AzureTrace(factory, path) + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt index 35bfd5ef..f051bf88 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt @@ -34,16 +34,13 @@ internal class BPResourceStateTable(private val path: Path) : Table { override val name: String = TABLE_RESOURCE_STATES override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_DURATION -> true - RESOURCE_STATE_NCPUS -> true - RESOURCE_STATE_CPU_USAGE -> true - else -> false - } - } + override val columns: List<TableColumn<*>> = listOf( + RESOURCE_STATE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_DURATION, + RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_USAGE, + ) override fun newReader(): TableReader { val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt index 74d1e574..5b0f013f 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt @@ -34,16 +34,13 @@ internal class BPResourceTable(private val path: Path) : Table { override val name: String = TABLE_RESOURCES override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_START_TIME -> true - RESOURCE_END_TIME -> true - RESOURCE_NCPUS -> true - RESOURCE_MEM_CAPACITY -> true - else -> false - } - } + override val columns: List<TableColumn<*>> = listOf( + RESOURCE_ID, + RESOURCE_START_TIME, + RESOURCE_STOP_TIME, + RESOURCE_NCPUS, + RESOURCE_MEM_CAPACITY + ) override fun newReader(): TableReader { val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt index 0a105783..4416aae8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt @@ -45,7 +45,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene return when (column) { RESOURCE_ID -> true RESOURCE_START_TIME -> true - RESOURCE_END_TIME -> true + RESOURCE_STOP_TIME -> true RESOURCE_NCPUS 
-> true RESOURCE_MEM_CAPACITY -> true else -> false @@ -59,9 +59,9 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene val res: Any = when (column) { RESOURCE_ID -> record["id"].toString() RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long) - RESOURCE_END_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) - RESOURCE_NCPUS -> record["maxCores"] - RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() + RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) + RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) + RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) else -> throw IllegalArgumentException("Invalid column") } @@ -90,7 +90,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() + RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt new file mode 100644 index 00000000..7dd8161d --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder + +/** + * Schema for the resources table in the trace. + */ +val BP_RESOURCES_SCHEMA: Schema = SchemaBuilder + .record("meta") + .namespace("org.opendc.trace.capelin") + .fields() + .requiredString("id") + .requiredLong("submissionTime") + .requiredLong("endTime") + .requiredInt("maxCores") + .requiredLong("requiredMemory") + .endRecord() + +/** + * Schema for the resource states table in the trace. 
+ */ +val BP_RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder + .record("trace") + .namespace("org.opendc.trace.capelin") + .fields() + .requiredString("id") + .requiredLong("time") + .requiredLong("duration") + .requiredInt("cores") + .requiredDouble("cpuUsage") + .requiredLong("flops") + .endRecord() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt index 24abb109..67140fe9 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt @@ -31,7 +31,7 @@ import kotlin.io.path.extension import kotlin.io.path.nameWithoutExtension /** - * The resource state [Table] in the Bitbrains format. + * The resource state [Table] in the extended Bitbrains format. */ internal class SvResourceStateTable(path: Path) : Table { /** @@ -40,28 +40,26 @@ internal class SvResourceStateTable(path: Path) : Table { private val partitions = Files.walk(path, 1) .filter { !Files.isDirectory(it) && it.extension == "txt" } .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() override val name: String = TABLE_RESOURCE_STATES override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_CLUSTER_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_NCPUS -> true - RESOURCE_STATE_CPU_CAPACITY -> true - RESOURCE_STATE_CPU_USAGE -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - RESOURCE_STATE_CPU_DEMAND -> true - RESOURCE_STATE_CPU_READY_PCT -> true - RESOURCE_STATE_MEM_CAPACITY -> true - RESOURCE_STATE_DISK_READ -> true - RESOURCE_STATE_DISK_WRITE -> true - else -> false - } - } + override val columns: List<TableColumn<*>> = listOf( + RESOURCE_STATE_ID, + RESOURCE_STATE_CLUSTER_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT, + RESOURCE_STATE_CPU_DEMAND, + RESOURCE_STATE_CPU_READY_PCT, + RESOURCE_STATE_MEM_CAPACITY, + RESOURCE_STATE_DISK_READ, + RESOURCE_STATE_DISK_WRITE, + ) override fun newReader(): TableReader { val it = partitions.iterator() @@ -126,7 +124,7 @@ internal class SvResourceStateTable(path: Path) : Table { } } - override fun toString(): String = "BitbrainsCompositeTableReader" + override fun toString(): String = "SvCompositeTableReader" } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt index 1a556f8d..6ea403fe 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt @@ -30,13 +30,8 @@ import java.time.Instant * A [TableReader] for the Bitbrains resource state table. */ internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader { - /** - * The current parser state.
- */ - private val state = RowState() - override fun nextRow(): Boolean { - state.reset() + reset() var line: String var num = 0 @@ -70,23 +65,23 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : end = line.indexOf(' ', start) if (end < 0) { - break + end = length } val field = line.subSequence(start, end) as String when (col++) { - COL_TIMESTAMP -> state.timestamp = Instant.ofEpochSecond(field.toLong(10)) - COL_CPU_USAGE -> state.cpuUsage = field.toDouble() - COL_CPU_DEMAND -> state.cpuDemand = field.toDouble() - COL_DISK_READ -> state.diskRead = field.toDouble() - COL_DISK_WRITE -> state.diskWrite = field.toDouble() - COL_CLUSTER_ID -> state.cluster = field.trim() - COL_NCPUS -> state.cpuCores = field.toInt(10) - COL_CPU_READY_PCT -> state.cpuReadyPct = field.toDouble() - COL_POWERED_ON -> state.poweredOn = field.toInt(10) == 1 - COL_CPU_CAPACITY -> state.cpuCapacity = field.toDouble() - COL_ID -> state.id = field.trim() - COL_MEM_CAPACITY -> state.memCapacity = field.toDouble() + COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10)) + COL_CPU_USAGE -> cpuUsage = field.toDouble() + COL_CPU_DEMAND -> cpuDemand = field.toDouble() + COL_DISK_READ -> diskRead = field.toDouble() + COL_DISK_WRITE -> diskWrite = field.toDouble() + COL_CLUSTER_ID -> cluster = field.trim() + COL_NCPUS -> cpuCores = field.toInt(10) + COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble() + COL_POWERED_ON -> poweredOn = field.toInt(10) == 1 + COL_CPU_CAPACITY -> cpuCapacity = field.toDouble() + COL_ID -> id = field.trim() + COL_MEM_CAPACITY -> memCapacity = field.toDouble() } } @@ -113,16 +108,16 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : override fun <T> get(column: TableColumn<T>): T { val res: Any? 
= when (column) { - RESOURCE_STATE_ID -> state.id - RESOURCE_STATE_CLUSTER_ID -> state.cluster - RESOURCE_STATE_TIMESTAMP -> state.timestamp - RESOURCE_STATE_NCPUS -> state.cpuCores - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite + RESOURCE_STATE_ID -> id + RESOURCE_STATE_CLUSTER_ID -> cluster + RESOURCE_STATE_TIMESTAMP -> timestamp + RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) + RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY) + RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) + RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT) + RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) + RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ) + RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE) else -> throw IllegalArgumentException("Invalid column") } @@ -132,14 +127,14 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : override fun getBoolean(column: TableColumn<Boolean>): Boolean { return when (column) { - RESOURCE_STATE_POWERED_ON -> state.poweredOn + RESOURCE_STATE_POWERED_ON -> poweredOn else -> throw IllegalArgumentException("Invalid column") } } override fun getInt(column: TableColumn<Int>): Int { return when (column) { - RESOURCE_STATE_NCPUS -> state.cpuCores + RESOURCE_STATE_NCPUS -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } @@ -150,12 +145,13 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : override fun getDouble(column: TableColumn<Double>): Double { return when (column) { - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite + RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_STATE_CPU_USAGE -> cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity + RESOURCE_STATE_CPU_DEMAND -> cpuDemand + RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_STATE_DISK_READ -> diskRead + RESOURCE_STATE_DISK_WRITE -> diskWrite else -> throw IllegalArgumentException("Invalid column") } } @@ -165,51 +161,37 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : } /** - * The current row state. + * State fields of the reader. */ - private class RowState { - @JvmField - var id: String? = null - @JvmField - var cluster: String? = null - @JvmField - var timestamp: Instant? = null - @JvmField - var cpuCores = -1 - @JvmField - var cpuCapacity = Double.NaN - @JvmField - var cpuUsage = Double.NaN - @JvmField - var cpuDemand = Double.NaN - @JvmField - var cpuReadyPct = Double.NaN - @JvmField - var memCapacity = Double.NaN - @JvmField - var diskRead = Double.NaN - @JvmField - var diskWrite = Double.NaN - @JvmField - var poweredOn: Boolean = false - - /** - * Reset the state. 
- */ - fun reset() { - id = null - timestamp = null - cluster = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuDemand = Double.NaN - cpuReadyPct = Double.NaN - memCapacity = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - poweredOn = false - } + private var id: String? = null + private var cluster: String? = null + private var timestamp: Instant? = null + private var cpuCores = -1 + private var cpuCapacity = Double.NaN + private var cpuUsage = Double.NaN + private var cpuDemand = Double.NaN + private var cpuReadyPct = Double.NaN + private var memCapacity = Double.NaN + private var diskRead = Double.NaN + private var diskWrite = Double.NaN + private var poweredOn: Boolean = false + + /** + * Reset the state of the reader. + */ + private fun reset() { + id = null + timestamp = null + cluster = null + cpuCores = -1 + cpuCapacity = Double.NaN + cpuUsage = Double.NaN + cpuDemand = Double.NaN + cpuReadyPct = Double.NaN + memCapacity = Double.NaN + diskRead = Double.NaN + diskWrite = Double.NaN + poweredOn = false } /** diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt index 44dec95b..e2e5ea6d 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt @@ -41,7 +41,7 @@ public val RESOURCE_START_TIME: TableColumn<Instant> = TableColumn("resource:sta * End time for the resource. */ @JvmField -public val RESOURCE_END_TIME: TableColumn<Instant> = TableColumn("resource:end_time", Instant::class.java) +public val RESOURCE_STOP_TIME: TableColumn<Instant> = TableColumn("resource:stop_time", Instant::class.java) /** * Number of CPUs for the resource. @@ -50,7 +50,7 @@ public val RESOURCE_END_TIME: TableColumn<Instant> = TableColumn("resource:end_t public val RESOURCE_NCPUS: TableColumn<Int> = intColumn("resource:num_cpus") /** - * Memory capacity for the resource. + * Memory capacity for the resource in KB. */ @JvmField public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource:mem_capacity") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 11e5d6b7..6aca2051 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -37,9 +37,9 @@ public interface Table { public val isSynthetic: Boolean /** - * Determine whether the specified [column] is supported by this table. + * The list of columns supported in this table. */ - public fun isSupported(column: TableColumn<*>): Boolean + public val columns: List<TableColumn<*>> /** * Open a [TableReader] for this table. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt index 88bbc623..46920dce 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt @@ -23,49 +23,52 @@ @file:JvmName("TaskColumns") package org.opendc.trace +import java.time.Duration +import java.time.Instant + /** * A column containing the task identifier. 
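The `end < 0` change in the Solvinity reader above is what the "Parse last column" item in the merge description refers to: clamping the cursor to the line length emits the final, unterminated field instead of dropping it. A standalone sketch of that scan, with a hypothetical `splitColumns` helper and single-space delimiters assumed:

```kotlin
// Sketch only: mirrors the fixed delimiter scan, not the actual reader.
fun splitColumns(line: String): List<String> {
    val fields = mutableListOf<String>()
    var start = 0
    while (start < line.length) {
        var end = line.indexOf(' ', start)
        // Before the fix the loop broke on -1, silently dropping the last column.
        if (end < 0) end = line.length
        fields += line.substring(start, end)
        start = end + 1
    }
    return fields
}

fun main() {
    // Three space-separated fields; the trailing one now survives.
    println(splitColumns("1376314846 0.12 4096.0")) // [1376314846, 0.12, 4096.0]
}
```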
 */
 @JvmField
-public val TASK_ID: TableColumn<Long> = longColumn("task:id")
+public val TASK_ID: TableColumn<String> = stringColumn("task:id")

 /**
  * A column containing the identifier of the workflow.
  */
 @JvmField
-public val TASK_WORKFLOW_ID: TableColumn<Long> = longColumn("task:workflow_id")
+public val TASK_WORKFLOW_ID: TableColumn<String> = stringColumn("task:workflow_id")

 /**
  * A column containing the submit time of the task.
  */
 @JvmField
-public val TASK_SUBMIT_TIME: TableColumn<Long> = longColumn("task:submit_time")
+public val TASK_SUBMIT_TIME: TableColumn<Instant> = TableColumn("task:submit_time", type = Instant::class.java)

 /**
  * A column containing the wait time of the task.
  */
 @JvmField
-public val TASK_WAIT_TIME: TableColumn<Long> = longColumn("task:wait_time")
+public val TASK_WAIT_TIME: TableColumn<Duration> = TableColumn("task:wait_time", type = Duration::class.java)

 /**
  * A column containing the runtime of the task.
  */
 @JvmField
-public val TASK_RUNTIME: TableColumn<Long> = longColumn("task:runtime")
+public val TASK_RUNTIME: TableColumn<Duration> = TableColumn("task:runtime", type = Duration::class.java)

 /**
  * A column containing the parents of a task.
  */
 @Suppress("UNCHECKED_CAST")
 @JvmField
-public val TASK_PARENTS: TableColumn<Set<Long>> = TableColumn("task:parents", type = Set::class.java as Class<Set<Long>>)
+public val TASK_PARENTS: TableColumn<Set<String>> = TableColumn("task:parents", type = Set::class.java as Class<Set<String>>)

 /**
  * A column containing the children of a task.
  */
 @Suppress("UNCHECKED_CAST")
 @JvmField
-public val TASK_CHILDREN: TableColumn<Set<Long>> = TableColumn("task:children", type = Set::class.java as Class<Set<Long>>)
+public val TASK_CHILDREN: TableColumn<Set<String>> = TableColumn("task:children", type = Set::class.java as Class<Set<String>>)

 /**
  * A column containing the requested CPUs of a task.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
index 36e93b52..0ae45e86 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
@@ -22,6 +22,11 @@
 package org.opendc.trace

+import org.opendc.trace.spi.TraceFormat
+import java.io.File
+import java.net.URL
+import java.nio.file.Path
+
 /**
  * A trace is a collection of related tables that characterize a workload.
  */
@@ -40,4 +45,34 @@ public interface Trace {
      * Obtain a [Table] with the specified [name].
      */
     public fun getTable(name: String): Table?
+
+    public companion object {
+        /**
+         * Open a [Trace] at the specified [url] in the given [format].
+         *
+         * @throws IllegalArgumentException if [format] is not supported.
+         */
+        public fun open(url: URL, format: String): Trace {
+            val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" }
+            return provider.open(url)
+        }
+
+        /**
+         * Open a [Trace] at the specified [path] in the given [format].
+         *
+         * @throws IllegalArgumentException if [format] is not supported.
+         */
+        public fun open(path: File, format: String): Trace {
+            return open(path.toURI().toURL(), format)
+        }
+
+        /**
+         * Open a [Trace] at the specified [path] in the given [format].
+         *
+         * @throws IllegalArgumentException if [format] is not supported.
+ */ + public fun open(path: Path, format: String): Trace { + return open(path.toUri().toURL(), format) + } + } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt index 767ef919..c9e5954d 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -33,7 +33,7 @@ import kotlin.io.path.nameWithoutExtension /** * The resource state [Table] in the Bitbrains format. */ -internal class BitbrainsResourceStateTable(private val factory: CsvFactory, private val path: Path) : Table { +internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path: Path) : Table { /** * The partitions that belong to the table. */ @@ -41,28 +41,26 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, priv Files.walk(path, 1) .filter { !Files.isDirectory(it) && it.extension == "csv" } .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() override val name: String = TABLE_RESOURCE_STATES override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_NCPUS -> true - RESOURCE_STATE_CPU_CAPACITY -> true - RESOURCE_STATE_CPU_USAGE -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - RESOURCE_STATE_MEM_CAPACITY -> true - RESOURCE_STATE_MEM_USAGE -> true - RESOURCE_STATE_DISK_READ -> true - RESOURCE_STATE_DISK_WRITE -> true - RESOURCE_STATE_NET_RX -> true - RESOURCE_STATE_NET_TX -> true - else -> false - } - } + override val columns: List<TableColumn<*>> = listOf( + RESOURCE_STATE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT, + RESOURCE_STATE_MEM_CAPACITY, + RESOURCE_STATE_MEM_USAGE, + RESOURCE_STATE_DISK_READ, + RESOURCE_STATE_DISK_WRITE, + RESOURCE_STATE_NET_RX, + RESOURCE_STATE_NET_TX, + ) override fun newReader(): TableReader { val it = partitions.iterator() diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index 5687ac7f..dab784c2 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -22,20 +22,42 @@ package org.opendc.trace.bitbrains +import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import java.text.NumberFormat import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.time.format.DateTimeFormatter +import java.time.format.DateTimeParseException +import java.util.* /** * A [TableReader] for the Bitbrains resource state table. 
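Taken together, the companion object above and the per-table `columns` list give a small end-to-end flow. A minimal sketch, where the file name and the registered format name "gwf" are assumptions for illustration:

```kotlin
import org.opendc.trace.*
import java.nio.file.Paths

fun main() {
    // The provider is resolved dynamically through the TraceFormat service-provider
    // interface; "trace.gwf" is a placeholder for any GWF file on disk.
    val trace = Trace.open(Paths.get("trace.gwf"), format = "gwf")
    val tasks = checkNotNull(trace.getTable(TABLE_TASKS)) { "trace has no task table" }

    // The `columns` property replaces the removed `isSupported(column)` check.
    require(TASK_RUNTIME in tasks.columns)

    val reader = tasks.newReader()
    while (reader.nextRow()) {
        val id = reader.get(TASK_ID)                 // now a String rather than a Long
        val submitted = reader.get(TASK_SUBMIT_TIME) // java.time.Instant
        val runtime = reader.get(TASK_RUNTIME)       // java.time.Duration
        println("task $id submitted at $submitted ran for $runtime")
    }
    reader.close()
}
```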
*/ internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader { /** - * The current parser state. + * The [DateTimeFormatter] used to parse the timestamps in case of the Materna trace. */ - private val state = RowState() + private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss") + + /** + * The type of timestamps in the trace. + */ + private var timestampType: TimestampType = TimestampType.UNDECIDED + + /** + * The [NumberFormat] used to parse doubles containing a comma. + */ + private val nf = NumberFormat.getInstance(Locale.GERMAN) + + /** + * A flag to indicate that the trace contains decimals with a comma separator. + */ + private var usesCommaDecimalSeparator = false init { parser.schema = schema @@ -43,7 +65,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun nextRow(): Boolean { // Reset the row state - state.reset() + reset() if (!nextStart()) { return false @@ -57,17 +79,32 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, } when (parser.currentName) { - "Timestamp [ms]" -> state.timestamp = Instant.ofEpochSecond(parser.longValue) - "CPU cores" -> state.cpuCores = parser.intValue - "CPU capacity provisioned [MHZ]" -> state.cpuCapacity = parser.doubleValue - "CPU usage [MHZ]" -> state.cpuUsage = parser.doubleValue - "CPU usage [%]" -> state.cpuUsagePct = parser.doubleValue - "Memory capacity provisioned [KB]" -> state.memCapacity = parser.doubleValue - "Memory usage [KB]" -> state.memUsage = parser.doubleValue - "Disk read throughput [KB/s]" -> state.diskRead = parser.doubleValue - "Disk write throughput [KB/s]" -> state.diskWrite = parser.doubleValue - "Network received throughput [KB/s]" -> state.netReceived = parser.doubleValue - "Network transmitted throughput [KB/s]" -> state.netTransmitted = parser.doubleValue + "Timestamp [ms]" -> { + timestamp = when (timestampType) { + TimestampType.UNDECIDED -> { + try { + val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) + timestampType = TimestampType.DATE_TIME + res + } catch (e: DateTimeParseException) { + timestampType = TimestampType.EPOCH_MILLIS + Instant.ofEpochSecond(parser.longValue) + } + } + TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) + TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue) + } + } + "CPU cores" -> cpuCores = parser.intValue + "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble() + "CPU usage [MHZ]" -> cpuUsage = parseSafeDouble() + "CPU usage [%]" -> cpuUsagePct = parseSafeDouble() / 100.0 // Convert to range [0, 1] + "Memory capacity provisioned [KB]" -> memCapacity = parseSafeDouble() + "Memory usage [KB]" -> memUsage = parseSafeDouble() + "Disk read throughput [KB/s]" -> diskRead = parseSafeDouble() + "Disk write throughput [KB/s]" -> diskWrite = parseSafeDouble() + "Network received throughput [KB/s]" -> netReceived = parseSafeDouble() + "Network transmitted throughput [KB/s]" -> netTransmitted = parseSafeDouble() } } @@ -95,17 +132,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun <T> get(column: TableColumn<T>): T { val res: Any? 
= when (column) { RESOURCE_STATE_ID -> partition - RESOURCE_STATE_TIMESTAMP -> state.timestamp - RESOURCE_STATE_NCPUS -> state.cpuCores - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_MEM_USAGE -> state.memUsage - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite - RESOURCE_STATE_NET_RX -> state.netReceived - RESOURCE_STATE_NET_TX -> state.netTransmitted + RESOURCE_STATE_TIMESTAMP -> timestamp + RESOURCE_STATE_NCPUS -> cpuCores + RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_STATE_CPU_USAGE -> cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_STATE_MEM_USAGE -> memUsage + RESOURCE_STATE_DISK_READ -> diskRead + RESOURCE_STATE_DISK_WRITE -> diskWrite + RESOURCE_STATE_NET_RX -> netReceived + RESOURCE_STATE_NET_TX -> netTransmitted else -> throw IllegalArgumentException("Invalid column") } @@ -119,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getInt(column: TableColumn<Int>): Int { return when (column) { - RESOURCE_STATE_NCPUS -> state.cpuCores + RESOURCE_STATE_NCPUS -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } @@ -130,15 +167,15 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getDouble(column: TableColumn<Double>): Double { return when (column) { - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_MEM_USAGE -> state.memUsage - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite - RESOURCE_STATE_NET_RX -> state.netReceived - RESOURCE_STATE_NET_TX -> state.netTransmitted + RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_STATE_CPU_USAGE -> cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_STATE_MEM_USAGE -> memUsage + RESOURCE_STATE_DISK_READ -> diskRead + RESOURCE_STATE_DISK_WRITE -> diskWrite + RESOURCE_STATE_NET_RX -> netReceived + RESOURCE_STATE_NET_TX -> netTransmitted else -> throw IllegalArgumentException("Invalid column") } } @@ -161,37 +198,62 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, } /** - * The current row state. + * Try to parse the current value safely as double. */ - private class RowState { - var timestamp: Instant? = null - var cpuCores = -1 - var cpuCapacity = Double.NaN - var cpuUsage = Double.NaN - var cpuUsagePct = Double.NaN - var memCapacity = Double.NaN - var memUsage = Double.NaN - var diskRead = Double.NaN - var diskWrite = Double.NaN - var netReceived = Double.NaN - var netTransmitted = Double.NaN + private fun parseSafeDouble(): Double { + if (!usesCommaDecimalSeparator) { + try { + return parser.doubleValue + } catch (e: JsonParseException) { + usesCommaDecimalSeparator = true + } + } - /** - * Reset the state. 
- */ - fun reset() { - timestamp = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuUsagePct = Double.NaN - memCapacity = Double.NaN - memUsage = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - netReceived = Double.NaN - netTransmitted = Double.NaN + val text = parser.text + if (text.isBlank()) { + return 0.0 } + + return nf.parse(text).toDouble() + } + + /** + * State fields of the reader. + */ + private var timestamp: Instant? = null + private var cpuCores = -1 + private var cpuCapacity = Double.NaN + private var cpuUsage = Double.NaN + private var cpuUsagePct = Double.NaN + private var memCapacity = Double.NaN + private var memUsage = Double.NaN + private var diskRead = Double.NaN + private var diskWrite = Double.NaN + private var netReceived = Double.NaN + private var netTransmitted = Double.NaN + + /** + * Reset the state. + */ + private fun reset() { + timestamp = null + cpuCores = -1 + cpuCapacity = Double.NaN + cpuUsage = Double.NaN + cpuUsagePct = Double.NaN + memCapacity = Double.NaN + memUsage = Double.NaN + diskRead = Double.NaN + diskWrite = Double.NaN + netReceived = Double.NaN + netTransmitted = Double.NaN + } + + /** + * The type of the timestamp in the trace. + */ + private enum class TimestampType { + UNDECIDED, DATE_TIME, EPOCH_MILLIS } companion object { @@ -199,15 +261,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, * The [CsvSchema] that is used to parse the trace. */ private val schema = CsvSchema.builder() - .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) + .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING) .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER) .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER) .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) .setAllowComments(true) diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt new file mode 100644 index 00000000..bc4f0b7d --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
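The `parseSafeDouble` fallback above exists because Materna traces write decimals with a comma separator and occasionally leave fields blank; the same try-once-then-latch approach drives the timestamp-type detection. A minimal sketch of the numeric fallback, using a hypothetical standalone helper rather than the reader itself:

```kotlin
import java.text.NumberFormat
import java.util.Locale

// Try the dot-separated representation first, then fall back to a
// comma-as-decimal-separator locale for values such as "1,5".
fun parseFlexibleDouble(text: String): Double {
    if (text.isBlank()) return 0.0 // blank fields default to zero, as in the reader
    return text.toDoubleOrNull()
        ?: NumberFormat.getInstance(Locale.GERMAN).parse(text).toDouble()
}

fun main() {
    println(parseFlexibleDouble("1.5")) // 1.5
    println(parseFlexibleDouble("1,5")) // 1.5 via the comma-decimal fallback
    println(parseFlexibleDouble(""))    // 0.0
}
```

The reader itself additionally latches the `usesCommaDecimalSeparator` flag after the first failure, so later rows skip the failed attempt entirely.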
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension + +/** + * The resources [Table] in the Bitbrains format. + */ +internal class BitbrainsResourceTable(private val factory: CsvFactory, path: Path) : Table { + /** + * The VMs that belong to the table. + */ + private val vms = + Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + + override val name: String = TABLE_RESOURCES + + override val isSynthetic: Boolean = true + + override val columns: List<TableColumn<*>> = listOf(RESOURCE_ID) + + override fun newReader(): TableReader { + return BitbrainsResourceTableReader(factory, vms) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Unknown partition $partition") + } + + override fun toString(): String = "BitbrainsResourceTable" +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt new file mode 100644 index 00000000..c02dc5ae --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * A [TableReader] for the Bitbrains resource table. 
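Since per-VM files are the only resource information Bitbrains-style traces carry, the synthetic table above simply turns the directory listing into rows. A sketch of that discovery step, with a placeholder directory path:

```kotlin
import java.nio.file.Files
import java.nio.file.Paths
import java.util.stream.Collectors
import kotlin.io.path.extension
import kotlin.io.path.nameWithoutExtension

fun main() {
    // Hypothetical trace directory with one CSV per VM, as in the Bitbrains/Materna layout.
    val dir = Paths.get("traces/fastStorage")

    // Same discovery the synthetic resource table performs: every CSV becomes one
    // resource row, keyed by file name and sorted so reads are deterministic.
    val vms = Files.walk(dir, 1)
        .filter { !Files.isDirectory(it) && it.extension == "csv" }
        .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
        .toSortedMap()

    println(vms.keys) // e.g. [1, 10, 100, ...], surfaced through RESOURCE_ID
}
```

The table is marked `isSynthetic` because these rows are derived from file names rather than from recorded data.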
+ */ +internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms: Map<String, Path>) : TableReader { + /** + * An iterator to iterate over the resource entries. + */ + private val it = vms.iterator() + + override fun nextRow(): Boolean { + reset() + + while (it.hasNext()) { + val (name, path) = it.next() + + val parser = factory.createParser(path.toFile()) + val reader = BitbrainsResourceStateTableReader(name, parser) + + try { + if (!reader.nextRow()) { + continue + } + + id = reader.get(RESOURCE_STATE_ID) + return true + } finally { + reader.close() + } + } + + return false + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_ID -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val res: Any? = when (column) { + RESOURCE_ID -> id + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + throw IllegalArgumentException("Invalid column") + } + + override fun getLong(column: TableColumn<Long>): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn<Double>): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun close() {} + + /** + * State fields of the reader. + */ + private var id: String? = null + + /** + * Reset the state of the reader. + */ + private fun reset() { + id = null + } +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt index 5a2d4243..bcd8dd52 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt @@ -30,16 +30,16 @@ import java.nio.file.Path * [Trace] implementation for the Bitbrains format. */ public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCE_STATES) + override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name + override fun containsTable(name: String): Boolean = tables.contains(name) override fun getTable(name: String): Table? 
{ - if (!containsTable(name)) { - return null + return when (name) { + TABLE_RESOURCES -> BitbrainsResourceTable(factory, path) + TABLE_RESOURCE_STATES -> BitbrainsResourceStateTable(factory, path) + else -> null } - - return BitbrainsResourceStateTable(factory, path) } override fun toString(): String = "BitbrainsTrace[$path]" diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index 550805d3..ff4a33f8 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -25,9 +25,7 @@ package org.opendc.trace.bitbrains import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.trace.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.RESOURCE_STATE_TIMESTAMP -import org.opendc.trace.TABLE_RESOURCE_STATES +import org.opendc.trace.* import java.net.URL /** @@ -58,7 +56,7 @@ class BitbrainsTraceFormatTest { val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) val trace = format.open(url) - assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) } @Test @@ -82,6 +80,23 @@ class BitbrainsTraceFormatTest { } @Test + fun testResources() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + val trace = format.open(url) + + val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("bitbrains", reader.get(RESOURCE_ID)) }, + { assertFalse(reader.nextRow()) } + ) + + reader.close() + } + + @Test fun testSmoke() { val format = BitbrainsTraceFormat() val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt index 80a99d10..fd7bd068 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt @@ -34,18 +34,15 @@ internal class GwfTaskTable(private val factory: CsvFactory, private val url: UR override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_WORKFLOW_ID -> true - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - else -> false - } - } + override val columns: List<TableColumn<*>> = listOf( + TASK_WORKFLOW_ID, + TASK_ID, + TASK_SUBMIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS + ) override fun newReader(): TableReader { return GwfTaskTableReader(factory.createParser(url)) diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 64b7d465..39eb5520 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ 
b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -26,24 +26,21 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import java.time.Duration +import java.time.Instant import java.util.regex.Pattern /** * A [TableReader] implementation for the GWF format. */ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { - /** - * The current parser state. - */ - private val state = RowState() - init { parser.schema = schema } override fun nextRow(): Boolean { // Reset the row state - state.reset() + reset() if (!nextStart()) { return false @@ -57,12 +54,12 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } when (parser.currentName) { - "WorkflowID" -> state.workflowId = parser.longValue - "JobID" -> state.jobId = parser.longValue - "SubmitTime" -> state.submitTime = parser.longValue - "RunTime" -> state.runtime = parser.longValue - "NProcs" -> state.nProcs = parser.intValue - "ReqNProcs" -> state.reqNProcs = parser.intValue + "WorkflowID" -> workflowId = parser.text + "JobID" -> jobId = parser.text + "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue) + "RunTime" -> runtime = Duration.ofSeconds(parser.longValue) + "NProcs" -> nProcs = parser.intValue + "ReqNProcs" -> reqNProcs = parser.intValue "Dependencies" -> parseParents(parser.valueAsString) } } @@ -84,14 +81,14 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun <T> get(column: TableColumn<T>): T { - val res: Any = when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs - TASK_PARENTS -> state.dependencies + val res: Any? = when (column) { + TASK_WORKFLOW_ID -> workflowId + TASK_ID -> jobId + TASK_SUBMIT_TIME -> submitTime + TASK_RUNTIME -> runtime + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs + TASK_PARENTS -> dependencies else -> throw IllegalArgumentException("Invalid column") } @@ -105,20 +102,14 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun getInt(column: TableColumn<Int>): Int { return when (column) { - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs else -> throw IllegalArgumentException("Invalid column") } } override fun getLong(column: TableColumn<Long>): Long { - return when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn<Double>): Double { @@ -166,29 +157,27 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } /** - * The current row state. + * Reader state fields. */ - private class RowState { - var workflowId = -1L - var jobId = -1L - var submitTime = -1L - var runtime = -1L - var nProcs = -1 - var reqNProcs = -1 - var dependencies = emptySet<Long>() + private var workflowId: String? = null + private var jobId: String? = null + private var submitTime: Instant? = null + private var runtime: Duration? 
= null + private var nProcs = -1 + private var reqNProcs = -1 + private var dependencies = emptySet<Long>() - /** - * Reset the state. - */ - fun reset() { - workflowId = -1 - jobId = -1 - submitTime = -1 - runtime = -1 - nProcs = -1 - reqNProcs = -1 - dependencies = emptySet() - } + /** + * Reset the state. + */ + private fun reset() { + workflowId = null + jobId = null + submitTime = null + runtime = null + nProcs = -1 + reqNProcs = -1 + dependencies = emptySet() } companion object { diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index 6b0568fe..b209b979 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -29,6 +29,8 @@ import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.trace.* import java.net.URL +import java.time.Duration +import java.time.Instant /** * Test suite for the [GwfTraceFormat] class. @@ -90,11 +92,11 @@ internal class GwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(0L, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(1L, reader.getLong(TASK_ID)) }, - { assertEquals(16, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(11, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf<Long>(), reader.get(TASK_PARENTS)) }, + { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals("1", reader.get(TASK_ID)) }, + { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, + { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) }, ) } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt index 12a51a2f..7ec0d607 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt @@ -34,21 +34,18 @@ internal class SwfTaskTable(private val path: Path) : Table { override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_WAIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - TASK_STATUS -> true - TASK_GROUP_ID -> true - TASK_USER_ID -> true - else -> false - } - } + override val columns: List<TableColumn<*>> = listOf( + TASK_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + TASK_STATUS, + TASK_GROUP_ID, + TASK_USER_ID + ) override fun newReader(): TableReader { val reader = path.bufferedReader() diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt index 5f879a54..3f49c770 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt @@ -24,6 +24,8 @@ package org.opendc.trace.swf import org.opendc.trace.* import java.io.BufferedReader +import 
java.time.Duration +import java.time.Instant /** * A [TableReader] implementation for the SWF format. @@ -85,10 +87,10 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea override fun <T> get(column: TableColumn<T>): T { val res: Any = when (column) { - TASK_ID -> getLong(TASK_ID) - TASK_SUBMIT_TIME -> getLong(TASK_SUBMIT_TIME) - TASK_WAIT_TIME -> getLong(TASK_WAIT_TIME) - TASK_RUNTIME -> getLong(TASK_RUNTIME) + TASK_ID -> fields[COL_JOB_ID] + TASK_SUBMIT_TIME -> Instant.ofEpochSecond(fields[COL_SUBMIT_TIME].toLong(10)) + TASK_WAIT_TIME -> Duration.ofSeconds(fields[COL_WAIT_TIME].toLong(10)) + TASK_RUNTIME -> Duration.ofSeconds(fields[COL_RUN_TIME].toLong(10)) TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS) TASK_PARENTS -> { @@ -121,13 +123,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea } override fun getLong(column: TableColumn<Long>): Long { - return when (column) { - TASK_ID -> fields[COL_JOB_ID].toLong(10) - TASK_SUBMIT_TIME -> fields[COL_SUBMIT_TIME].toLong(10) - TASK_WAIT_TIME -> fields[COL_WAIT_TIME].toLong(10) - TASK_RUNTIME -> fields[COL_RUN_TIME].toLong(10) - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn<Double>): Double { diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 9686891b..828c2bfa 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -85,10 +85,10 @@ internal class SwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(1, reader.getLong(TASK_ID)) }, + { assertEquals("1", reader.get(TASK_ID)) }, { assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) }, { assertTrue(reader.nextRow()) }, - { assertEquals(2, reader.getLong(TASK_ID)) }, + { assertEquals("2", reader.get(TASK_ID)) }, { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) }, ) diff --git a/opendc-trace/opendc-trace-wfformat/build.gradle.kts b/opendc-trace/opendc-trace-wfformat/build.gradle.kts new file mode 100644 index 00000000..2d336d03 --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support for WfCommons workload traces in OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTrace.opendcTraceApi) + + implementation(libs.jackson.core) +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt new file mode 100644 index 00000000..7b7f979f --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * A [Table] containing the tasks in a WfCommons workload trace. 
+ */ +internal class WfFormatTaskTable(private val factory: JsonFactory, private val path: Path) : Table { + override val name: String = TABLE_TASKS + + override val isSynthetic: Boolean = false + + override val columns: List<TableColumn<*>> = listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN + ) + + override fun newReader(): TableReader { + val parser = factory.createParser(path.toFile()) + return WfFormatTaskTableReader(parser) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Invalid partition $partition") + } + + override fun toString(): String = "WfFormatTaskTable" +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt new file mode 100644 index 00000000..4408ba5c --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt @@ -0,0 +1,234 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonParseException +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.core.JsonToken +import org.opendc.trace.* +import java.time.Duration +import kotlin.math.roundToInt + +/** + * A [TableReader] implementation for the WfCommons workload trace format. + */ +internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableReader { + /** + * The current nesting of the parser. 
+ */ + private var level: ParserLevel = ParserLevel.TOP + + override fun nextRow(): Boolean { + reset() + + var hasJob = false + + while (!hasJob) { + when (level) { + ParserLevel.TOP -> { + val token = parser.nextToken() + + // Check whether the document is not empty and starts with an object + if (token == null) { + break + } else if (token != JsonToken.START_OBJECT) { + throw JsonParseException(parser, "Expected object", parser.currentLocation) + } else { + level = ParserLevel.TRACE + } + } + ParserLevel.TRACE -> { + // Seek for the workflow object in the file + if (!seekWorkflow()) { + break + } else if (!parser.isExpectedStartObjectToken) { + throw JsonParseException(parser, "Expected object", parser.currentLocation) + } else { + level = ParserLevel.WORKFLOW + } + } + ParserLevel.WORKFLOW -> { + // Seek for the jobs object in the file + level = if (!seekJobs()) { + ParserLevel.TRACE + } else if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array", parser.currentLocation) + } else { + ParserLevel.JOB + } + } + ParserLevel.JOB -> { + when (parser.nextToken()) { + JsonToken.END_ARRAY -> level = ParserLevel.WORKFLOW + JsonToken.START_OBJECT -> { + parseJob() + hasJob = true + break + } + else -> throw JsonParseException(parser, "Unexpected token", parser.currentLocation) + } + } + } + } + + return hasJob + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + TASK_ID -> true + TASK_WORKFLOW_ID -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_PARENTS -> true + TASK_CHILDREN -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val res: Any? = when (column) { + TASK_ID -> id + TASK_WORKFLOW_ID -> workflowId + TASK_RUNTIME -> runtime + TASK_PARENTS -> parents + TASK_CHILDREN -> children + TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + return when (column) { + TASK_REQ_NCPUS -> cores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn<Long>): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn<Double>): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + parser.close() + } + + /** + * Parse the trace and seek until the workflow description. + */ + private fun seekWorkflow(): Boolean { + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "name" -> workflowId = parser.text + "workflow" -> return true + else -> parser.skipChildren() + } + } + + return false + } + + /** + * Parse the workflow description in the file and seek until the first job. + */ + private fun seekJobs(): Boolean { + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "jobs" -> return true + else -> parser.skipChildren() + } + } + + return false + } + + /** + * Parse a single job in the file. 
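To see the TOP, TRACE, WORKFLOW, JOB progression end to end, the sketch below feeds a minimal WfCommons document through the reader. `WfFormatTaskTableReader` is `internal`, so this only compiles inside the module (the public entry point is `WfFormatTraceFormat`), and the JSON content is invented for illustration:

```kotlin
import com.fasterxml.jackson.core.JsonFactory
import org.opendc.trace.*

fun main() {
    val content = """
        {
          "name": "demo-workflow",
          "workflow": {
            "jobs": [
              { "name": "stage-in", "runtime": 12, "cores": 1, "children": ["compute"] },
              { "name": "compute", "runtime": 300, "cores": 4, "parents": ["stage-in"] }
            ]
          }
        }
    """.trimIndent()

    val reader = WfFormatTaskTableReader(JsonFactory().createParser(content))
    while (reader.nextRow()) {
        // TASK_RUNTIME is a Duration and TASK_REQ_NCPUS an Int under the new column types.
        println("${reader.get(TASK_WORKFLOW_ID)}/${reader.get(TASK_ID)}: " +
            "${reader.get(TASK_RUNTIME)} on ${reader.getInt(TASK_REQ_NCPUS)} core(s)")
    }
    reader.close()
}
```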
+ */ + private fun parseJob() { + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "name" -> id = parser.text + "parents" -> parents = parseIds() + "children" -> children = parseIds() + "runtime" -> runtime = Duration.ofSeconds(parser.numberValue.toLong()) + "cores" -> cores = parser.floatValue.roundToInt() + else -> parser.skipChildren() + } + } + } + + /** + * Parse the parents/children of a job. + */ + private fun parseIds(): Set<String> { + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array", parser.currentLocation) + } + + val ids = mutableSetOf<String>() + + while (parser.nextToken() != JsonToken.END_ARRAY) { + if (parser.currentToken != JsonToken.VALUE_STRING) { + throw JsonParseException(parser, "Expected token", parser.currentLocation) + } + + ids.add(parser.valueAsString) + } + + return ids + } + + private enum class ParserLevel { + TOP, TRACE, WORKFLOW, JOB + } + + /** + * State fields for the parser. + */ + private var id: String? = null + private var workflowId: String? = null + private var runtime: Duration? = null + private var parents: Set<String>? = null + private var children: Set<String>? = null + private var cores = -1 + + private fun reset() { + id = null + runtime = null + parents = null + children = null + cores = -1 + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt new file mode 100644 index 00000000..2d9c79fb --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.TABLE_TASKS +import org.opendc.trace.Table +import org.opendc.trace.Trace +import java.nio.file.Path + +/** + * [Trace] implementation for the WfCommons workload trace format. + */ +public class WfFormatTrace internal constructor(private val factory: JsonFactory, private val path: Path) : Trace { + override val tables: List<String> = listOf(TABLE_TASKS) + + override fun containsTable(name: String): Boolean = TABLE_TASKS == name + + override fun getTable(name: String): Table? 
{
+        return when (name) {
+            TABLE_TASKS -> WfFormatTaskTable(factory, path)
+            else -> null
+        }
+    }
+
+    override fun toString(): String = "WfFormatTrace[$path]"
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
new file mode 100644
index 00000000..ff8d054c
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import com.fasterxml.jackson.core.JsonFactory
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A [TraceFormat] implementation for the WfCommons workload trace format.
+ */
+public class WfFormatTraceFormat : TraceFormat {
+    /**
+     * The [JsonFactory] that is used to create JSON parsers.
+ */ + private val factory = JsonFactory() + + override val name: String = "wfformat" + + override fun open(url: URL): WfFormatTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return WfFormatTrace(factory, path) + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..ee3aa2f6 --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.wfformat.WfFormatTraceFormat diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt new file mode 100644 index 00000000..b07f27ed --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt @@ -0,0 +1,345 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import com.fasterxml.jackson.core.JsonParseException +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.TASK_ID +import org.opendc.trace.TASK_PARENTS + +/** + * Test suite for the [WfFormatTaskTableReader] class. + */ +internal class WfFormatTaskTableReaderTest { + /** + * The [JsonFactory] used to construct the parser. 
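The single service-loader entry above is what makes the format resolvable by name through `TraceFormat.byName`. A usage sketch, where the trace file name is a placeholder:

```kotlin
import org.opendc.trace.Trace
import org.opendc.trace.TABLE_TASKS
import java.nio.file.Paths

fun main() {
    // "wfformat" matches the `name` declared by WfFormatTraceFormat; the JSON path
    // stands in for any WfCommons instance on disk.
    val trace = Trace.open(Paths.get("eager-nextflow-chameleon.json"), format = "wfformat")
    println(checkNotNull(trace.getTable(TABLE_TASKS)).columns)
}
```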
+ */ + private val factory = JsonFactory() + + @Test + fun testEmptyInput() { + val content = "" + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertFalse(reader.nextRow()) + reader.close() + } + + @Test + fun testTopLevelArrayInput() { + val content = "[]" + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testNoWorkflow() { + val content = """ + { + "name": "eager-nextflow-chameleon" + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertDoesNotThrow { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testWorkflowArrayType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": [] + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testWorkflowNullType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": null + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testNoJobs() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertDoesNotThrow { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsObjectType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { "jobs": {} } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsNullType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { "jobs": null } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsInvalidChildType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [1] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsValidChildType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test" + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertTrue(reader.nextRow()) + assertEquals("test", reader.get(TASK_ID)) + assertFalse(reader.nextRow()) + + reader.close() + } + + @Test + fun testJobsInvalidParents() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": 1, + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { 
reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsInvalidParentsItem() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": [1], + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsValidParents() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertTrue(reader.nextRow()) + assertEquals(setOf("1"), reader.get(TASK_PARENTS)) + assertFalse(reader.nextRow()) + + reader.close() + } + + @Test + fun testJobsInvalidSecondEntry() { + val content = """ + { + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + }, + "test" + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertDoesNotThrow { reader.nextRow() } + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testDuplicateJobsArray() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + } + ], + "jobs": [ + { + "name": "test2", + "parents": ["test"] + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertTrue(reader.nextRow()) + assertTrue(reader.nextRow()) + assertEquals("test2", reader.get(TASK_ID)) + assertFalse(reader.nextRow()) + + reader.close() + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt new file mode 100644 index 00000000..0bfc8840 --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.wfformat + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.* +import java.io.File +import java.net.URL + +/** + * Test suite for the [WfFormatTraceFormat] class. + */ +class WfFormatTraceFormatTest { + @Test + fun testTraceExists() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + assertDoesNotThrow { format.open(input) } + } + + @Test + fun testTraceDoesNotExists() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + assertThrows<IllegalArgumentException> { format.open(URL(input.toString() + "help")) } + } + + @Test + fun testTables() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val trace = format.open(input) + + assertEquals(listOf(TABLE_TASKS), trace.tables) + } + + @Test + fun testTableExists() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS) + + assertNotNull(table) + assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val trace = format.open(input) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + /** + * Smoke test for parsing WfCommons traces. + */ + @Test + fun testTableReader() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val trace = WfFormatTraceFormat().open(input) + val reader = trace.getTable(TABLE_TASKS)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(172000, reader.get(TASK_RUNTIME).toMillis()) }, + { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) }, + ) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(175000, reader.get(TASK_RUNTIME).toMillis()) }, + { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.get(TASK_PARENTS)) }, + ) + + reader.close() + } + + /** + * Test full iteration of the table. + */ + @Test + fun testTableReaderFull() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val trace = WfFormatTraceFormat().open(input) + val reader = trace.getTable(TABLE_TASKS)!!.newReader() + + assertDoesNotThrow { + while (reader.nextRow()) { + // reader.get(TASK_ID) + } + reader.close() + } + } + + @Test + fun testTableReaderPartition() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS)!! 
+ + assertThrows<IllegalArgumentException> { table.newReader("test") } + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json b/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json new file mode 100644 index 00000000..d21f024d --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json @@ -0,0 +1,1342 @@ +{ + "name": "eager-nextflow-chameleon", + "description": "Instance generated with WfCommons - https://wfcommons.org", + "createdAt": "2021-09-06T03:43:31.762479", + "schemaVersion": "1.2", + "author": { + "name": "cc", + "email": "support@wfcommons.org" + }, + "wms": { + "name": "Nextflow", + "version": "21.04.3", + "url": "https://www.nextflow.io" + }, + "workflow": { + "executedAt": "20210906T034331+0000", + "makespan": 275, + "jobs": [ + { + "name": "makebwaindex_mammoth_mt_krause.fasta", + "type": "compute", + "runtime": 172.182, + "command": { + "program": "makebwaindex", + "arguments": [ + "bwa", + "index", + "Mammoth_MT_Krause.fasta", + "mkdir", + "BWAIndex", + "&&", + "mv", + "Mammoth_MT_Krause.fasta*", + "BWAIndex" + ] + }, + "parents": [], + "children": [ + "makeseqdict_mammoth_mt_krause.fasta" + ], + "files": [], + "cores": 1.0, + "id": "ID000001", + "category": "makebwaindex", + "avgCPU": 5.8, + "bytesRead": 124, + "bytesWritten": 126, + "memory": 4248 + }, + { + "name": "makeseqdict_mammoth_mt_krause.fasta", + "type": "compute", + "runtime": 175.427, + "command": { + "program": "makeseqdict", + "arguments": [ + "picard", + "-Xmx6144M", + "CreateSequenceDictionary", + "R=Mammoth_MT_Krause.fasta", + "O=\"Mammoth_MT_Krause.dict\"" + ] + }, + "parents": [ + "makebwaindex_mammoth_mt_krause.fasta" + ], + "children": [ + "makefastaindex_mammoth_mt_krause.fasta" + ], + "files": [], + "cores": 1.0, + "id": "ID000003", + "category": "makeseqdict", + "avgCPU": 83.5, + "bytesRead": 22728, + "bytesWritten": 1300, + "memory": 104416 + }, + { + "name": "makefastaindex_mammoth_mt_krause.fasta", + "type": "compute", + "runtime": 170.797, + "command": { + "program": "makefastaindex", + "arguments": [ + "samtools", + "faidx", + "Mammoth_MT_Krause.fasta" + ] + }, + "parents": [ + "makeseqdict_mammoth_mt_krause.fasta" + ], + "children": [ + "output_documentation" + ], + "files": [], + "cores": 1.0, + "id": "ID000002", + "category": "makefastaindex", + "avgCPU": 23.8, + "bytesRead": 66, + "bytesWritten": 4, + "memory": 6096 + }, + { + "name": "output_documentation", + "type": "compute", + "runtime": 173.479, + "command": { + "program": "output_documentation", + "arguments": [ + "markdown_to_html.py", + "output.md", + "-o", + "results_description.html" + ] + }, + "parents": [ + "makefastaindex_mammoth_mt_krause.fasta" + ], + "children": [ + "get_software_versions" + ], + "files": [], + "cores": 1.0, + "id": "ID000005", + "category": "output_documentation", + "avgCPU": 84.0, + "bytesRead": 8222, + "bytesWritten": 15165, + "memory": 11488 + }, + { + "name": "get_software_versions", + "type": "compute", + "runtime": 183.445, + "command": { + "program": "get_software_versions", + "arguments": [ + "echo", + "2.3.5", + "&>", + "v_pipeline.txt", + "echo", + "21.04.3", + "&>", + "v_nextflow.txt", + "fastqc", + "--version", + "&>", + "v_fastqc.txt", + "2>&1", + "||", + "true", + "AdapterRemoval", + "--version", + "&>", + "v_adapterremoval.txt", + "2>&1", + "||", + "true", + "fastp", + "--version", + "&>", + "v_fastp.txt", + "2>&1", + "||", + "true", + "bwa", + "&>", + "v_bwa.txt", + "2>&1", + "||", + "true", + "circulargenerator", + 
"--help", + "|", + "head", + "-n", + "1", + "&>", + "v_circulargenerator.txt", + "2>&1", + "||", + "true", + "samtools", + "--version", + "&>", + "v_samtools.txt", + "2>&1", + "||", + "true", + "dedup", + "-v", + "&>", + "v_dedup.txt", + "2>&1", + "||", + "true", + "##", + "bioconda", + "recipe", + "of", + "picard", + "is", + "incorrectly", + "set", + "up", + "and", + "extra", + "warning", + "made", + "with", + "stderr,", + "this", + "ugly", + "command", + "ensures", + "only", + "version", + "exported", + "(", + "exec", + "7>&1", + "picard", + "MarkDuplicates", + "--version", + "2>&1", + ">&7", + "|", + "grep", + "-v", + "/", + ">&2", + ")", + "2>", + "v_markduplicates.txt", + "||", + "true", + "qualimap", + "--version", + "&>", + "v_qualimap.txt", + "2>&1", + "||", + "true", + "preseq", + "&>", + "v_preseq.txt", + "2>&1", + "||", + "true", + "gatk", + "--version", + "2>&1", + "|", + "head", + "-n", + "1", + ">", + "v_gatk.txt", + "2>&1", + "||", + "true", + "gatk3", + "--version", + "2>&1", + ">", + "v_gatk3.txt", + "2>&1", + "||", + "true", + "freebayes", + "--version", + "&>", + "v_freebayes.txt", + "2>&1", + "||", + "true", + "bedtools", + "--version", + "&>", + "v_bedtools.txt", + "2>&1", + "||", + "true", + "damageprofiler", + "--version", + "&>", + "v_damageprofiler.txt", + "2>&1", + "||", + "true", + "bam", + "--version", + "&>", + "v_bamutil.txt", + "2>&1", + "||", + "true", + "pmdtools", + "--version", + "&>", + "v_pmdtools.txt", + "2>&1", + "||", + "true", + "angsd", + "-h", + "|&", + "head", + "-n", + "1", + "|", + "cut", + "-d", + "-f3-4", + "&>", + "v_angsd.txt", + "2>&1", + "||", + "true", + "multivcfanalyzer", + "--help", + "|", + "head", + "-n", + "1", + "&>", + "v_multivcfanalyzer.txt", + "2>&1", + "||", + "true", + "malt-run", + "--help", + "|&", + "tail", + "-n", + "3", + "|", + "head", + "-n", + "1", + "|", + "cut", + "-f", + "2", + "-d(", + "|", + "cut", + "-f", + "1", + "-d", + ",", + "&>", + "v_malt.txt", + "2>&1", + "||", + "true", + "MaltExtract", + "--help", + "|", + "head", + "-n", + "2", + "|", + "tail", + "-n", + "1", + "&>", + "v_maltextract.txt", + "2>&1", + "||", + "true", + "multiqc", + "--version", + "&>", + "v_multiqc.txt", + "2>&1", + "||", + "true", + "vcf2genome", + "-h", + "|&", + "head", + "-n", + "1", + "&>", + "v_vcf2genome.txt", + "||", + "true", + "mtnucratio", + "--help", + "&>", + "v_mtnucratiocalculator.txt", + "||", + "true", + "sexdeterrmine", + "--version", + "&>", + "v_sexdeterrmine.txt", + "||", + "true", + "kraken2", + "--version", + "|", + "head", + "-n", + "1", + "&>", + "v_kraken.txt", + "||", + "true", + "endorS.py", + "--version", + "&>", + "v_endorSpy.txt", + "||", + "true", + "pileupCaller", + "--version", + "&>", + "v_sequencetools.txt", + "2>&1", + "||", + "true", + "bowtie2", + "--version", + "|", + "grep", + "-a", + "bowtie2-.*", + "-fdebug", + ">", + "v_bowtie2.txt", + "||", + "true", + "eigenstrat_snp_coverage", + "--version", + "|", + "cut", + "-d", + "-f2", + ">v_eigenstrat_snp_coverage.txt", + "||", + "true", + "mapDamage", + "--version", + ">", + "v_mapdamage.txt", + "||", + "true", + "bbduk.sh", + "|", + "grep", + "Last", + "modified", + "|", + "cut", + "-d", + "-f", + "3-99", + ">", + "v_bbduk.txt", + "||", + "true", + "scrape_software_versions.py", + "&>", + "software_versions_mqc.yaml" + ] + }, + "parents": [ + "output_documentation" + ], + "children": [ + "fastqc_jk2782_l1", + "fastqc_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000006", + "category": "get_software_versions", + "avgCPU": 147.8, + 
"bytesRead": 172760, + "bytesWritten": 1048, + "memory": 387324 + }, + { + "name": "fastqc_jk2782_l1", + "type": "compute", + "runtime": 175.205, + "command": { + "program": "fastqc", + "arguments": [ + "fastqc", + "-t", + "2", + "-q", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "JK2782_TGGCCGATCAACGA_L008_R2_001.fastq.gz.tengrand.fq.gz", + "rename", + "s/_fastqc.zip$/_raw_fastqc.zip/", + "*_fastqc.zip", + "rename", + "s/_fastqc.html$/_raw_fastqc.html/", + "*_fastqc.html" + ] + }, + "parents": [ + "get_software_versions" + ], + "children": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000007", + "category": "fastqc", + "avgCPU": 161.8, + "bytesRead": 35981, + "bytesWritten": 3967, + "memory": 270124 + }, + { + "name": "adapter_removal_jk2782_l1", + "type": "compute", + "runtime": 172.643, + "command": { + "program": "adapter_removal", + "arguments": [ + "mkdir", + "-p", + "output", + "AdapterRemoval", + "--file1", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "--file2", + "JK2782_TGGCCGATCAACGA_L008_R2_001.fastq.gz.tengrand.fq.gz", + "--basename", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe", + "--gzip", + "--threads", + "2", + "--qualitymax", + "41", + "--collapse", + "--trimns", + "--trimqualities", + "--adapter1", + "AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC", + "--adapter2", + "AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTA", + "--minlength", + "30", + "--minquality", + "20", + "--minadapteroverlap", + "1", + "cat", + "*.collapsed.gz", + "*.collapsed.truncated.gz", + "*.singleton.truncated.gz", + "*.pair1.truncated.gz", + "*.pair2.truncated.gz", + ">", + "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.tmp.fq.gz", + "mv", + "*.settings", + "output/", + "##", + "Add", + "R_", + "and", + "L_", + "for", + "unmerged", + "reads", + "for", + "DeDup", + "compatibility", + "AdapterRemovalFixPrefix", + "-Xmx4g", + "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.tmp.fq.gz", + "|", + "pigz", + "-p", + "1", + ">", + "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz" + ] + }, + "parents": [ + "fastqc_jk2782_l1", + "fastqc_jk2802_l2" + ], + "children": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000008", + "category": "adapter_removal", + "avgCPU": 160.9, + "bytesRead": 17357, + "bytesWritten": 4405, + "memory": 79308 + }, + { + "name": "fastqc_jk2802_l2", + "type": "compute", + "runtime": 177.338, + "command": { + "program": "fastqc", + "arguments": [ + "fastqc", + "-q", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "rename", + "s/_fastqc.zip$/_raw_fastqc.zip/", + "*_fastqc.zip", + "rename", + "s/_fastqc.html$/_raw_fastqc.html/", + "*_fastqc.html" + ] + }, + "parents": [ + "get_software_versions" + ], + "children": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000009", + "category": "fastqc", + "avgCPU": 120.1, + "bytesRead": 24457, + "bytesWritten": 2181, + "memory": 181060 + }, + { + "name": "adapter_removal_jk2802_l2", + "type": "compute", + "runtime": 174.313, + "command": { + "program": "adapter_removal", + "arguments": [ + "mkdir", + "-p", + "output", + "AdapterRemoval", + "--file1", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "--basename", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se", + "--gzip", + 
"--threads", + "2", + "--qualitymax", + "41", + "--trimns", + "--trimqualities", + "--adapter1", + "AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC", + "--adapter2", + "AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTA", + "--minlength", + "30", + "--minquality", + "20", + "--minadapteroverlap", + "1", + "mv", + "*.settings", + "*.se.truncated.gz", + "output/" + ] + }, + "parents": [ + "fastqc_jk2782_l1", + "fastqc_jk2802_l2" + ], + "children": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000010", + "category": "adapter_removal", + "avgCPU": 106.5, + "bytesRead": 683, + "bytesWritten": 897, + "memory": 12136 + }, + { + "name": "fastqc_after_clipping_jk2782_l1", + "type": "compute", + "runtime": 15.371, + "command": { + "program": "fastqc_after_clipping", + "arguments": [ + "fastqc", + "-t", + "2", + "-q", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz" + ] + }, + "parents": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "children": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "files": [], + "cores": 2.0, + "id": "ID000013", + "category": "fastqc_after_clipping", + "avgCPU": 133.3, + "bytesRead": 23788, + "bytesWritten": 1998, + "memory": 215020 + }, + { + "name": "fastqc_after_clipping_jk2802_l2", + "type": "compute", + "runtime": 15.272, + "command": { + "program": "fastqc_after_clipping", + "arguments": [ + "fastqc", + "-t", + "2", + "-q", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz" + ] + }, + "parents": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "children": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "files": [], + "cores": 2.0, + "id": "ID000014", + "category": "fastqc_after_clipping", + "avgCPU": 124.1, + "bytesRead": 23882, + "bytesWritten": 2143, + "memory": 213064 + }, + { + "name": "bwa_jk2802", + "type": "compute", + "runtime": 9.566, + "command": { + "program": "bwa", + "arguments": [ + "bwa", + "aln", + "-t", + "2", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz", + "-n", + "0.04", + "-l", + "1024", + "-k", + "2", + "-o", + "1", + "-f", + "JK2802.sai", + "bwa", + "samse", + "-r", + "\"@RGtID:ILLUMINA-JK2802tSM:JK2802tPL:illuminatPU:ILLUMINA-JK2802-SE\"", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2802.sai", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz", + "|", + "samtools", + "sort", + "-@", + "1", + "-O", + "bam", + "-", + ">", + "\"JK2802\"_\"SE\".mapped.bam", + "samtools", + "index", + "\"JK2802\"_\"SE\".mapped.bam" + ] + }, + "parents": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "children": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000016", + "category": "bwa", + "avgCPU": 15.7, + "bytesRead": 3774, + "bytesWritten": 3367, + "memory": 10628 + }, + { + "name": "bwa_jk2782", + "type": "compute", + "runtime": 9.652, + "command": { + "program": "bwa", + "arguments": [ + "bwa", + "aln", + "-t", + "2", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz", + "-n", + "0.04", + "-l", + "1024", + "-k", + "2", + "-o", + "1", + "-f", + "JK2782.sai", + "bwa", + "samse", + "-r", + "\"@RGtID:ILLUMINA-JK2782tSM:JK2782tPL:illuminatPU:ILLUMINA-JK2782-PE\"", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2782.sai", + 
"JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz", + "|", + "samtools", + "sort", + "-@", + "1", + "-O", + "bam", + "-", + ">", + "\"JK2782\"_\"PE\".mapped.bam", + "samtools", + "index", + "\"JK2782\"_\"PE\".mapped.bam" + ] + }, + "parents": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "children": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000015", + "category": "bwa", + "avgCPU": 69.8, + "bytesRead": 3705, + "bytesWritten": 3355, + "memory": 12876 + }, + { + "name": "samtools_flagstat_jk2782", + "type": "compute", + "runtime": 13.011, + "command": { + "program": "samtools_flagstat", + "arguments": [ + "samtools", + "flagstat", + "JK2782_PE.mapped.bam", + ">", + "JK2782_flagstat.stats" + ] + }, + "parents": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "children": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000026", + "category": "samtools_flagstat", + "avgCPU": 30.1, + "bytesRead": 478, + "bytesWritten": 5, + "memory": 6468 + }, + { + "name": "samtools_flagstat_jk2802", + "type": "compute", + "runtime": 13.129, + "command": { + "program": "samtools_flagstat", + "arguments": [ + "samtools", + "flagstat", + "JK2802_SE.mapped.bam", + ">", + "JK2802_flagstat.stats" + ] + }, + "parents": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "children": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000024", + "category": "samtools_flagstat", + "avgCPU": 118.5, + "bytesRead": 551, + "bytesWritten": 5 + }, + { + "name": "markduplicates_jk2782", + "type": "compute", + "runtime": 22.655, + "command": { + "program": "markduplicates", + "arguments": [ + "mv", + "JK2782_PE.mapped.bam", + "JK2782.bam", + "picard", + "-Xmx4096M", + "MarkDuplicates", + "INPUT=JK2782.bam", + "OUTPUT=JK2782_rmdup.bam", + "REMOVE_DUPLICATES=TRUE", + "AS=TRUE", + "METRICS_FILE=\"JK2782_rmdup.metrics\"", + "VALIDATION_STRINGENCY=SILENT", + "samtools", + "index", + "JK2782_rmdup.bam" + ] + }, + "parents": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "children": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000021", + "category": "markduplicates", + "avgCPU": 173.6, + "bytesRead": 24055, + "bytesWritten": 2319, + "memory": 1400048 + }, + { + "name": "markduplicates_jk2802", + "type": "compute", + "runtime": 21.545, + "command": { + "program": "markduplicates", + "arguments": [ + "mv", + "JK2802_SE.mapped.bam", + "JK2802.bam", + "picard", + "-Xmx4096M", + "MarkDuplicates", + "INPUT=JK2802.bam", + "OUTPUT=JK2802_rmdup.bam", + "REMOVE_DUPLICATES=TRUE", + "AS=TRUE", + "METRICS_FILE=\"JK2802_rmdup.metrics\"", + "VALIDATION_STRINGENCY=SILENT", + "samtools", + "index", + "JK2802_rmdup.bam" + ] + }, + "parents": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "children": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000020", + "category": "markduplicates", + "avgCPU": 182.6, + "bytesRead": 24242, + "bytesWritten": 2466, + "memory": 1404624 + }, + { + "name": "preseq_jk2782", + "type": "compute", + "runtime": 12.299, + "command": { + "program": "preseq", + "arguments": [ + "preseq", + "c_curve", + "-s", + "1000", + "-o", + "JK2782_PE.mapped.ccurve", + "-B", + "JK2782_PE.mapped.bam" + ] + }, + "parents": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "children": [ + 
"endorspy_jk2782", + "endorspy_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000030", + "category": "preseq", + "avgCPU": 81.9, + "bytesRead": 473, + "bytesWritten": 4, + "memory": 12032 + }, + { + "name": "preseq_jk2802", + "type": "compute", + "runtime": 10.188, + "command": { + "program": "preseq", + "arguments": [ + "preseq", + "c_curve", + "-s", + "1000", + "-o", + "JK2802_SE.mapped.ccurve", + "-B", + "JK2802_SE.mapped.bam" + ] + }, + "parents": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "children": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000027", + "category": "preseq", + "avgCPU": 77.6, + "bytesRead": 548, + "bytesWritten": 4, + "memory": 11972 + }, + { + "name": "endorspy_jk2782", + "type": "compute", + "runtime": 7.537, + "command": { + "program": "endorspy", + "arguments": [ + "endorS.py", + "-o", + "json", + "-n", + "JK2782", + "JK2782_flagstat.stats" + ] + }, + "parents": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "children": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000031", + "category": "endorspy", + "avgCPU": 44.7, + "bytesRead": 623, + "bytesWritten": 4, + "memory": 12264 + }, + { + "name": "endorspy_jk2802", + "type": "compute", + "runtime": 8.0, + "command": { + "program": "endorspy", + "arguments": [ + "endorS.py", + "-o", + "json", + "-n", + "JK2802", + "JK2802_flagstat.stats" + ] + }, + "parents": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "children": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000032", + "category": "endorspy", + "avgCPU": 54.0, + "bytesRead": 623, + "bytesWritten": 4, + "memory": 12224 + }, + { + "name": "damageprofiler_jk2802", + "type": "compute", + "runtime": 18.596, + "command": { + "program": "damageprofiler", + "arguments": [ + "damageprofiler", + "-Xmx4g", + "-i", + "JK2802_rmdup.bam", + "-r", + "Mammoth_MT_Krause.fasta", + "-l", + "100", + "-t", + "15", + "-o", + ".", + "-yaxis_damageplot", + "0.30" + ] + }, + "parents": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "children": [ + "qualimap_jk2802", + "qualimap_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000033", + "category": "damageprofiler", + "avgCPU": 88.6, + "bytesRead": 25744, + "bytesWritten": 391, + "memory": 242940 + }, + { + "name": "damageprofiler_jk2782", + "type": "compute", + "runtime": 16.736, + "command": { + "program": "damageprofiler", + "arguments": [ + "damageprofiler", + "-Xmx4g", + "-i", + "JK2782_rmdup.bam", + "-r", + "Mammoth_MT_Krause.fasta", + "-l", + "100", + "-t", + "15", + "-o", + ".", + "-yaxis_damageplot", + "0.30" + ] + }, + "parents": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "children": [ + "qualimap_jk2802", + "qualimap_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000036", + "category": "damageprofiler", + "avgCPU": 88.3, + "bytesRead": 25661, + "bytesWritten": 327, + "memory": 198276 + }, + { + "name": "qualimap_jk2802", + "type": "compute", + "runtime": 15.368, + "command": { + "program": "qualimap", + "arguments": [ + "qualimap", + "bamqc", + "-bam", + "JK2802_rmdup.bam", + "-nt", + "2", + "-outdir", + ".", + "-outformat", + "\"HTML\"", + "--java-mem-size=4G" + ] + }, + "parents": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "children": [ + "multiqc_1" + ], + "files": [], + "cores": 2.0, + "id": "ID000053", + "category": "qualimap", + "avgCPU": 177.7, + "bytesRead": 35038, + "bytesWritten": 1712, + 
"memory": 209440 + }, + { + "name": "qualimap_jk2782", + "type": "compute", + "runtime": 14.223, + "command": { + "program": "qualimap", + "arguments": [ + "qualimap", + "bamqc", + "-bam", + "JK2782_rmdup.bam", + "-nt", + "2", + "-outdir", + ".", + "-outformat", + "\"HTML\"", + "--java-mem-size=4G" + ] + }, + "parents": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "children": [ + "multiqc_1" + ], + "files": [], + "cores": 2.0, + "id": "ID000054", + "category": "qualimap", + "avgCPU": 181.9, + "bytesRead": 34954, + "bytesWritten": 1937, + "memory": 232196 + }, + { + "name": "multiqc_1", + "type": "compute", + "runtime": 46.376, + "command": { + "program": "multiqc", + "arguments": [ + "multiqc", + "-f", + "multiqc_config.yaml", + "." + ] + }, + "parents": [ + "qualimap_jk2802", + "qualimap_jk2782" + ], + "children": [], + "files": [], + "cores": 1.0, + "id": "ID000056", + "category": "multiqc", + "avgCPU": 93.0, + "bytesRead": 1215169, + "bytesWritten": 22599, + "memory": 139496 + } + ] + } +} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt index be26f540..74202718 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt @@ -35,21 +35,18 @@ internal class WtfTaskTable(private val path: Path) : Table { override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_WORKFLOW_ID -> true - TASK_SUBMIT_TIME -> true - TASK_WAIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_PARENTS -> true - TASK_CHILDREN -> true - TASK_GROUP_ID -> true - TASK_USER_ID -> true - else -> false - } - } + override val columns: List<TableColumn<*>> = listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN, + TASK_GROUP_ID, + TASK_USER_ID + ) override fun newReader(): TableReader { val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index b6789542..5e2463f8 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -25,6 +25,8 @@ package org.opendc.trace.wtf import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant /** * A [TableReader] implementation for the WTF format. 
@@ -61,14 +63,14 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic @Suppress("UNCHECKED_CAST") val res: Any = when (column) { - TASK_ID -> record["id"] - TASK_WORKFLOW_ID -> record["workflow_id"] - TASK_SUBMIT_TIME -> record["ts_submit"] - TASK_WAIT_TIME -> record["wait_time"] - TASK_RUNTIME -> record["runtime"] + TASK_ID -> (record["id"] as Long).toString() + TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString() + TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long) + TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long) + TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long) TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt() - TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet() - TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet() + TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet() + TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet() TASK_GROUP_ID -> record["group_id"] TASK_USER_ID -> record["user_id"] else -> throw IllegalArgumentException("Invalid column") @@ -94,16 +96,7 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic } override fun getLong(column: TableColumn<Long>): Long { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (column) { - TASK_ID -> record["id"] as Long - TASK_WORKFLOW_ID -> record["workflow_id"] as Long - TASK_SUBMIT_TIME -> record["ts_submit"] as Long - TASK_WAIT_TIME -> record["wait_time"] as Long - TASK_RUNTIME -> record["runtime"] as Long - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn<Double>): Double { diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt index 7eff0f5a..a755a107 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt @@ -43,5 +43,5 @@ public class WtfTrace internal constructor(private val path: Path) : Trace { return WtfTaskTable(path) } - override fun toString(): String = "SwfTrace[$path]" + override fun toString(): String = "WtfTrace[$path]" } diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index a05a523e..b155f265 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -28,6 +28,8 @@ import org.junit.jupiter.api.assertThrows import org.opendc.trace.* import java.io.File import java.net.URL +import java.time.Duration +import java.time.Instant /** * Test suite for the [WtfTraceFormat] class. 
@@ -91,20 +93,20 @@ class WtfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(362334516345962206, reader.getLong(TASK_ID)) }, - { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(245604, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(8163, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) }, + { assertEquals("362334516345962206", reader.get(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) }, + { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) }, ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(502010169100446658, reader.getLong(TASK_ID)) }, - { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(251325, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(8216, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) }, + { assertEquals("502010169100446658", reader.get(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) }, + { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) }, ) reader.close() diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt index a390fe08..9ee3736e 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt @@ -81,17 +81,17 @@ internal class TraceReplayer(private val trace: Trace) { try { while (reader.nextRow()) { // Bag of tasks without workflow ID all share the same workflow - val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.getLong(TASK_WORKFLOW_ID) else 0L + val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } - val id = reader.getLong(TASK_ID) + val id = reader.get(TASK_ID).toLong() val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) reader.getInt(TASK_ALLOC_NCPUS) else reader.getInt(TASK_REQ_NCPUS) - val submitTime = reader.getLong(TASK_SUBMIT_TIME) - val runtime = reader.getLong(TASK_RUNTIME) - val flops: Long = 4000 * runtime * grantedCpus + val submitTime = reader.get(TASK_SUBMIT_TIME) + val runtime = reader.get(TASK_RUNTIME) + val flops: Long = 4000 * runtime.seconds * grantedCpus val workload = SimFlopsWorkload(flops) val task = Task( UUID(0L, id), @@ -100,14 +100,14 @@ internal class TraceReplayer(private val trace: Trace) { mapOf( "workload" to workload, WORKFLOW_TASK_CORES to grantedCpus, - WORKFLOW_TASK_DEADLINE to (runtime * 1000) + WORKFLOW_TASK_DEADLINE to runtime.toMillis() ), ) tasks[id] = task - taskDependencies[task] = reader.get(TASK_PARENTS) + 
taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet() - (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime) { a, b -> min(a as Long, b as Long) } + (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } (workflow.tasks as MutableSet<Task>).add(task) } diff --git a/settings.gradle.kts b/settings.gradle.kts index 427cdb52..60e67e2b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -49,6 +49,7 @@ include(":opendc-trace:opendc-trace-api") include(":opendc-trace:opendc-trace-gwf") include(":opendc-trace:opendc-trace-swf") include(":opendc-trace:opendc-trace-wtf") +include(":opendc-trace:opendc-trace-wfformat") include(":opendc-trace:opendc-trace-bitbrains") include(":opendc-trace:opendc-trace-parquet") include(":opendc-harness:opendc-harness-api")
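
For reference, a minimal usage sketch of the new `wfformat` reader, mirroring the `testTableReader` smoke test above; it assumes the bundled `src/test/resources/trace.json` test resource and is illustrative rather than part of the commit:

```kotlin
import org.opendc.trace.*
import org.opendc.trace.wfformat.WfFormatTraceFormat
import java.io.File

fun main() {
    // Open a WfCommons trace through the new TraceFormat implementation.
    val input = File("src/test/resources/trace.json").toURI().toURL()
    val trace = WfFormatTraceFormat().open(input)

    // The wfformat trace exposes a single "tasks" table.
    val reader = trace.getTable(TABLE_TASKS)!!.newReader()
    while (reader.nextRow()) {
        // Columns are typed in this merge: TASK_RUNTIME is a
        // java.time.Duration and TASK_PARENTS a Set<String>.
        val id = reader.get(TASK_ID)
        val runtime = reader.get(TASK_RUNTIME)
        val parents = reader.get(TASK_PARENTS)
        println("$id: ${runtime.toMillis()} ms, parents=$parents")
    }
    reader.close()
}
```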

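The breaking API change in this merge, with `isSupported` replaced by a `columns` list as shown in the `WtfTaskTable` hunk, turns per-column capability probes into a plain membership test. A minimal sketch of the consumer side, assuming `Table` and the column constants live in `org.opendc.trace` as the diffs above suggest:

```kotlin
import org.opendc.trace.*

// Before this merge a caller probed each column individually via
// table.isSupported(column); the table now advertises its columns
// as data, so the same check is a membership test and the full
// list can be inspected up front.
fun supports(table: Table, column: TableColumn<*>): Boolean =
    column in table.columns

// For the WTF task table above this yields, e.g.:
//   supports(table, TASK_GROUP_ID)    // true, listed in `columns`
//   supports(table, TASK_ALLOC_NCPUS) // false, not listed
```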