author     Fabian Mastenbroek <mail.fabianm@gmail.com>  2021-09-12 12:08:55 +0200
committer  GitHub <noreply@github.com>  2021-09-12 12:08:55 +0200
commit     2cd3bd18e548a72d64afe0e7f59487f4747d722f (patch)
tree       dc9e2fba5ca4d19a90934a8b68dbb8110ee34bb7
parent     cae193284570d6ee9dbacdde57b3e4e367aa9d9f (diff)
parent     992b65396f55c0e12b36823d191dea8e03dd45ba (diff)
merge: Add support for new trace formats
This pull request updates the trace API with the addition of several new trace formats.

- Add support for Materna traces from GWA
- Keep reader state in its own class
- Parse last column in Solvinity trace format
- Add support for Azure VM traces
- Add support for WfCommons (WorkflowHub) traces
- Add API for accessing available table columns
- Add synthetic resource table for Bitbrains format
- Support dynamic resolving of trace formats

**Breaking API Changes**

- Replace `isSupported` by a list of `TableColumns`
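For downstream code, the breaking change is mechanical: capability checks move from `Table.isSupported(column)` to the new `columns` list. A minimal migration sketch, assuming a table obtained through the updated API (column constants as in this PR):

    val table = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES))
    // before this PR: if (table.isSupported(RESOURCE_STATE_CPU_USAGE)) { ... }
    // after this PR: the available columns are exposed as a list
    if (RESOURCE_STATE_CPU_USAGE in table.columns) {
        // the column can be read from this table
    }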
-rw-r--r--  gradle/libs.versions.toml  1
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/build.gradle.kts  1
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt  4
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt  488
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt  127
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt  149
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt  54
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt  169
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt  46
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt  56
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt  17
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt  17
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt  10
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt  55
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt  36
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt  144
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt  4
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt  4
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt  17
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt  35
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt  34
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt  192
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt  61
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt  108
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt  12
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt  23
-rw-r--r--  opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt  21
-rw-r--r--  opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt  89
-rw-r--r--  opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt  12
-rw-r--r--  opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt  27
-rw-r--r--  opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt  18
-rw-r--r--  opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt  4
-rw-r--r--  opendc-trace/opendc-trace-wfformat/build.gradle.kts  37
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt  56
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt  234
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt  47
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt  47
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat  1
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt  345
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt  133
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json  1342
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt  27
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt  27
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt  2
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt  22
-rw-r--r--  opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt  16
-rw-r--r--  settings.gradle.kts  1
47 files changed, 3667 insertions, 705 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 4e2fc777..ddede2e8 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -52,6 +52,7 @@ clikt = { module = "com.github.ajalt.clikt:clikt", version.ref = "clikt" }
progressbar = { module = "me.tongfei:progressbar", version.ref = "progressbar" }
# Format
+jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson" }
jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" }
jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" }
jackson-dataformat-csv = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-csv", version.ref = "jackson" }
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 036d0638..7dadd14d 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -45,6 +45,7 @@ dependencies {
implementation(libs.progressbar)
implementation(libs.clikt)
implementation(libs.jackson.module.kotlin)
+ implementation(libs.jackson.dataformat.csv)
implementation(kotlin("reflect"))
implementation(libs.parquet)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
index fa4e9ed8..ca937328 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
@@ -90,9 +90,9 @@ class RawParquetTraceReader(private val path: File) {
}
val submissionTime = reader.get(RESOURCE_START_TIME)
- val endTime = reader.get(RESOURCE_END_TIME)
+ val endTime = reader.get(RESOURCE_STOP_TIME)
val maxCores = reader.getInt(RESOURCE_NCPUS)
- val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY)
+ val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
val vmFragments = fragments.getValue(id).asSequence()
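(For reference, the memory columns appear to be normalized to KB at the trace API boundary: the Azure resource reader below stores 8 GB as 8 × 1e6 = 8,000,000 KB, the BP reader multiplies the stored MB value by 1000, and this reader divides by 1000 to get back the 8,000 MB that Capelin works with.)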
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index a021de8d..1f3878eb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -25,80 +25,74 @@ package org.opendc.experiments.capelin.trace
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.arguments.argument
import com.github.ajalt.clikt.parameters.groups.OptionGroup
-import com.github.ajalt.clikt.parameters.groups.groupChoice
+import com.github.ajalt.clikt.parameters.groups.cooccurring
import com.github.ajalt.clikt.parameters.options.*
-import com.github.ajalt.clikt.parameters.types.file
-import com.github.ajalt.clikt.parameters.types.long
-import me.tongfei.progressbar.ProgressBar
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
+import com.github.ajalt.clikt.parameters.types.*
+import mu.KotlinLogging
import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.experiments.capelin.trace.azure.AzureTraceFormat
+import org.opendc.experiments.capelin.trace.bp.BP_RESOURCES_SCHEMA
+import org.opendc.experiments.capelin.trace.bp.BP_RESOURCE_STATES_SCHEMA
import org.opendc.experiments.capelin.trace.sv.SvTraceFormat
import org.opendc.trace.*
import org.opendc.trace.bitbrains.BitbrainsTraceFormat
-import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.parquet.LocalOutputFile
-import java.io.BufferedReader
import java.io.File
-import java.io.FileReader
import java.util.*
import kotlin.math.max
import kotlin.math.min
+import kotlin.math.roundToLong
+
+/**
+ * A script to convert a trace in text format into a Parquet trace.
+ */
+fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
/**
* Represents the command for converting traces
*/
class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
+ * The logger instance for the converter.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
* The directory where the trace should be stored.
*/
- private val outputPath by option("-O", "--output", help = "path to store the trace")
+ private val output by option("-O", "--output", help = "path to store the trace")
.file(canBeFile = false, mustExist = false)
.defaultLazy { File("output") }
/**
* The directory where the input trace is located.
*/
- private val inputPath by argument("input", help = "path to the input trace")
+ private val input by argument("input", help = "path to the input trace")
.file(canBeFile = false)
/**
- * The input type of the trace.
+ * The input format of the trace.
*/
- private val type by option("-t", "--type", help = "input type of trace").groupChoice(
- "solvinity" to SolvinityConversion(),
- "bitbrains" to BitbrainsConversion(),
- "azure" to AzureConversion()
- )
+ private val format by option("-f", "--format", help = "input format of trace")
+ .choice(
+ "solvinity" to SvTraceFormat(),
+ "bitbrains" to BitbrainsTraceFormat(),
+ "azure" to AzureTraceFormat()
+ )
+ .required()
+
+ /**
+ * The sampling options.
+ */
+ private val samplingOptions by SamplingOptions().cooccurring()
override fun run() {
- val metaSchema = SchemaBuilder
- .record("meta")
- .namespace("org.opendc.format.sc20")
- .fields()
- .name("id").type().stringType().noDefault()
- .name("submissionTime").type().longType().noDefault()
- .name("endTime").type().longType().noDefault()
- .name("maxCores").type().intType().noDefault()
- .name("requiredMemory").type().longType().noDefault()
- .endRecord()
- val schema = SchemaBuilder
- .record("trace")
- .namespace("org.opendc.format.sc20")
- .fields()
- .name("id").type().stringType().noDefault()
- .name("time").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("cores").type().intType().noDefault()
- .name("cpuUsage").type().doubleType().noDefault()
- .name("flops").type().longType().noDefault()
- .endRecord()
-
- val metaParquet = File(outputPath, "meta.parquet")
- val traceParquet = File(outputPath, "trace.parquet")
+ val metaParquet = File(output, "meta.parquet")
+ val traceParquet = File(output, "trace.parquet")
if (metaParquet.exists()) {
metaParquet.delete()
@@ -107,324 +101,160 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") {
traceParquet.delete()
}
+ val trace = format.open(input.toURI().toURL())
+
+ logger.info { "Building resources table" }
+
val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet))
- .withSchema(metaSchema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .withSchema(BP_RESOURCES_SCHEMA)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .enablePageWriteChecksum()
.build()
+ val selectedVms = metaWriter.use { convertResources(trace, it) }
+
+ logger.info { "Wrote ${selectedVms.size} rows" }
+ logger.info { "Building resource states table" }
+
val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .withSchema(BP_RESOURCE_STATES_SCHEMA)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .enableDictionaryEncoding()
+ .enablePageWriteChecksum()
+ .withBloomFilterEnabled("id", true)
+ .withBloomFilterNDV("id", selectedVms.size.toLong())
.build()
- try {
- val type = type ?: throw IllegalArgumentException("Invalid trace conversion")
- val allFragments = type.read(inputPath, metaSchema, metaWriter)
- allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
-
- for (fragment in allFragments) {
- val record = GenericData.Record(schema)
- record.put("id", fragment.id)
- record.put("time", fragment.tick)
- record.put("duration", fragment.duration)
- record.put("cores", fragment.cores)
- record.put("cpuUsage", fragment.usage)
- record.put("flops", fragment.flops)
-
- writer.write(record)
- }
- } finally {
- writer.close()
- metaWriter.close()
- }
+ val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) }
+ logger.info { "Wrote $statesCount rows" }
}
-}
-/**
- * The supported trace conversions.
- */
-sealed class TraceConversion(name: String) : OptionGroup(name) {
/**
- * Read the fragments of the trace.
+ * Convert the resources table for the trace.
*/
- abstract fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment>
-}
-
-/**
- * A [TraceConversion] that uses the Trace API to perform the conversion.
- */
-abstract class AbstractConversion(name: String) : TraceConversion(name) {
- abstract val format: TraceFormat
-
- override fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment> {
- val fragments = mutableListOf<Fragment>()
- val trace = format.open(traceDirectory.toURI().toURL())
+ private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> {
+ val random = samplingOptions?.let { Random(it.seed) }
+ val samplingFraction = samplingOptions?.fraction ?: 1.0
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
- var lastId: String? = null
- var maxCores = Int.MIN_VALUE
- var requiredMemory = Long.MIN_VALUE
- var minTime = Long.MAX_VALUE
- var maxTime = Long.MIN_VALUE
- var lastTimestamp = Long.MIN_VALUE
-
- while (reader.nextRow()) {
- val id = reader.get(RESOURCE_STATE_ID)
-
- if (lastId != null && lastId != id) {
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", lastId)
- metaRecord.put("submissionTime", minTime)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
- metaWriter.write(metaRecord)
- }
- lastId = id
+ var hasNextRow = reader.nextRow()
+ val selectedVms = mutableSetOf<String>()
- val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP)
- val timestampMs = timestamp.toEpochMilli()
- val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
- val cores = reader.getInt(RESOURCE_STATE_NCPUS)
- val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)
+ while (hasNextRow) {
+ var id: String
+ var numCpus = Int.MIN_VALUE
+ var memCapacity = Double.MIN_VALUE
+ var memUsage = Double.MIN_VALUE
+ var startTime = Long.MAX_VALUE
+ var stopTime = Long.MIN_VALUE
- maxCores = max(maxCores, cores)
- requiredMemory = max(requiredMemory, (memCapacity / 1000).toLong())
+ do {
+ id = reader.get(RESOURCE_STATE_ID)
- if (lastTimestamp < 0) {
- lastTimestamp = timestampMs - 5 * 60 * 1000L
- minTime = min(minTime, lastTimestamp)
- }
+ val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
+ startTime = min(startTime, timestamp)
+ stopTime = max(stopTime, timestamp)
+
+ numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS))
- if (fragments.isEmpty()) {
- val duration = 5 * 60 * 1000L
- val flops: Long = (cpuUsage * duration / 1000).toLong()
- fragments.add(Fragment(id, lastTimestamp, flops, duration, cpuUsage, cores))
- } else {
- val last = fragments.last()
- val duration = timestampMs - lastTimestamp
- val flops: Long = (cpuUsage * duration / 1000).toLong()
-
- // Perform run-length encoding
- if (last.id == id && (duration == 0L || last.usage == cpuUsage)) {
- fragments[fragments.size - 1] = last.copy(duration = last.duration + duration)
- } else {
- fragments.add(
- Fragment(
- id,
- lastTimestamp,
- flops,
- duration,
- cpuUsage,
- cores
- )
- )
+ memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY))
+ if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) {
+ memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE))
}
+
+ hasNextRow = reader.nextRow()
+ } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
+
+ // Sample only a fraction of the VMs
+ if (random != null && random.nextDouble() > samplingFraction) {
+ continue
}
- val last = fragments.last()
- maxTime = max(maxTime, last.tick + last.duration)
- lastTimestamp = timestampMs
+ val builder = GenericRecordBuilder(BP_RESOURCES_SCHEMA)
+
+ builder["id"] = id
+ builder["submissionTime"] = startTime
+ builder["endTime"] = stopTime
+ builder["maxCores"] = numCpus
+ builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong()
+
+ logger.info { "Selecting VM $id" }
+
+ writer.write(builder.build())
+ selectedVms.add(id)
}
- return fragments
+
+ return selectedVms
}
-}
-class SolvinityConversion : AbstractConversion("Solvinity") {
- override val format: TraceFormat = SvTraceFormat()
-}
+ /**
+ * Convert the resource states table for the trace.
+ */
+ private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int {
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
-/**
- * Conversion of the Bitbrains public trace.
- */
-class BitbrainsConversion : AbstractConversion("Bitbrains") {
- override val format: TraceFormat = BitbrainsTraceFormat()
-}
+ var hasNextRow = reader.nextRow()
+ var count = 0
-/**
- * Conversion of the Azure public VM trace.
- */
-class AzureConversion : TraceConversion("Azure") {
- private val seed by option(help = "seed for trace sampling")
- .long()
- .default(0)
-
- override fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment> {
- val random = Random(seed)
- val fraction = 0.01
-
- // Read VM table
- val vmIdTableCol = 0
- val coreTableCol = 9
- val provisionedMemoryTableCol = 10
-
- var vmId: String
- var cores: Int
- var requiredMemory: Long
-
- val vmIds = mutableSetOf<String>()
- val vmIdToMetadata = mutableMapOf<String, VmInfo>()
-
- BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
- reader.lineSequence()
- .chunked(1024)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
- // Sample only a fraction of the VMs
- if (random.nextDouble() > fraction) {
- continue
- }
-
- val values = line.split(",")
-
- // Exclude VMs with a large number of cores (not specified exactly)
- if (values[coreTableCol].contains(">")) {
- continue
- }
-
- vmId = values[vmIdTableCol].trim()
- cores = values[coreTableCol].trim().toInt()
- requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
-
- vmIds.add(vmId)
- vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
- }
+ while (hasNextRow) {
+ var lastTimestamp = Long.MIN_VALUE
+
+ do {
+ val id = reader.get(RESOURCE_STATE_ID)
+
+ if (id !in selectedVms) {
+ hasNextRow = reader.nextRow()
+ continue
}
- }
- // Read VM metric reading files
- val timestampCol = 0
- val vmIdCol = 1
- val cpuUsageCol = 4
- val traceInterval = 5 * 60 * 1000L
-
- val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
- val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
- val allFragments = mutableListOf<Fragment>()
-
- for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) {
- val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv")
- var timestamp: Long
- var cpuUsage: Double
-
- BufferedReader(FileReader(readingsFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split(",")
- vmId = values[vmIdCol].trim()
-
- // Ignore readings for VMs not in the sample
- if (!vmIds.contains(vmId)) {
- continue
- }
-
- timestamp = values[timestampCol].trim().toLong() * 1000L
- vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
- cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz
- vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
- val lastFragment = vmIdToLastFragment[vmId]
-
- vmIdToLastFragment[vmId] =
- if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
- Fragment(
- vmId,
- lastFragment.tick,
- lastFragment.flops + flops,
- lastFragment.duration + traceInterval,
- cpuUsage,
- vmIdToMetadata[vmId]!!.cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- vmIdToMetadata[vmId]!!.cores
- )
- if (lastFragment != null) {
- if (vmIdToFragments[vmId] == null) {
- vmIdToFragments[vmId] = mutableListOf()
- }
- vmIdToFragments[vmId]!!.add(lastFragment)
- allFragments.add(lastFragment)
- }
- fragment
- }
- }
- }
- }
- }
+ val builder = GenericRecordBuilder(BP_RESOURCE_STATES_SCHEMA)
+ builder["id"] = id
- for (entry in vmIdToLastFragment) {
- if (entry.value != null) {
- if (vmIdToFragments[entry.key] == null) {
- vmIdToFragments[entry.key] = mutableListOf()
+ val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
+ if (lastTimestamp < 0) {
+ lastTimestamp = timestamp - 5 * 60 * 1000L
}
- vmIdToFragments[entry.key]!!.add(entry.value!!)
- }
- }
- println("Read ${vmIdToLastFragment.size} VMs")
-
- for (entry in vmIdToMetadata) {
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", entry.key)
- metaRecord.put("submissionTime", entry.value.minTime)
- metaRecord.put("endTime", entry.value.maxTime)
- println("${entry.value.minTime} - ${entry.value.maxTime}")
- metaRecord.put("maxCores", entry.value.cores)
- metaRecord.put("requiredMemory", entry.value.requiredMemory)
- metaWriter.write(metaRecord)
- }
+ val duration = timestamp - lastTimestamp
+ val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
+ val flops = (cpuUsage * duration / 1000.0).roundToLong()
- return allFragments
- }
-}
+ builder["time"] = timestamp
+ builder["duration"] = duration
+ builder["cores"] = cores
+ builder["cpuUsage"] = cpuUsage
+ builder["flops"] = flops
-data class Fragment(
- val id: String,
- val tick: Long,
- val flops: Long,
- val duration: Long,
- val usage: Double,
- val cores: Int
-)
+ writer.write(builder.build())
-class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long)
+ lastTimestamp = timestamp
+ hasNextRow = reader.nextRow()
+ } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
-/**
- * A script to convert a trace in text format into a Parquet trace.
- */
-fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
+ count++
+ }
+
+ return count
+ }
+
+ /**
+ * Options for sampling the workload trace.
+ */
+ private class SamplingOptions : OptionGroup() {
+ /**
+ * The fraction of VMs to sample
+ */
+ val fraction by option("--sampling-fraction", help = "fraction of the workload to sample")
+ .double()
+ .restrictTo(0.0001, 1.0)
+ .required()
+
+ /**
+ * The seed for sampling the trace.
+ */
+ val seed by option("--sampling-seed", help = "seed for sampling the workload")
+ .long()
+ .default(0)
+ }
+}
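A sketch of invoking the rewritten converter programmatically, using the options defined above; the input and output paths are hypothetical:

    fun convertAzureSample() {
        TraceConverterCli().main(
            arrayOf(
                "--format", "azure",
                "--output", "output/azure-sample", // hypothetical output directory
                "--sampling-fraction", "0.01",     // keep roughly 1% of the VMs
                "--sampling-seed", "42",
                "/data/azure-trace"                // hypothetical input directory
            )
        )
    }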
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt
new file mode 100644
index 00000000..f98f4b2c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * The resource state [Table] for the Azure v1 VM traces.
+ */
+internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table {
+ /**
+ * The partitions that belong to the table.
+ */
+ private val partitions = Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "csv" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
+
+ override val name: String = TABLE_RESOURCE_STATES
+
+ override val isSynthetic: Boolean = false
+
+ override val columns: List<TableColumn<*>> = listOf(
+ RESOURCE_STATE_ID,
+ RESOURCE_STATE_TIMESTAMP,
+ RESOURCE_STATE_CPU_USAGE_PCT
+ )
+
+ override fun newReader(): TableReader {
+ val it = partitions.iterator()
+
+ return object : TableReader {
+ var delegate: TableReader? = nextDelegate()
+
+ override fun nextRow(): Boolean {
+ var delegate = delegate
+
+ while (delegate != null) {
+ if (delegate.nextRow()) {
+ break
+ }
+
+ delegate.close()
+ delegate = nextDelegate()
+ }
+
+ this.delegate = delegate
+ return delegate != null
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.get(column)
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getBoolean(column)
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getInt(column)
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getLong(column)
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getDouble(column)
+ }
+
+ override fun close() {
+ delegate?.close()
+ }
+
+ private fun nextDelegate(): TableReader? {
+ return if (it.hasNext()) {
+ val (_, path) = it.next()
+ AzureResourceStateTableReader(factory.createParser(path.toFile()))
+ } else {
+ null
+ }
+ }
+
+ override fun toString(): String = "AzureCompositeTableReader"
+ }
+ }
+
+ override fun newReader(partition: String): TableReader {
+ val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
+ return AzureResourceStateTableReader(factory.createParser(path.toFile()))
+ }
+
+ override fun toString(): String = "AzureResourceStateTable"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt
new file mode 100644
index 00000000..f80c0e82
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.azure
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.*
+import java.time.Instant
+
+/**
+ * A [TableReader] for the Azure v1 VM resource state table.
+ */
+internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader {
+ init {
+ parser.schema = schema
+ }
+
+ override fun nextRow(): Boolean {
+ reset()
+
+ if (!nextStart()) {
+ return false
+ }
+
+ while (true) {
+ val token = parser.nextValue()
+
+ if (token == null || token == JsonToken.END_OBJECT) {
+ break
+ }
+
+ when (parser.currentName) {
+ "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue)
+ "vm id" -> id = parser.text
+ "avg cpu" -> cpuUsagePct = parser.doubleValue
+ }
+ }
+
+ return true
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_CPU_USAGE_PCT -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any? = when (column) {
+ RESOURCE_STATE_ID -> id
+ RESOURCE_STATE_TIMESTAMP -> timestamp
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ return when (column) {
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * Advance the parser until the next object start.
+ */
+ private fun nextStart(): Boolean {
+ var token = parser.nextValue()
+
+ while (token != null && token != JsonToken.START_OBJECT) {
+ token = parser.nextValue()
+ }
+
+ return token != null
+ }
+
+ /**
+ * State fields of the reader.
+ */
+ private var id: String? = null
+ private var timestamp: Instant? = null
+ private var cpuUsagePct = Double.NaN
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ id = null
+ timestamp = null
+ cpuUsagePct = Double.NaN
+ }
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema = CsvSchema.builder()
+ .addColumn("timestamp", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm id", CsvSchema.ColumnType.STRING)
+ .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .build()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt
new file mode 100644
index 00000000..c9d4f7eb
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * The resource [Table] for the Azure v1 VM traces.
+ */
+internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table {
+ override val name: String = TABLE_RESOURCES
+
+ override val isSynthetic: Boolean = false
+
+ override val columns: List<TableColumn<*>> = listOf(
+ RESOURCE_ID,
+ RESOURCE_START_TIME,
+ RESOURCE_STOP_TIME,
+ RESOURCE_NCPUS,
+ RESOURCE_MEM_CAPACITY
+ )
+
+ override fun newReader(): TableReader {
+ return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile()))
+ }
+
+ override fun newReader(partition: String): TableReader {
+ throw IllegalArgumentException("No partition $partition")
+ }
+
+ override fun toString(): String = "AzureResourceTable"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt
new file mode 100644
index 00000000..b712b854
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt
@@ -0,0 +1,169 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.azure
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.*
+import java.time.Instant
+
+/**
+ * A [TableReader] for the Azure v1 VM resources table.
+ */
+internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader {
+ init {
+ parser.schema = schema
+ }
+
+ override fun nextRow(): Boolean {
+ reset()
+
+ if (!nextStart()) {
+ return false
+ }
+
+ while (true) {
+ val token = parser.nextValue()
+
+ if (token == null || token == JsonToken.END_OBJECT) {
+ break
+ }
+
+ when (parser.currentName) {
+ "vm id" -> id = parser.text
+ "vm created" -> startTime = Instant.ofEpochSecond(parser.longValue)
+ "vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue)
+ "vm virtual core count" -> cpuCores = parser.intValue
+ "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB
+ }
+ }
+
+ return true
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_ID -> true
+ RESOURCE_START_TIME -> true
+ RESOURCE_STOP_TIME -> true
+ RESOURCE_NCPUS -> true
+ RESOURCE_MEM_CAPACITY -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any? = when (column) {
+ RESOURCE_ID -> id
+ RESOURCE_START_TIME -> startTime
+ RESOURCE_STOP_TIME -> stopTime
+ RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS)
+ RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ return when (column) {
+ RESOURCE_NCPUS -> cpuCores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ return when (column) {
+ RESOURCE_MEM_CAPACITY -> memCapacity
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * Advance the parser until the next object start.
+ */
+ private fun nextStart(): Boolean {
+ var token = parser.nextValue()
+
+ while (token != null && token != JsonToken.START_OBJECT) {
+ token = parser.nextValue()
+ }
+
+ return token != null
+ }
+
+ /**
+ * State fields of the reader.
+ */
+ private var id: String? = null
+ private var startTime: Instant? = null
+ private var stopTime: Instant? = null
+ private var cpuCores = -1
+ private var memCapacity = Double.NaN
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ id = null
+ startTime = null
+ stopTime = null
+ cpuCores = -1
+ memCapacity = Double.NaN
+ }
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema = CsvSchema.builder()
+ .addColumn("vm id", CsvSchema.ColumnType.NUMBER)
+ .addColumn("subscription id", CsvSchema.ColumnType.STRING)
+ .addColumn("deployment id", CsvSchema.ColumnType.NUMBER)
+ .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER)
+ .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER)
+ .addColumn("max cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm category", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm memory", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .build()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt
new file mode 100644
index 00000000..24c60bab
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * [Trace] implementation for the Azure v1 VM traces.
+ */
+class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace {
+ override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+ override fun containsTable(name: String): Boolean = name in tables
+
+ override fun getTable(name: String): Table? {
+ return when (name) {
+ TABLE_RESOURCES -> AzureResourceTable(factory, path)
+ TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path)
+ else -> null
+ }
+ }
+
+ override fun toString(): String = "AzureTrace[$path]"
+}
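A short usage sketch (not part of this PR) that reads the Azure resources table through the new Trace API; the local path is hypothetical:

    import org.opendc.experiments.capelin.trace.azure.AzureTraceFormat
    import org.opendc.trace.*
    import java.io.File

    fun listAzureVms() {
        val trace = AzureTraceFormat().open(File("/data/azure-trace").toURI().toURL())
        val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
        try {
            while (reader.nextRow()) {
                val id = reader.get(RESOURCE_ID)
                val cores = reader.getInt(RESOURCE_NCPUS)
                println("VM $id has $cores vCPUs")
            }
        } finally {
            reader.close()
        }
    }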
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt
new file mode 100644
index 00000000..744e43a0
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A format implementation for the Azure v1 format.
+ */
+class AzureTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "azure-v1"
+
+ /**
+ * The [CsvFactory] used to create the parser.
+ */
+ private val factory = CsvFactory()
+ .enable(CsvParser.Feature.ALLOW_COMMENTS)
+ .enable(CsvParser.Feature.TRIM_SPACES)
+
+ /**
+ * Open the trace file.
+ */
+ override fun open(url: URL): AzureTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return AzureTrace(factory, path)
+ }
+}
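The "dynamic resolving of trace formats" mentioned in the commit message builds on the JVM service-loader mechanism (see the META-INF/services entry for org.opendc.trace.spi.TraceFormat in the diffstat above). A sketch of resolving a registered format by name; `findFormat` is a hypothetical helper:

    import org.opendc.trace.spi.TraceFormat
    import java.util.ServiceLoader

    fun findFormat(name: String): TraceFormat? =
        ServiceLoader.load(TraceFormat::class.java).firstOrNull { it.name == name }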
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
index 35bfd5ef..f051bf88 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
@@ -34,16 +34,13 @@ internal class BPResourceStateTable(private val path: Path) : Table {
override val name: String = TABLE_RESOURCE_STATES
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_DURATION -> true
- RESOURCE_STATE_NCPUS -> true
- RESOURCE_STATE_CPU_USAGE -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ RESOURCE_STATE_ID,
+ RESOURCE_STATE_TIMESTAMP,
+ RESOURCE_STATE_DURATION,
+ RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_USAGE,
+ )
override fun newReader(): TableReader {
val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
index 74d1e574..5b0f013f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
@@ -34,16 +34,13 @@ internal class BPResourceTable(private val path: Path) : Table {
override val name: String = TABLE_RESOURCES
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_START_TIME -> true
- RESOURCE_END_TIME -> true
- RESOURCE_NCPUS -> true
- RESOURCE_MEM_CAPACITY -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ RESOURCE_ID,
+ RESOURCE_START_TIME,
+ RESOURCE_STOP_TIME,
+ RESOURCE_NCPUS,
+ RESOURCE_MEM_CAPACITY
+ )
override fun newReader(): TableReader {
val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet"))
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
index 0a105783..4416aae8 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
@@ -45,7 +45,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene
return when (column) {
RESOURCE_ID -> true
RESOURCE_START_TIME -> true
- RESOURCE_END_TIME -> true
+ RESOURCE_STOP_TIME -> true
RESOURCE_NCPUS -> true
RESOURCE_MEM_CAPACITY -> true
else -> false
@@ -59,9 +59,9 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene
val res: Any = when (column) {
RESOURCE_ID -> record["id"].toString()
RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long)
- RESOURCE_END_TIME -> Instant.ofEpochMilli(record["endTime"] as Long)
- RESOURCE_NCPUS -> record["maxCores"]
- RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble()
+ RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long)
+ RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS)
+ RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -90,7 +90,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble()
+ RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB
else -> throw IllegalArgumentException("Invalid column")
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt
new file mode 100644
index 00000000..7dd8161d
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+
+/**
+ * Schema for the resources table in the trace.
+ */
+val BP_RESOURCES_SCHEMA: Schema = SchemaBuilder
+ .record("meta")
+ .namespace("org.opendc.trace.capelin")
+ .fields()
+ .requiredString("id")
+ .requiredLong("submissionTime")
+ .requiredLong("endTime")
+ .requiredInt("maxCores")
+ .requiredLong("requiredMemory")
+ .endRecord()
+
+/**
+ * Schema for the resource states table in the trace.
+ */
+val BP_RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder
+ .record("meta")
+ .namespace("org.opendc.trace.capelin")
+ .fields()
+ .requiredString("id")
+ .requiredLong("time")
+ .requiredLong("duration")
+ .requiredInt("cores")
+ .requiredDouble("cpuUsage")
+ .requiredLong("flops")
+ .endRecord()
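A minimal sketch, mirroring the converter above, of building one row against BP_RESOURCES_SCHEMA with Avro's GenericRecordBuilder; the values are made up:

    import org.apache.avro.generic.GenericRecordBuilder

    val record = GenericRecordBuilder(BP_RESOURCES_SCHEMA).apply {
        set("id", "vm-1042")          // hypothetical VM id
        set("submissionTime", 0L)     // first observed timestamp (ms)
        set("endTime", 3_600_000L)    // last observed timestamp (ms)
        set("maxCores", 4)
        set("requiredMemory", 8_000_000L)
    }.build()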
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
index 24abb109..67140fe9 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
@@ -31,7 +31,7 @@ import kotlin.io.path.extension
import kotlin.io.path.nameWithoutExtension
/**
- * The resource state [Table] in the Bitbrains format.
+ * The resource state [Table] in the extended Bitbrains format.
*/
internal class SvResourceStateTable(path: Path) : Table {
/**
@@ -40,28 +40,26 @@ internal class SvResourceStateTable(path: Path) : Table {
private val partitions = Files.walk(path, 1)
.filter { !Files.isDirectory(it) && it.extension == "txt" }
.collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
override val name: String = TABLE_RESOURCE_STATES
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_CLUSTER_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_NCPUS -> true
- RESOURCE_STATE_CPU_CAPACITY -> true
- RESOURCE_STATE_CPU_USAGE -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- RESOURCE_STATE_CPU_DEMAND -> true
- RESOURCE_STATE_CPU_READY_PCT -> true
- RESOURCE_STATE_MEM_CAPACITY -> true
- RESOURCE_STATE_DISK_READ -> true
- RESOURCE_STATE_DISK_WRITE -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ RESOURCE_STATE_ID,
+ RESOURCE_STATE_CLUSTER_ID,
+ RESOURCE_STATE_TIMESTAMP,
+ RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_CAPACITY,
+ RESOURCE_STATE_CPU_USAGE,
+ RESOURCE_STATE_CPU_USAGE_PCT,
+ RESOURCE_STATE_CPU_DEMAND,
+ RESOURCE_STATE_CPU_READY_PCT,
+ RESOURCE_STATE_MEM_CAPACITY,
+ RESOURCE_STATE_DISK_READ,
+ RESOURCE_STATE_DISK_WRITE,
+ )
override fun newReader(): TableReader {
val it = partitions.iterator()
@@ -126,7 +124,7 @@ internal class SvResourceStateTable(path: Path) : Table {
}
}
- override fun toString(): String = "BitbrainsCompositeTableReader"
+ override fun toString(): String = "SvCompositeTableReader"
}
}
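Note: with isSupported replaced by the columns property (see the Table.kt change below), support checks reduce to a membership test. A minimal sketch, assuming a table: Table in scope:

    // Equivalent of the removed isSupported(column) call.
    fun supports(table: Table, column: TableColumn<*>): Boolean =
        column in table.columns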
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
index 1a556f8d..6ea403fe 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
@@ -30,13 +30,8 @@ import java.time.Instant
* A [TableReader] for the Bitbrains resource state table.
*/
internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader {
- /**
- * The current parser state.
- */
- private val state = RowState()
-
override fun nextRow(): Boolean {
- state.reset()
+ reset()
var line: String
var num = 0
@@ -70,23 +65,23 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
end = line.indexOf(' ', start)
if (end < 0) {
- break
+ end = length
}
val field = line.subSequence(start, end) as String
when (col++) {
- COL_TIMESTAMP -> state.timestamp = Instant.ofEpochSecond(field.toLong(10))
- COL_CPU_USAGE -> state.cpuUsage = field.toDouble()
- COL_CPU_DEMAND -> state.cpuDemand = field.toDouble()
- COL_DISK_READ -> state.diskRead = field.toDouble()
- COL_DISK_WRITE -> state.diskWrite = field.toDouble()
- COL_CLUSTER_ID -> state.cluster = field.trim()
- COL_NCPUS -> state.cpuCores = field.toInt(10)
- COL_CPU_READY_PCT -> state.cpuReadyPct = field.toDouble()
- COL_POWERED_ON -> state.poweredOn = field.toInt(10) == 1
- COL_CPU_CAPACITY -> state.cpuCapacity = field.toDouble()
- COL_ID -> state.id = field.trim()
- COL_MEM_CAPACITY -> state.memCapacity = field.toDouble()
+ COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10))
+ COL_CPU_USAGE -> cpuUsage = field.toDouble()
+ COL_CPU_DEMAND -> cpuDemand = field.toDouble()
+ COL_DISK_READ -> diskRead = field.toDouble()
+ COL_DISK_WRITE -> diskWrite = field.toDouble()
+ COL_CLUSTER_ID -> cluster = field.trim()
+ COL_NCPUS -> cpuCores = field.toInt(10)
+ COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble()
+ COL_POWERED_ON -> poweredOn = field.toInt(10) == 1
+ COL_CPU_CAPACITY -> cpuCapacity = field.toDouble()
+ COL_ID -> id = field.trim()
+ COL_MEM_CAPACITY -> memCapacity = field.toDouble()
}
}
@@ -113,16 +108,16 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
override fun <T> get(column: TableColumn<T>): T {
val res: Any? = when (column) {
- RESOURCE_STATE_ID -> state.id
- RESOURCE_STATE_CLUSTER_ID -> state.cluster
- RESOURCE_STATE_TIMESTAMP -> state.timestamp
- RESOURCE_STATE_NCPUS -> state.cpuCores
- RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity
- RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
- RESOURCE_STATE_DISK_READ -> state.diskRead
- RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+ RESOURCE_STATE_ID -> id
+ RESOURCE_STATE_CLUSTER_ID -> cluster
+ RESOURCE_STATE_TIMESTAMP -> timestamp
+ RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS)
+ RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY)
+ RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
+ RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT)
+ RESOURCE_STATE_CPU_DEMAND -> getDouble(RESOURCE_STATE_CPU_DEMAND)
+ RESOURCE_STATE_CPU_READY_PCT -> getDouble(RESOURCE_STATE_CPU_READY_PCT)
+ RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY)
+ RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ)
+ RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -132,14 +127,14 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
override fun getBoolean(column: TableColumn<Boolean>): Boolean {
return when (column) {
- RESOURCE_STATE_POWERED_ON -> state.poweredOn
+ RESOURCE_STATE_POWERED_ON -> poweredOn
else -> throw IllegalArgumentException("Invalid column")
}
}
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- RESOURCE_STATE_NCPUS -> state.cpuCores
+ RESOURCE_STATE_NCPUS -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -150,12 +145,13 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
override fun getDouble(column: TableColumn<Double>): Double {
return when (column) {
- RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity
- RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
- RESOURCE_STATE_DISK_READ -> state.diskRead
- RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+ RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity
+ RESOURCE_STATE_CPU_DEMAND -> cpuDemand
+ RESOURCE_STATE_CPU_READY_PCT -> cpuReadyPct
+ RESOURCE_STATE_MEM_CAPACITY -> memCapacity
+ RESOURCE_STATE_DISK_READ -> diskRead
+ RESOURCE_STATE_DISK_WRITE -> diskWrite
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -165,51 +161,37 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
}
/**
- * The current row state.
+ * State fields of the reader.
*/
- private class RowState {
- @JvmField
- var id: String? = null
- @JvmField
- var cluster: String? = null
- @JvmField
- var timestamp: Instant? = null
- @JvmField
- var cpuCores = -1
- @JvmField
- var cpuCapacity = Double.NaN
- @JvmField
- var cpuUsage = Double.NaN
- @JvmField
- var cpuDemand = Double.NaN
- @JvmField
- var cpuReadyPct = Double.NaN
- @JvmField
- var memCapacity = Double.NaN
- @JvmField
- var diskRead = Double.NaN
- @JvmField
- var diskWrite = Double.NaN
- @JvmField
- var poweredOn: Boolean = false
-
- /**
- * Reset the state.
- */
- fun reset() {
- id = null
- timestamp = null
- cluster = null
- cpuCores = -1
- cpuCapacity = Double.NaN
- cpuUsage = Double.NaN
- cpuDemand = Double.NaN
- cpuReadyPct = Double.NaN
- memCapacity = Double.NaN
- diskRead = Double.NaN
- diskWrite = Double.NaN
- poweredOn = false
- }
+ private var id: String? = null
+ private var cluster: String? = null
+ private var timestamp: Instant? = null
+ private var cpuCores = -1
+ private var cpuCapacity = Double.NaN
+ private var cpuUsage = Double.NaN
+ private var cpuDemand = Double.NaN
+ private var cpuReadyPct = Double.NaN
+ private var memCapacity = Double.NaN
+ private var diskRead = Double.NaN
+ private var diskWrite = Double.NaN
+ private var poweredOn: Boolean = false
+
+ /**
+ * Reset the state of the reader.
+ */
+ private fun reset() {
+ id = null
+ timestamp = null
+ cluster = null
+ cpuCores = -1
+ cpuCapacity = Double.NaN
+ cpuUsage = Double.NaN
+ cpuDemand = Double.NaN
+ cpuReadyPct = Double.NaN
+ memCapacity = Double.NaN
+ diskRead = Double.NaN
+ diskWrite = Double.NaN
+ poweredOn = false
}
/**
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
index 44dec95b..e2e5ea6d 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
@@ -41,7 +41,7 @@ public val RESOURCE_START_TIME: TableColumn<Instant> = TableColumn("resource:sta
* End time for the resource.
*/
@JvmField
-public val RESOURCE_END_TIME: TableColumn<Instant> = TableColumn("resource:end_time", Instant::class.java)
+public val RESOURCE_STOP_TIME: TableColumn<Instant> = TableColumn("resource:stop_time", Instant::class.java)
/**
* Number of CPUs for the resource.
@@ -50,7 +50,7 @@ public val RESOURCE_END_TIME: TableColumn<Instant> = TableColumn("resource:end_t
public val RESOURCE_NCPUS: TableColumn<Int> = intColumn("resource:num_cpus")
/**
- * Memory capacity for the resource.
+ * Memory capacity for the resource in KB.
*/
@JvmField
public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource:mem_capacity")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
index 11e5d6b7..6aca2051 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
@@ -37,9 +37,9 @@ public interface Table {
public val isSynthetic: Boolean
/**
- * Determine whether the specified [column] is supported by this table.
+ * The list of columns supported in this table.
*/
- public fun isSupported(column: TableColumn<*>): Boolean
+ public val columns: List<TableColumn<*>>
/**
* Open a [TableReader] for this table.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
index 88bbc623..46920dce 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
@@ -23,49 +23,52 @@
@file:JvmName("TaskColumns")
package org.opendc.trace
+import java.time.Duration
+import java.time.Instant
+
/**
* A column containing the task identifier.
*/
@JvmField
-public val TASK_ID: TableColumn<Long> = longColumn("task:id")
+public val TASK_ID: TableColumn<String> = stringColumn("task:id")
/**
* A column containing the identifier of the workflow.
*/
@JvmField
-public val TASK_WORKFLOW_ID: TableColumn<Long> = longColumn("task:workflow_id")
+public val TASK_WORKFLOW_ID: TableColumn<String> = stringColumn("task:workflow_id")
/**
* A column containing the submit time of the task.
*/
@JvmField
-public val TASK_SUBMIT_TIME: TableColumn<Long> = longColumn("task:submit_time")
+public val TASK_SUBMIT_TIME: TableColumn<Instant> = TableColumn("task:submit_time", type = Instant::class.java)
/**
* A column containing the wait time of the task.
*/
@JvmField
-public val TASK_WAIT_TIME: TableColumn<Long> = longColumn("task:wait_time")
+public val TASK_WAIT_TIME: TableColumn<Duration> = TableColumn("task:wait_time", type = Duration::class.java)
/**
* A column containing the runtime of the task.
*/
@JvmField
-public val TASK_RUNTIME: TableColumn<Long> = longColumn("task:runtime")
+public val TASK_RUNTIME: TableColumn<Duration> = TableColumn("task:runtime", type = Duration::class.java)
/**
* A column containing the parents of a task.
*/
@Suppress("UNCHECKED_CAST")
@JvmField
-public val TASK_PARENTS: TableColumn<Set<Long>> = TableColumn("task:parents", type = Set::class.java as Class<Set<Long>>)
+public val TASK_PARENTS: TableColumn<Set<String>> = TableColumn("task:parents", type = Set::class.java as Class<Set<String>>)
/**
* A column containing the children of a task.
*/
@Suppress("UNCHECKED_CAST")
@JvmField
-public val TASK_CHILDREN: TableColumn<Set<Long>> = TableColumn("task:children", type = Set::class.java as Class<Set<Long>>)
+public val TASK_CHILDREN: TableColumn<Set<String>> = TableColumn("task:children", type = Set::class.java as Class<Set<String>>)
/**
* A column containing the requested CPUs of a task.
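Note: since the task columns now carry string identifiers and java.time types, call sites read typed values directly. A sketch, assuming reader: TableReader is positioned on a row:

    import java.time.Duration
    import java.time.Instant

    val taskId: String = reader.get(TASK_ID)               // previously a Long
    val submitTime: Instant = reader.get(TASK_SUBMIT_TIME) // previously epoch seconds as Long
    val runtime: Duration = reader.get(TASK_RUNTIME)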
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
index 36e93b52..0ae45e86 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
@@ -22,6 +22,11 @@
package org.opendc.trace
+import org.opendc.trace.spi.TraceFormat
+import java.io.File
+import java.net.URL
+import java.nio.file.Path
+
/**
* A trace is a collection of related tables that characterize a workload.
*/
@@ -40,4 +45,34 @@ public interface Trace {
* Obtain a [Table] with the specified [name].
*/
public fun getTable(name: String): Table?
+
+ public companion object {
+ /**
+ * Open a [Trace] at the specified [url] in the given [format].
+ *
+ * @throws IllegalArgumentException if [format] is not supported.
+ */
+ public fun open(url: URL, format: String): Trace {
+ val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" }
+ return provider.open(url)
+ }
+
+ /**
+ * Open a [Trace] at the specified [path] in the given [format].
+ *
+ * @throws IllegalArgumentException if [format] is not supported.
+ */
+ public fun open(path: File, format: String): Trace {
+ return open(path.toURI().toURL(), format)
+ }
+
+ /**
+ * Open a [Trace] at the specified [path] in the given [format].
+ *
+ * @throws IllegalArgumentException if [format] is not supported.
+ */
+ public fun open(path: Path, format: String): Trace {
+ return open(path.toUri().toURL(), format)
+ }
+ }
}
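Note: the new companion functions resolve formats dynamically through the TraceFormat SPI. A minimal usage sketch (path and format name are illustrative):

    import java.nio.file.Paths

    val trace = Trace.open(Paths.get("traces/bitbrains"), format = "bitbrains")
    for (table in trace.tables) {
        println("table: $table")
    }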
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
index 767ef919..c9e5954d 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
@@ -33,7 +33,7 @@ import kotlin.io.path.nameWithoutExtension
/**
* The resource state [Table] in the Bitbrains format.
*/
-internal class BitbrainsResourceStateTable(private val factory: CsvFactory, private val path: Path) : Table {
+internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path: Path) : Table {
/**
* The partitions that belong to the table.
*/
@@ -41,28 +41,26 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, priv
Files.walk(path, 1)
.filter { !Files.isDirectory(it) && it.extension == "csv" }
.collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
override val name: String = TABLE_RESOURCE_STATES
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_NCPUS -> true
- RESOURCE_STATE_CPU_CAPACITY -> true
- RESOURCE_STATE_CPU_USAGE -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- RESOURCE_STATE_MEM_CAPACITY -> true
- RESOURCE_STATE_MEM_USAGE -> true
- RESOURCE_STATE_DISK_READ -> true
- RESOURCE_STATE_DISK_WRITE -> true
- RESOURCE_STATE_NET_RX -> true
- RESOURCE_STATE_NET_TX -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ RESOURCE_STATE_ID,
+ RESOURCE_STATE_TIMESTAMP,
+ RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_CAPACITY,
+ RESOURCE_STATE_CPU_USAGE,
+ RESOURCE_STATE_CPU_USAGE_PCT,
+ RESOURCE_STATE_MEM_CAPACITY,
+ RESOURCE_STATE_MEM_USAGE,
+ RESOURCE_STATE_DISK_READ,
+ RESOURCE_STATE_DISK_WRITE,
+ RESOURCE_STATE_NET_RX,
+ RESOURCE_STATE_NET_TX,
+ )
override fun newReader(): TableReader {
val it = partitions.iterator()
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
index 5687ac7f..dab784c2 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -22,20 +22,42 @@
package org.opendc.trace.bitbrains
+import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
+import java.text.NumberFormat
import java.time.Instant
+import java.time.LocalDateTime
+import java.time.ZoneOffset
+import java.time.format.DateTimeFormatter
+import java.time.format.DateTimeParseException
+import java.util.*
/**
* A [TableReader] for the Bitbrains resource state table.
*/
internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader {
/**
- * The current parser state.
+ * The [DateTimeFormatter] used to parse timestamps in the Materna trace.
*/
- private val state = RowState()
+ private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss")
+
+ /**
+ * The type of timestamps in the trace.
+ */
+ private var timestampType: TimestampType = TimestampType.UNDECIDED
+
+ /**
+ * The [NumberFormat] used to parse doubles containing a comma.
+ */
+ private val nf = NumberFormat.getInstance(Locale.GERMAN)
+
+ /**
+ * A flag to indicate that the trace contains decimals with a comma separator.
+ */
+ private var usesCommaDecimalSeparator = false
init {
parser.schema = schema
@@ -43,7 +65,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun nextRow(): Boolean {
// Reset the row state
- state.reset()
+ reset()
if (!nextStart()) {
return false
@@ -57,17 +79,32 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
}
when (parser.currentName) {
- "Timestamp [ms]" -> state.timestamp = Instant.ofEpochSecond(parser.longValue)
- "CPU cores" -> state.cpuCores = parser.intValue
- "CPU capacity provisioned [MHZ]" -> state.cpuCapacity = parser.doubleValue
- "CPU usage [MHZ]" -> state.cpuUsage = parser.doubleValue
- "CPU usage [%]" -> state.cpuUsagePct = parser.doubleValue
- "Memory capacity provisioned [KB]" -> state.memCapacity = parser.doubleValue
- "Memory usage [KB]" -> state.memUsage = parser.doubleValue
- "Disk read throughput [KB/s]" -> state.diskRead = parser.doubleValue
- "Disk write throughput [KB/s]" -> state.diskWrite = parser.doubleValue
- "Network received throughput [KB/s]" -> state.netReceived = parser.doubleValue
- "Network transmitted throughput [KB/s]" -> state.netTransmitted = parser.doubleValue
+ "Timestamp [ms]" -> {
+ timestamp = when (timestampType) {
+ TimestampType.UNDECIDED -> {
+ try {
+ val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+ timestampType = TimestampType.DATE_TIME
+ res
+ } catch (e: DateTimeParseException) {
+ timestampType = TimestampType.EPOCH_SECONDS
+ Instant.ofEpochSecond(parser.longValue)
+ }
+ }
+ TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+ TimestampType.EPOCH_SECONDS -> Instant.ofEpochSecond(parser.longValue)
+ }
+ }
+ "CPU cores" -> cpuCores = parser.intValue
+ "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble()
+ "CPU usage [MHZ]" -> cpuUsage = parseSafeDouble()
+ "CPU usage [%]" -> cpuUsagePct = parseSafeDouble() / 100.0 // Convert to range [0, 1]
+ "Memory capacity provisioned [KB]" -> memCapacity = parseSafeDouble()
+ "Memory usage [KB]" -> memUsage = parseSafeDouble()
+ "Disk read throughput [KB/s]" -> diskRead = parseSafeDouble()
+ "Disk write throughput [KB/s]" -> diskWrite = parseSafeDouble()
+ "Network received throughput [KB/s]" -> netReceived = parseSafeDouble()
+ "Network transmitted throughput [KB/s]" -> netTransmitted = parseSafeDouble()
}
}
@@ -95,17 +132,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun <T> get(column: TableColumn<T>): T {
val res: Any? = when (column) {
RESOURCE_STATE_ID -> partition
- RESOURCE_STATE_TIMESTAMP -> state.timestamp
- RESOURCE_STATE_NCPUS -> state.cpuCores
- RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct
- RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
- RESOURCE_STATE_MEM_USAGE -> state.memUsage
- RESOURCE_STATE_DISK_READ -> state.diskRead
- RESOURCE_STATE_DISK_WRITE -> state.diskWrite
- RESOURCE_STATE_NET_RX -> state.netReceived
- RESOURCE_STATE_NET_TX -> state.netTransmitted
+ RESOURCE_STATE_TIMESTAMP -> timestamp
+ RESOURCE_STATE_NCPUS -> cpuCores
+ RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ RESOURCE_STATE_MEM_CAPACITY -> memCapacity
+ RESOURCE_STATE_MEM_USAGE -> memUsage
+ RESOURCE_STATE_DISK_READ -> diskRead
+ RESOURCE_STATE_DISK_WRITE -> diskWrite
+ RESOURCE_STATE_NET_RX -> netReceived
+ RESOURCE_STATE_NET_TX -> netTransmitted
else -> throw IllegalArgumentException("Invalid column")
}
@@ -119,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- RESOURCE_STATE_NCPUS -> state.cpuCores
+ RESOURCE_STATE_NCPUS -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -130,15 +167,15 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getDouble(column: TableColumn<Double>): Double {
return when (column) {
- RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct
- RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
- RESOURCE_STATE_MEM_USAGE -> state.memUsage
- RESOURCE_STATE_DISK_READ -> state.diskRead
- RESOURCE_STATE_DISK_WRITE -> state.diskWrite
- RESOURCE_STATE_NET_RX -> state.netReceived
- RESOURCE_STATE_NET_TX -> state.netTransmitted
+ RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ RESOURCE_STATE_MEM_CAPACITY -> memCapacity
+ RESOURCE_STATE_MEM_USAGE -> memUsage
+ RESOURCE_STATE_DISK_READ -> diskRead
+ RESOURCE_STATE_DISK_WRITE -> diskWrite
+ RESOURCE_STATE_NET_RX -> netReceived
+ RESOURCE_STATE_NET_TX -> netTransmitted
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -161,37 +198,62 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
}
/**
- * The current row state.
+ * Parse the current value as a double, falling back to comma-decimal parsing when needed.
*/
- private class RowState {
- var timestamp: Instant? = null
- var cpuCores = -1
- var cpuCapacity = Double.NaN
- var cpuUsage = Double.NaN
- var cpuUsagePct = Double.NaN
- var memCapacity = Double.NaN
- var memUsage = Double.NaN
- var diskRead = Double.NaN
- var diskWrite = Double.NaN
- var netReceived = Double.NaN
- var netTransmitted = Double.NaN
+ private fun parseSafeDouble(): Double {
+ if (!usesCommaDecimalSeparator) {
+ try {
+ return parser.doubleValue
+ } catch (e: JsonParseException) {
+ usesCommaDecimalSeparator = true
+ }
+ }
- /**
- * Reset the state.
- */
- fun reset() {
- timestamp = null
- cpuCores = -1
- cpuCapacity = Double.NaN
- cpuUsage = Double.NaN
- cpuUsagePct = Double.NaN
- memCapacity = Double.NaN
- memUsage = Double.NaN
- diskRead = Double.NaN
- diskWrite = Double.NaN
- netReceived = Double.NaN
- netTransmitted = Double.NaN
+ val text = parser.text
+ if (text.isBlank()) {
+ return 0.0
}
+
+ return nf.parse(text).toDouble()
+ }
+
+ /**
+ * State fields of the reader.
+ */
+ private var timestamp: Instant? = null
+ private var cpuCores = -1
+ private var cpuCapacity = Double.NaN
+ private var cpuUsage = Double.NaN
+ private var cpuUsagePct = Double.NaN
+ private var memCapacity = Double.NaN
+ private var memUsage = Double.NaN
+ private var diskRead = Double.NaN
+ private var diskWrite = Double.NaN
+ private var netReceived = Double.NaN
+ private var netTransmitted = Double.NaN
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ timestamp = null
+ cpuCores = -1
+ cpuCapacity = Double.NaN
+ cpuUsage = Double.NaN
+ cpuUsagePct = Double.NaN
+ memCapacity = Double.NaN
+ memUsage = Double.NaN
+ diskRead = Double.NaN
+ diskWrite = Double.NaN
+ netReceived = Double.NaN
+ netTransmitted = Double.NaN
+ }
+
+ /**
+ * The type of the timestamp in the trace. Numeric timestamps are interpreted as epoch seconds, despite the "[ms]" column header.
+ */
+ private enum class TimestampType {
+ UNDECIDED, DATE_TIME, EPOCH_SECONDS
}
companion object {
@@ -199,15 +261,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
* The [CsvSchema] that is used to parse the trace.
*/
private val schema = CsvSchema.builder()
- .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING)
.addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
.addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
.addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
.addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
.addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
.addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER)
.addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
.addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER)
.addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
.addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
.setAllowComments(true)
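Note: the comma-decimal fallback in parseSafeDouble relies on java.text.NumberFormat with a German locale, where the comma is the decimal separator. A standalone sketch of the behavior it depends on:

    import java.text.NumberFormat
    import java.util.Locale

    val nf = NumberFormat.getInstance(Locale.GERMAN)
    println(nf.parse("1234,5").toDouble()) // prints 1234.5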
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt
new file mode 100644
index 00000000..bc4f0b7d
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * The resources [Table] in the Bitbrains format.
+ */
+internal class BitbrainsResourceTable(private val factory: CsvFactory, path: Path) : Table {
+ /**
+ * The VMs that belong to the table.
+ */
+ private val vms =
+ Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "csv" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
+
+ override val name: String = TABLE_RESOURCES
+
+ override val isSynthetic: Boolean = true
+
+ override val columns: List<TableColumn<*>> = listOf(RESOURCE_ID)
+
+ override fun newReader(): TableReader {
+ return BitbrainsResourceTableReader(factory, vms)
+ }
+
+ override fun newReader(partition: String): TableReader {
+ throw IllegalArgumentException("Unknown partition $partition")
+ }
+
+ override fun toString(): String = "BitbrainsResourceTable"
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
new file mode 100644
index 00000000..c02dc5ae
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * A [TableReader] for the Bitbrains resource table.
+ */
+internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms: Map<String, Path>) : TableReader {
+ /**
+ * An iterator over the resource entries.
+ */
+ private val it = vms.iterator()
+
+ override fun nextRow(): Boolean {
+ reset()
+
+ while (it.hasNext()) {
+ val (name, path) = it.next()
+
+ val parser = factory.createParser(path.toFile())
+ val reader = BitbrainsResourceStateTableReader(name, parser)
+
+ try {
+ if (!reader.nextRow()) {
+ continue
+ }
+
+ id = reader.get(RESOURCE_STATE_ID)
+ return true
+ } finally {
+ reader.close()
+ }
+ }
+
+ return false
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_ID -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any? = when (column) {
+ RESOURCE_ID -> id
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {}
+
+ /**
+ * State fields of the reader.
+ */
+ private var id: String? = null
+
+ /**
+ * Reset the state of the reader.
+ */
+ private fun reset() {
+ id = null
+ }
+}
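Note: combined with the BitbrainsTrace change below, the synthetic resources table is consumed like any other table. A sketch, assuming the format registers under the name "bitbrains" and an illustrative trace path:

    import java.nio.file.Paths

    val trace = Trace.open(Paths.get("traces/bitbrains"), format = "bitbrains")
    val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
    while (reader.nextRow()) {
        println(reader.get(RESOURCE_ID)) // one row per VM file in the trace directory
    }
    reader.close()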
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt
index 5a2d4243..bcd8dd52 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt
@@ -30,16 +30,16 @@ import java.nio.file.Path
* [Trace] implementation for the Bitbrains format.
*/
public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_RESOURCE_STATES)
+ override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
- override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name
+ override fun containsTable(name: String): Boolean = tables.contains(name)
override fun getTable(name: String): Table? {
- if (!containsTable(name)) {
- return null
+ return when (name) {
+ TABLE_RESOURCES -> BitbrainsResourceTable(factory, path)
+ TABLE_RESOURCE_STATES -> BitbrainsResourceStateTable(factory, path)
+ else -> null
}
-
- return BitbrainsResourceStateTable(factory, path)
}
override fun toString(): String = "BitbrainsTrace[$path]"
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
index 550805d3..ff4a33f8 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
@@ -25,9 +25,7 @@ package org.opendc.trace.bitbrains
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
-import org.opendc.trace.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.RESOURCE_STATE_TIMESTAMP
-import org.opendc.trace.TABLE_RESOURCE_STATES
+import org.opendc.trace.*
import java.net.URL
/**
@@ -58,7 +56,7 @@ class BitbrainsTraceFormatTest {
val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
val trace = format.open(url)
- assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables)
+ assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables)
}
@Test
@@ -82,6 +80,23 @@ class BitbrainsTraceFormatTest {
}
@Test
+ fun testResources() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ val trace = format.open(url)
+
+ val reader = trace.getTable(TABLE_RESOURCES)!!.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("bitbrains", reader.get(RESOURCE_ID)) },
+ { assertFalse(reader.nextRow()) }
+ )
+
+ reader.close()
+ }
+
+ @Test
fun testSmoke() {
val format = BitbrainsTraceFormat()
val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt
index 80a99d10..fd7bd068 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt
@@ -34,18 +34,15 @@ internal class GwfTaskTable(private val factory: CsvFactory, private val url: UR
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_WORKFLOW_ID -> true
- TASK_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_ALLOC_NCPUS -> true
- TASK_PARENTS -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ TASK_WORKFLOW_ID,
+ TASK_ID,
+ TASK_SUBMIT_TIME,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_ALLOC_NCPUS,
+ TASK_PARENTS
+ )
override fun newReader(): TableReader {
return GwfTaskTableReader(factory.createParser(url))
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
index 64b7d465..39eb5520 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
@@ -26,24 +26,21 @@ import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
+import java.time.Duration
+import java.time.Instant
import java.util.regex.Pattern
/**
* A [TableReader] implementation for the GWF format.
*/
internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
- /**
- * The current parser state.
- */
- private val state = RowState()
-
init {
parser.schema = schema
}
override fun nextRow(): Boolean {
// Reset the row state
- state.reset()
+ reset()
if (!nextStart()) {
return false
@@ -57,12 +54,12 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
}
when (parser.currentName) {
- "WorkflowID" -> state.workflowId = parser.longValue
- "JobID" -> state.jobId = parser.longValue
- "SubmitTime" -> state.submitTime = parser.longValue
- "RunTime" -> state.runtime = parser.longValue
- "NProcs" -> state.nProcs = parser.intValue
- "ReqNProcs" -> state.reqNProcs = parser.intValue
+ "WorkflowID" -> workflowId = parser.text
+ "JobID" -> jobId = parser.text
+ "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue)
+ "RunTime" -> runtime = Duration.ofSeconds(parser.longValue)
+ "NProcs" -> nProcs = parser.intValue
+ "ReqNProcs" -> reqNProcs = parser.intValue
"Dependencies" -> parseParents(parser.valueAsString)
}
}
@@ -84,14 +81,14 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
}
override fun <T> get(column: TableColumn<T>): T {
- val res: Any = when (column) {
- TASK_WORKFLOW_ID -> state.workflowId
- TASK_ID -> state.jobId
- TASK_SUBMIT_TIME -> state.submitTime
- TASK_RUNTIME -> state.runtime
- TASK_REQ_NCPUS -> state.nProcs
- TASK_ALLOC_NCPUS -> state.reqNProcs
- TASK_PARENTS -> state.dependencies
+ val res: Any? = when (column) {
+ TASK_WORKFLOW_ID -> workflowId
+ TASK_ID -> jobId
+ TASK_SUBMIT_TIME -> submitTime
+ TASK_RUNTIME -> runtime
+ TASK_REQ_NCPUS -> nProcs
+ TASK_ALLOC_NCPUS -> reqNProcs
+ TASK_PARENTS -> dependencies
else -> throw IllegalArgumentException("Invalid column")
}
@@ -105,20 +102,14 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- TASK_REQ_NCPUS -> state.nProcs
- TASK_ALLOC_NCPUS -> state.reqNProcs
+ TASK_REQ_NCPUS -> nProcs
+ TASK_ALLOC_NCPUS -> reqNProcs
else -> throw IllegalArgumentException("Invalid column")
}
}
override fun getLong(column: TableColumn<Long>): Long {
- return when (column) {
- TASK_WORKFLOW_ID -> state.workflowId
- TASK_ID -> state.jobId
- TASK_SUBMIT_TIME -> state.submitTime
- TASK_RUNTIME -> state.runtime
- else -> throw IllegalArgumentException("Invalid column")
- }
+ throw IllegalArgumentException("Invalid column")
}
override fun getDouble(column: TableColumn<Double>): Double {
@@ -166,29 +157,27 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
}
/**
- * The current row state.
+ * Reader state fields.
*/
- private class RowState {
- var workflowId = -1L
- var jobId = -1L
- var submitTime = -1L
- var runtime = -1L
- var nProcs = -1
- var reqNProcs = -1
- var dependencies = emptySet<Long>()
+ private var workflowId: String? = null
+ private var jobId: String? = null
+ private var submitTime: Instant? = null
+ private var runtime: Duration? = null
+ private var nProcs = -1
+ private var reqNProcs = -1
+ private var dependencies = emptySet<String>()
- /**
- * Reset the state.
- */
- fun reset() {
- workflowId = -1
- jobId = -1
- submitTime = -1
- runtime = -1
- nProcs = -1
- reqNProcs = -1
- dependencies = emptySet()
- }
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ workflowId = null
+ jobId = null
+ submitTime = null
+ runtime = null
+ nProcs = -1
+ reqNProcs = -1
+ dependencies = emptySet()
}
companion object {
diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
index 6b0568fe..b209b979 100644
--- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
@@ -29,6 +29,8 @@ import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.opendc.trace.*
import java.net.URL
+import java.time.Duration
+import java.time.Instant
/**
* Test suite for the [GwfTraceFormat] class.
@@ -90,11 +92,11 @@ internal class GwfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(0L, reader.getLong(TASK_WORKFLOW_ID)) },
- { assertEquals(1L, reader.getLong(TASK_ID)) },
- { assertEquals(16, reader.getLong(TASK_SUBMIT_TIME)) },
- { assertEquals(11, reader.getLong(TASK_RUNTIME)) },
- { assertEquals(setOf<Long>(), reader.get(TASK_PARENTS)) },
+ { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) },
+ { assertEquals("1", reader.get(TASK_ID)) },
+ { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) },
+ { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) },
)
}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt
index 12a51a2f..7ec0d607 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt
@@ -34,21 +34,18 @@ internal class SwfTaskTable(private val path: Path) : Table {
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_WAIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_ALLOC_NCPUS -> true
- TASK_PARENTS -> true
- TASK_STATUS -> true
- TASK_GROUP_ID -> true
- TASK_USER_ID -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ TASK_ID,
+ TASK_SUBMIT_TIME,
+ TASK_WAIT_TIME,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_ALLOC_NCPUS,
+ TASK_PARENTS,
+ TASK_STATUS,
+ TASK_GROUP_ID,
+ TASK_USER_ID
+ )
override fun newReader(): TableReader {
val reader = path.bufferedReader()
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
index 5f879a54..3f49c770 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
@@ -24,6 +24,8 @@ package org.opendc.trace.swf
import org.opendc.trace.*
import java.io.BufferedReader
+import java.time.Duration
+import java.time.Instant
/**
* A [TableReader] implementation for the SWF format.
@@ -85,10 +87,10 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
override fun <T> get(column: TableColumn<T>): T {
val res: Any = when (column) {
- TASK_ID -> getLong(TASK_ID)
- TASK_SUBMIT_TIME -> getLong(TASK_SUBMIT_TIME)
- TASK_WAIT_TIME -> getLong(TASK_WAIT_TIME)
- TASK_RUNTIME -> getLong(TASK_RUNTIME)
+ TASK_ID -> fields[COL_JOB_ID]
+ TASK_SUBMIT_TIME -> Instant.ofEpochSecond(fields[COL_SUBMIT_TIME].toLong(10))
+ TASK_WAIT_TIME -> Duration.ofSeconds(fields[COL_WAIT_TIME].toLong(10))
+ TASK_RUNTIME -> Duration.ofSeconds(fields[COL_RUN_TIME].toLong(10))
TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS)
TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS)
TASK_PARENTS -> {
@@ -121,13 +123,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
}
override fun getLong(column: TableColumn<Long>): Long {
- return when (column) {
- TASK_ID -> fields[COL_JOB_ID].toLong(10)
- TASK_SUBMIT_TIME -> fields[COL_SUBMIT_TIME].toLong(10)
- TASK_WAIT_TIME -> fields[COL_WAIT_TIME].toLong(10)
- TASK_RUNTIME -> fields[COL_RUN_TIME].toLong(10)
- else -> throw IllegalArgumentException("Invalid column")
- }
+ throw IllegalArgumentException("Invalid column")
}
override fun getDouble(column: TableColumn<Double>): Double {
diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
index 9686891b..828c2bfa 100644
--- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
@@ -85,10 +85,10 @@ internal class SwfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(1, reader.getLong(TASK_ID)) },
+ { assertEquals("1", reader.get(TASK_ID)) },
{ assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals(2, reader.getLong(TASK_ID)) },
+ { assertEquals("2", reader.get(TASK_ID)) },
{ assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) },
)
diff --git a/opendc-trace/opendc-trace-wfformat/build.gradle.kts b/opendc-trace/opendc-trace-wfformat/build.gradle.kts
new file mode 100644
index 00000000..2d336d03
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/build.gradle.kts
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Support for WfCommons workload traces in OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcTrace.opendcTraceApi)
+
+ implementation(libs.jackson.core)
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt
new file mode 100644
index 00000000..7b7f979f
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import com.fasterxml.jackson.core.JsonFactory
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * A [Table] containing the tasks in a WfCommons workload trace.
+ */
+internal class WfFormatTaskTable(private val factory: JsonFactory, private val path: Path) : Table {
+ override val name: String = TABLE_TASKS
+
+ override val isSynthetic: Boolean = false
+
+ override val columns: List<TableColumn<*>> = listOf(
+ TASK_ID,
+ TASK_WORKFLOW_ID,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_PARENTS,
+ TASK_CHILDREN
+ )
+
+ override fun newReader(): TableReader {
+ val parser = factory.createParser(path.toFile())
+ return WfFormatTaskTableReader(parser)
+ }
+
+ override fun newReader(partition: String): TableReader {
+ throw IllegalArgumentException("Invalid partition $partition")
+ }
+
+ override fun toString(): String = "WfFormatTaskTable"
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
new file mode 100644
index 00000000..4408ba5c
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
@@ -0,0 +1,234 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.core.JsonToken
+import org.opendc.trace.*
+import java.time.Duration
+import kotlin.math.roundToInt
+
+/**
+ * A [TableReader] implementation for the WfCommons workload trace format.
+ */
+internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableReader {
+ /**
+ * The current nesting of the parser.
+ */
+ private var level: ParserLevel = ParserLevel.TOP
+
+ override fun nextRow(): Boolean {
+ reset()
+
+ var hasJob = false
+
+ while (!hasJob) {
+ when (level) {
+ ParserLevel.TOP -> {
+ val token = parser.nextToken()
+
+ // Check whether the document is not empty and starts with an object
+ if (token == null) {
+ break
+ } else if (token != JsonToken.START_OBJECT) {
+ throw JsonParseException(parser, "Expected object", parser.currentLocation)
+ } else {
+ level = ParserLevel.TRACE
+ }
+ }
+ ParserLevel.TRACE -> {
+ // Seek for the workflow object in the file
+ if (!seekWorkflow()) {
+ break
+ } else if (!parser.isExpectedStartObjectToken) {
+ throw JsonParseException(parser, "Expected object", parser.currentLocation)
+ } else {
+ level = ParserLevel.WORKFLOW
+ }
+ }
+ ParserLevel.WORKFLOW -> {
+ // Seek for the jobs object in the file
+ level = if (!seekJobs()) {
+ ParserLevel.TRACE
+ } else if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array", parser.currentLocation)
+ } else {
+ ParserLevel.JOB
+ }
+ }
+ ParserLevel.JOB -> {
+ when (parser.nextToken()) {
+ JsonToken.END_ARRAY -> level = ParserLevel.WORKFLOW
+ JsonToken.START_OBJECT -> {
+ parseJob()
+ hasJob = true
+ break
+ }
+ else -> throw JsonParseException(parser, "Unexpected token", parser.currentLocation)
+ }
+ }
+ }
+ }
+
+ return hasJob
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ TASK_ID -> true
+ TASK_WORKFLOW_ID -> true
+ TASK_RUNTIME -> true
+ TASK_REQ_NCPUS -> true
+ TASK_PARENTS -> true
+ TASK_CHILDREN -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any? = when (column) {
+ TASK_ID -> id
+ TASK_WORKFLOW_ID -> workflowId
+ TASK_RUNTIME -> runtime
+ TASK_PARENTS -> parents
+ TASK_CHILDREN -> children
+ TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ return when (column) {
+ TASK_REQ_NCPUS -> cores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * Parse the trace and seek until the workflow description.
+ */
+ private fun seekWorkflow(): Boolean {
+ while (parser.nextValue() != JsonToken.END_OBJECT) {
+ when (parser.currentName) {
+ "name" -> workflowId = parser.text
+ "workflow" -> return true
+ else -> parser.skipChildren()
+ }
+ }
+
+ return false
+ }
+
+ /**
+ * Parse the workflow description in the file and seek until the first job.
+ */
+ private fun seekJobs(): Boolean {
+ while (parser.nextValue() != JsonToken.END_OBJECT) {
+ when (parser.currentName) {
+ "jobs" -> return true
+ else -> parser.skipChildren()
+ }
+ }
+
+ return false
+ }
+
+ /**
+ * Parse a single job in the file.
+ */
+ private fun parseJob() {
+ while (parser.nextValue() != JsonToken.END_OBJECT) {
+ when (parser.currentName) {
+ "name" -> id = parser.text
+ "parents" -> parents = parseIds()
+ "children" -> children = parseIds()
+ "runtime" -> runtime = Duration.ofSeconds(parser.numberValue.toLong())
+ "cores" -> cores = parser.floatValue.roundToInt()
+ else -> parser.skipChildren()
+ }
+ }
+ }
+
+ /**
+ * Parse the parents/children of a job.
+ */
+ private fun parseIds(): Set<String> {
+ if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array", parser.currentLocation)
+ }
+
+ val ids = mutableSetOf<String>()
+
+ while (parser.nextToken() != JsonToken.END_ARRAY) {
+ if (parser.currentToken != JsonToken.VALUE_STRING) {
+ throw JsonParseException(parser, "Expected token", parser.currentLocation)
+ }
+
+ ids.add(parser.valueAsString)
+ }
+
+ return ids
+ }
+
+ private enum class ParserLevel {
+ TOP, TRACE, WORKFLOW, JOB
+ }
+
+ /**
+ * State fields for the parser.
+ */
+ private var id: String? = null
+ private var workflowId: String? = null
+ private var runtime: Duration? = null
+ private var parents: Set<String>? = null
+ private var children: Set<String>? = null
+ private var cores = -1
+
+ private fun reset() {
+ id = null
+ runtime = null
+ parents = null
+ children = null
+ cores = -1
+ }
+}
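Note: for orientation, a minimal WfCommons-style document accepted by the state machine above could look as follows (field subset only; consult the WfCommons specification for the full schema):

    {
      "name": "example-workflow",
      "workflow": {
        "jobs": [
          { "name": "task-1", "runtime": 11, "cores": 1, "parents": [], "children": ["task-2"] },
          { "name": "task-2", "runtime": 5, "cores": 2, "parents": ["task-1"], "children": [] }
        ]
      }
    }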
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt
new file mode 100644
index 00000000..2d9c79fb
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import com.fasterxml.jackson.core.JsonFactory
+import org.opendc.trace.TABLE_TASKS
+import org.opendc.trace.Table
+import org.opendc.trace.Trace
+import java.nio.file.Path
+
+/**
+ * [Trace] implementation for the WfCommons workload trace format.
+ */
+public class WfFormatTrace internal constructor(private val factory: JsonFactory, private val path: Path) : Trace {
+ override val tables: List<String> = listOf(TABLE_TASKS)
+
+ override fun containsTable(name: String): Boolean = TABLE_TASKS == name
+
+ override fun getTable(name: String): Table? {
+ return when (name) {
+ TABLE_TASKS -> WfFormatTaskTable(factory, path)
+ else -> null
+ }
+ }
+
+ override fun toString(): String = "WfFormatTrace[$path]"
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
new file mode 100644
index 00000000..ff8d054c
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import com.fasterxml.jackson.core.JsonFactory
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A [TraceFormat] implementation for the WfCommons workload trace format.
+ */
+public class WfFormatTraceFormat : TraceFormat {
+ /**
+ * The [JsonFactory] used to create JSON parsers.
+ */
+ private val factory = JsonFactory()
+
+ override val name: String = "wfformat"
+
+ override fun open(url: URL): WfFormatTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return WfFormatTrace(factory, path)
+ }
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
new file mode 100644
index 00000000..ee3aa2f6
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
@@ -0,0 +1 @@
+org.opendc.trace.wfformat.WfFormatTraceFormat
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
new file mode 100644
index 00000000..b07f27ed
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
@@ -0,0 +1,345 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import com.fasterxml.jackson.core.JsonFactory
+import com.fasterxml.jackson.core.JsonParseException
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
+import org.opendc.trace.TASK_ID
+import org.opendc.trace.TASK_PARENTS
+
+/**
+ * Test suite for the [WfFormatTaskTableReader] class.
+ */
+internal class WfFormatTaskTableReaderTest {
+ /**
+ * The [JsonFactory] used to construct the parser.
+ */
+ private val factory = JsonFactory()
+
+ @Test
+ fun testEmptyInput() {
+ val content = ""
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertFalse(reader.nextRow())
+ reader.close()
+ }
+
+ @Test
+ fun testTopLevelArrayInput() {
+ val content = "[]"
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> {
+ while (reader.nextRow()) {
+ continue
+ }
+ }
+
+ reader.close()
+ }
+
+ @Test
+ fun testNoWorkflow() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon"
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertDoesNotThrow {
+ while (reader.nextRow()) {
+ continue
+ }
+ }
+
+ reader.close()
+ }
+
+ @Test
+ fun testWorkflowArrayType() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": []
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> {
+ while (reader.nextRow()) {
+ continue
+ }
+ }
+
+ reader.close()
+ }
+
+ @Test
+ fun testWorkflowNullType() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": null
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> {
+ while (reader.nextRow()) {
+ continue
+ }
+ }
+
+ reader.close()
+ }
+
+ @Test
+ fun testNoJobs() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertDoesNotThrow { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsObjectType() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": { "jobs": {} }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsNullType() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": { "jobs": null }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsInvalidChildType() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [1]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsValidChildType() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test"
+ }
+ ]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertTrue(reader.nextRow())
+ assertEquals("test", reader.get(TASK_ID))
+ assertFalse(reader.nextRow())
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsInvalidParents() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+ "parents": 1,
+ }
+ ]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsInvalidParentsItem() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+ "parents": [1],
+ }
+ ]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsValidParents() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+ "parents": ["1"]
+ }
+ ]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertTrue(reader.nextRow())
+ assertEquals(setOf("1"), reader.get(TASK_PARENTS))
+ assertFalse(reader.nextRow())
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsInvalidSecondEntry() {
+ val content = """
+ {
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+ "parents": ["1"]
+ },
+ "test"
+ ]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertDoesNotThrow { reader.nextRow() }
+ assertThrows<JsonParseException> { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testDuplicateJobsArray() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+ "parents": ["1"]
+ }
+ ],
+ "jobs": [
+ {
+ "name": "test2",
+ "parents": ["test"]
+ }
+ ]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertTrue(reader.nextRow())
+ assertTrue(reader.nextRow())
+ assertEquals("test2", reader.get(TASK_ID))
+ assertFalse(reader.nextRow())
+
+ reader.close()
+ }
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
new file mode 100644
index 00000000..0bfc8840
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
+import org.opendc.trace.*
+import java.io.File
+import java.net.URL
+
+/**
+ * Test suite for the [WfFormatTraceFormat] class.
+ */
+class WfFormatTraceFormatTest {
+ @Test
+ fun testTraceExists() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val format = WfFormatTraceFormat()
+ assertDoesNotThrow { format.open(input) }
+ }
+
+ @Test
+ fun testTraceDoesNotExist() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val format = WfFormatTraceFormat()
+ assertThrows<IllegalArgumentException> { format.open(URL(input.toString() + "help")) }
+ }
+
+ @Test
+ fun testTables() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val format = WfFormatTraceFormat()
+ val trace = format.open(input)
+
+ assertEquals(listOf(TABLE_TASKS), trace.tables)
+ }
+
+ @Test
+ fun testTableExists() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val format = WfFormatTraceFormat()
+ val table = format.open(input).getTable(TABLE_TASKS)
+
+ assertNotNull(table)
+ assertDoesNotThrow { table!!.newReader() }
+ }
+
+ @Test
+ fun testTableDoesNotExist() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val format = WfFormatTraceFormat()
+ val trace = format.open(input)
+
+ assertFalse(trace.containsTable("test"))
+ assertNull(trace.getTable("test"))
+ }
+
+ /**
+ * Smoke test for parsing WfCommons traces.
+ */
+ @Test
+ fun testTableReader() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val trace = WfFormatTraceFormat().open(input)
+ val reader = trace.getTable(TABLE_TASKS)!!.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.get(TASK_ID)) },
+ { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) },
+ { assertEquals(172000, reader.get(TASK_RUNTIME).toMillis()) },
+ { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) },
+ )
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.get(TASK_ID)) },
+ { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) },
+ { assertEquals(175000, reader.get(TASK_RUNTIME).toMillis()) },
+ { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.get(TASK_PARENTS)) },
+ )
+
+ reader.close()
+ }
+
+ /**
+ * Test full iteration of the table.
+ */
+ @Test
+ fun testTableReaderFull() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val trace = WfFormatTraceFormat().open(input)
+ val reader = trace.getTable(TABLE_TASKS)!!.newReader()
+
+ assertDoesNotThrow {
+ while (reader.nextRow()) {
+ // Consume every row without reading any columns.
+ }
+ reader.close()
+ }
+ }
+
+ @Test
+ fun testTableReaderPartition() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val format = WfFormatTraceFormat()
+ val table = format.open(input).getTable(TABLE_TASKS)!!
+
+ assertThrows<IllegalArgumentException> { table.newReader("test") }
+ }
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json b/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json
new file mode 100644
index 00000000..d21f024d
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json
@@ -0,0 +1,1342 @@
+{
+ "name": "eager-nextflow-chameleon",
+ "description": "Instance generated with WfCommons - https://wfcommons.org",
+ "createdAt": "2021-09-06T03:43:31.762479",
+ "schemaVersion": "1.2",
+ "author": {
+ "name": "cc",
+ "email": "support@wfcommons.org"
+ },
+ "wms": {
+ "name": "Nextflow",
+ "version": "21.04.3",
+ "url": "https://www.nextflow.io"
+ },
+ "workflow": {
+ "executedAt": "20210906T034331+0000",
+ "makespan": 275,
+ "jobs": [
+ {
+ "name": "makebwaindex_mammoth_mt_krause.fasta",
+ "type": "compute",
+ "runtime": 172.182,
+ "command": {
+ "program": "makebwaindex",
+ "arguments": [
+ "bwa",
+ "index",
+ "Mammoth_MT_Krause.fasta",
+ "mkdir",
+ "BWAIndex",
+ "&&",
+ "mv",
+ "Mammoth_MT_Krause.fasta*",
+ "BWAIndex"
+ ]
+ },
+ "parents": [],
+ "children": [
+ "makeseqdict_mammoth_mt_krause.fasta"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000001",
+ "category": "makebwaindex",
+ "avgCPU": 5.8,
+ "bytesRead": 124,
+ "bytesWritten": 126,
+ "memory": 4248
+ },
+ {
+ "name": "makeseqdict_mammoth_mt_krause.fasta",
+ "type": "compute",
+ "runtime": 175.427,
+ "command": {
+ "program": "makeseqdict",
+ "arguments": [
+ "picard",
+ "-Xmx6144M",
+ "CreateSequenceDictionary",
+ "R=Mammoth_MT_Krause.fasta",
+ "O=\"Mammoth_MT_Krause.dict\""
+ ]
+ },
+ "parents": [
+ "makebwaindex_mammoth_mt_krause.fasta"
+ ],
+ "children": [
+ "makefastaindex_mammoth_mt_krause.fasta"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000003",
+ "category": "makeseqdict",
+ "avgCPU": 83.5,
+ "bytesRead": 22728,
+ "bytesWritten": 1300,
+ "memory": 104416
+ },
+ {
+ "name": "makefastaindex_mammoth_mt_krause.fasta",
+ "type": "compute",
+ "runtime": 170.797,
+ "command": {
+ "program": "makefastaindex",
+ "arguments": [
+ "samtools",
+ "faidx",
+ "Mammoth_MT_Krause.fasta"
+ ]
+ },
+ "parents": [
+ "makeseqdict_mammoth_mt_krause.fasta"
+ ],
+ "children": [
+ "output_documentation"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000002",
+ "category": "makefastaindex",
+ "avgCPU": 23.8,
+ "bytesRead": 66,
+ "bytesWritten": 4,
+ "memory": 6096
+ },
+ {
+ "name": "output_documentation",
+ "type": "compute",
+ "runtime": 173.479,
+ "command": {
+ "program": "output_documentation",
+ "arguments": [
+ "markdown_to_html.py",
+ "output.md",
+ "-o",
+ "results_description.html"
+ ]
+ },
+ "parents": [
+ "makefastaindex_mammoth_mt_krause.fasta"
+ ],
+ "children": [
+ "get_software_versions"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000005",
+ "category": "output_documentation",
+ "avgCPU": 84.0,
+ "bytesRead": 8222,
+ "bytesWritten": 15165,
+ "memory": 11488
+ },
+ {
+ "name": "get_software_versions",
+ "type": "compute",
+ "runtime": 183.445,
+ "command": {
+ "program": "get_software_versions",
+ "arguments": [
+ "echo",
+ "2.3.5",
+ "&>",
+ "v_pipeline.txt",
+ "echo",
+ "21.04.3",
+ "&>",
+ "v_nextflow.txt",
+ "fastqc",
+ "--version",
+ "&>",
+ "v_fastqc.txt",
+ "2>&1",
+ "||",
+ "true",
+ "AdapterRemoval",
+ "--version",
+ "&>",
+ "v_adapterremoval.txt",
+ "2>&1",
+ "||",
+ "true",
+ "fastp",
+ "--version",
+ "&>",
+ "v_fastp.txt",
+ "2>&1",
+ "||",
+ "true",
+ "bwa",
+ "&>",
+ "v_bwa.txt",
+ "2>&1",
+ "||",
+ "true",
+ "circulargenerator",
+ "--help",
+ "|",
+ "head",
+ "-n",
+ "1",
+ "&>",
+ "v_circulargenerator.txt",
+ "2>&1",
+ "||",
+ "true",
+ "samtools",
+ "--version",
+ "&>",
+ "v_samtools.txt",
+ "2>&1",
+ "||",
+ "true",
+ "dedup",
+ "-v",
+ "&>",
+ "v_dedup.txt",
+ "2>&1",
+ "||",
+ "true",
+ "##",
+ "bioconda",
+ "recipe",
+ "of",
+ "picard",
+ "is",
+ "incorrectly",
+ "set",
+ "up",
+ "and",
+ "extra",
+ "warning",
+ "made",
+ "with",
+ "stderr,",
+ "this",
+ "ugly",
+ "command",
+ "ensures",
+ "only",
+ "version",
+ "exported",
+ "(",
+ "exec",
+ "7>&1",
+ "picard",
+ "MarkDuplicates",
+ "--version",
+ "2>&1",
+ ">&7",
+ "|",
+ "grep",
+ "-v",
+ "/",
+ ">&2",
+ ")",
+ "2>",
+ "v_markduplicates.txt",
+ "||",
+ "true",
+ "qualimap",
+ "--version",
+ "&>",
+ "v_qualimap.txt",
+ "2>&1",
+ "||",
+ "true",
+ "preseq",
+ "&>",
+ "v_preseq.txt",
+ "2>&1",
+ "||",
+ "true",
+ "gatk",
+ "--version",
+ "2>&1",
+ "|",
+ "head",
+ "-n",
+ "1",
+ ">",
+ "v_gatk.txt",
+ "2>&1",
+ "||",
+ "true",
+ "gatk3",
+ "--version",
+ "2>&1",
+ ">",
+ "v_gatk3.txt",
+ "2>&1",
+ "||",
+ "true",
+ "freebayes",
+ "--version",
+ "&>",
+ "v_freebayes.txt",
+ "2>&1",
+ "||",
+ "true",
+ "bedtools",
+ "--version",
+ "&>",
+ "v_bedtools.txt",
+ "2>&1",
+ "||",
+ "true",
+ "damageprofiler",
+ "--version",
+ "&>",
+ "v_damageprofiler.txt",
+ "2>&1",
+ "||",
+ "true",
+ "bam",
+ "--version",
+ "&>",
+ "v_bamutil.txt",
+ "2>&1",
+ "||",
+ "true",
+ "pmdtools",
+ "--version",
+ "&>",
+ "v_pmdtools.txt",
+ "2>&1",
+ "||",
+ "true",
+ "angsd",
+ "-h",
+ "|&",
+ "head",
+ "-n",
+ "1",
+ "|",
+ "cut",
+ "-d",
+ "-f3-4",
+ "&>",
+ "v_angsd.txt",
+ "2>&1",
+ "||",
+ "true",
+ "multivcfanalyzer",
+ "--help",
+ "|",
+ "head",
+ "-n",
+ "1",
+ "&>",
+ "v_multivcfanalyzer.txt",
+ "2>&1",
+ "||",
+ "true",
+ "malt-run",
+ "--help",
+ "|&",
+ "tail",
+ "-n",
+ "3",
+ "|",
+ "head",
+ "-n",
+ "1",
+ "|",
+ "cut",
+ "-f",
+ "2",
+ "-d(",
+ "|",
+ "cut",
+ "-f",
+ "1",
+ "-d",
+ ",",
+ "&>",
+ "v_malt.txt",
+ "2>&1",
+ "||",
+ "true",
+ "MaltExtract",
+ "--help",
+ "|",
+ "head",
+ "-n",
+ "2",
+ "|",
+ "tail",
+ "-n",
+ "1",
+ "&>",
+ "v_maltextract.txt",
+ "2>&1",
+ "||",
+ "true",
+ "multiqc",
+ "--version",
+ "&>",
+ "v_multiqc.txt",
+ "2>&1",
+ "||",
+ "true",
+ "vcf2genome",
+ "-h",
+ "|&",
+ "head",
+ "-n",
+ "1",
+ "&>",
+ "v_vcf2genome.txt",
+ "||",
+ "true",
+ "mtnucratio",
+ "--help",
+ "&>",
+ "v_mtnucratiocalculator.txt",
+ "||",
+ "true",
+ "sexdeterrmine",
+ "--version",
+ "&>",
+ "v_sexdeterrmine.txt",
+ "||",
+ "true",
+ "kraken2",
+ "--version",
+ "|",
+ "head",
+ "-n",
+ "1",
+ "&>",
+ "v_kraken.txt",
+ "||",
+ "true",
+ "endorS.py",
+ "--version",
+ "&>",
+ "v_endorSpy.txt",
+ "||",
+ "true",
+ "pileupCaller",
+ "--version",
+ "&>",
+ "v_sequencetools.txt",
+ "2>&1",
+ "||",
+ "true",
+ "bowtie2",
+ "--version",
+ "|",
+ "grep",
+ "-a",
+ "bowtie2-.*",
+ "-fdebug",
+ ">",
+ "v_bowtie2.txt",
+ "||",
+ "true",
+ "eigenstrat_snp_coverage",
+ "--version",
+ "|",
+ "cut",
+ "-d",
+ "-f2",
+ ">v_eigenstrat_snp_coverage.txt",
+ "||",
+ "true",
+ "mapDamage",
+ "--version",
+ ">",
+ "v_mapdamage.txt",
+ "||",
+ "true",
+ "bbduk.sh",
+ "|",
+ "grep",
+ "Last",
+ "modified",
+ "|",
+ "cut",
+ "-d",
+ "-f",
+ "3-99",
+ ">",
+ "v_bbduk.txt",
+ "||",
+ "true",
+ "scrape_software_versions.py",
+ "&>",
+ "software_versions_mqc.yaml"
+ ]
+ },
+ "parents": [
+ "output_documentation"
+ ],
+ "children": [
+ "fastqc_jk2782_l1",
+ "fastqc_jk2802_l2"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000006",
+ "category": "get_software_versions",
+ "avgCPU": 147.8,
+ "bytesRead": 172760,
+ "bytesWritten": 1048,
+ "memory": 387324
+ },
+ {
+ "name": "fastqc_jk2782_l1",
+ "type": "compute",
+ "runtime": 175.205,
+ "command": {
+ "program": "fastqc",
+ "arguments": [
+ "fastqc",
+ "-t",
+ "2",
+ "-q",
+ "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq.gz",
+ "JK2782_TGGCCGATCAACGA_L008_R2_001.fastq.gz.tengrand.fq.gz",
+ "rename",
+ "s/_fastqc.zip$/_raw_fastqc.zip/",
+ "*_fastqc.zip",
+ "rename",
+ "s/_fastqc.html$/_raw_fastqc.html/",
+ "*_fastqc.html"
+ ]
+ },
+ "parents": [
+ "get_software_versions"
+ ],
+ "children": [
+ "adapter_removal_jk2782_l1",
+ "adapter_removal_jk2802_l2"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000007",
+ "category": "fastqc",
+ "avgCPU": 161.8,
+ "bytesRead": 35981,
+ "bytesWritten": 3967,
+ "memory": 270124
+ },
+ {
+ "name": "adapter_removal_jk2782_l1",
+ "type": "compute",
+ "runtime": 172.643,
+ "command": {
+ "program": "adapter_removal",
+ "arguments": [
+ "mkdir",
+ "-p",
+ "output",
+ "AdapterRemoval",
+ "--file1",
+ "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq.gz",
+ "--file2",
+ "JK2782_TGGCCGATCAACGA_L008_R2_001.fastq.gz.tengrand.fq.gz",
+ "--basename",
+ "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe",
+ "--gzip",
+ "--threads",
+ "2",
+ "--qualitymax",
+ "41",
+ "--collapse",
+ "--trimns",
+ "--trimqualities",
+ "--adapter1",
+ "AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC",
+ "--adapter2",
+ "AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTA",
+ "--minlength",
+ "30",
+ "--minquality",
+ "20",
+ "--minadapteroverlap",
+ "1",
+ "cat",
+ "*.collapsed.gz",
+ "*.collapsed.truncated.gz",
+ "*.singleton.truncated.gz",
+ "*.pair1.truncated.gz",
+ "*.pair2.truncated.gz",
+ ">",
+ "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.tmp.fq.gz",
+ "mv",
+ "*.settings",
+ "output/",
+ "##",
+ "Add",
+ "R_",
+ "and",
+ "L_",
+ "for",
+ "unmerged",
+ "reads",
+ "for",
+ "DeDup",
+ "compatibility",
+ "AdapterRemovalFixPrefix",
+ "-Xmx4g",
+ "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.tmp.fq.gz",
+ "|",
+ "pigz",
+ "-p",
+ "1",
+ ">",
+ "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz"
+ ]
+ },
+ "parents": [
+ "fastqc_jk2782_l1",
+ "fastqc_jk2802_l2"
+ ],
+ "children": [
+ "fastqc_after_clipping_jk2782_l1",
+ "fastqc_after_clipping_jk2802_l2"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000008",
+ "category": "adapter_removal",
+ "avgCPU": 160.9,
+ "bytesRead": 17357,
+ "bytesWritten": 4405,
+ "memory": 79308
+ },
+ {
+ "name": "fastqc_jk2802_l2",
+ "type": "compute",
+ "runtime": 177.338,
+ "command": {
+ "program": "fastqc",
+ "arguments": [
+ "fastqc",
+ "-q",
+ "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq.gz",
+ "rename",
+ "s/_fastqc.zip$/_raw_fastqc.zip/",
+ "*_fastqc.zip",
+ "rename",
+ "s/_fastqc.html$/_raw_fastqc.html/",
+ "*_fastqc.html"
+ ]
+ },
+ "parents": [
+ "get_software_versions"
+ ],
+ "children": [
+ "adapter_removal_jk2782_l1",
+ "adapter_removal_jk2802_l2"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000009",
+ "category": "fastqc",
+ "avgCPU": 120.1,
+ "bytesRead": 24457,
+ "bytesWritten": 2181,
+ "memory": 181060
+ },
+ {
+ "name": "adapter_removal_jk2802_l2",
+ "type": "compute",
+ "runtime": 174.313,
+ "command": {
+ "program": "adapter_removal",
+ "arguments": [
+ "mkdir",
+ "-p",
+ "output",
+ "AdapterRemoval",
+ "--file1",
+ "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq.gz",
+ "--basename",
+ "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se",
+ "--gzip",
+ "--threads",
+ "2",
+ "--qualitymax",
+ "41",
+ "--trimns",
+ "--trimqualities",
+ "--adapter1",
+ "AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC",
+ "--adapter2",
+ "AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTA",
+ "--minlength",
+ "30",
+ "--minquality",
+ "20",
+ "--minadapteroverlap",
+ "1",
+ "mv",
+ "*.settings",
+ "*.se.truncated.gz",
+ "output/"
+ ]
+ },
+ "parents": [
+ "fastqc_jk2782_l1",
+ "fastqc_jk2802_l2"
+ ],
+ "children": [
+ "fastqc_after_clipping_jk2782_l1",
+ "fastqc_after_clipping_jk2802_l2"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000010",
+ "category": "adapter_removal",
+ "avgCPU": 106.5,
+ "bytesRead": 683,
+ "bytesWritten": 897,
+ "memory": 12136
+ },
+ {
+ "name": "fastqc_after_clipping_jk2782_l1",
+ "type": "compute",
+ "runtime": 15.371,
+ "command": {
+ "program": "fastqc_after_clipping",
+ "arguments": [
+ "fastqc",
+ "-t",
+ "2",
+ "-q",
+ "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz"
+ ]
+ },
+ "parents": [
+ "adapter_removal_jk2782_l1",
+ "adapter_removal_jk2802_l2"
+ ],
+ "children": [
+ "bwa_jk2802",
+ "bwa_jk2782"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000013",
+ "category": "fastqc_after_clipping",
+ "avgCPU": 133.3,
+ "bytesRead": 23788,
+ "bytesWritten": 1998,
+ "memory": 215020
+ },
+ {
+ "name": "fastqc_after_clipping_jk2802_l2",
+ "type": "compute",
+ "runtime": 15.272,
+ "command": {
+ "program": "fastqc_after_clipping",
+ "arguments": [
+ "fastqc",
+ "-t",
+ "2",
+ "-q",
+ "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz"
+ ]
+ },
+ "parents": [
+ "adapter_removal_jk2782_l1",
+ "adapter_removal_jk2802_l2"
+ ],
+ "children": [
+ "bwa_jk2802",
+ "bwa_jk2782"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000014",
+ "category": "fastqc_after_clipping",
+ "avgCPU": 124.1,
+ "bytesRead": 23882,
+ "bytesWritten": 2143,
+ "memory": 213064
+ },
+ {
+ "name": "bwa_jk2802",
+ "type": "compute",
+ "runtime": 9.566,
+ "command": {
+ "program": "bwa",
+ "arguments": [
+ "bwa",
+ "aln",
+ "-t",
+ "2",
+ "BWAIndex/Mammoth_MT_Krause.fasta",
+ "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz",
+ "-n",
+ "0.04",
+ "-l",
+ "1024",
+ "-k",
+ "2",
+ "-o",
+ "1",
+ "-f",
+ "JK2802.sai",
+ "bwa",
+ "samse",
+ "-r",
+ "\"@RGtID:ILLUMINA-JK2802tSM:JK2802tPL:illuminatPU:ILLUMINA-JK2802-SE\"",
+ "BWAIndex/Mammoth_MT_Krause.fasta",
+ "JK2802.sai",
+ "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz",
+ "|",
+ "samtools",
+ "sort",
+ "-@",
+ "1",
+ "-O",
+ "bam",
+ "-",
+ ">",
+ "\"JK2802\"_\"SE\".mapped.bam",
+ "samtools",
+ "index",
+ "\"JK2802\"_\"SE\".mapped.bam"
+ ]
+ },
+ "parents": [
+ "fastqc_after_clipping_jk2782_l1",
+ "fastqc_after_clipping_jk2802_l2"
+ ],
+ "children": [
+ "samtools_flagstat_jk2782",
+ "samtools_flagstat_jk2802"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000016",
+ "category": "bwa",
+ "avgCPU": 15.7,
+ "bytesRead": 3774,
+ "bytesWritten": 3367,
+ "memory": 10628
+ },
+ {
+ "name": "bwa_jk2782",
+ "type": "compute",
+ "runtime": 9.652,
+ "command": {
+ "program": "bwa",
+ "arguments": [
+ "bwa",
+ "aln",
+ "-t",
+ "2",
+ "BWAIndex/Mammoth_MT_Krause.fasta",
+ "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz",
+ "-n",
+ "0.04",
+ "-l",
+ "1024",
+ "-k",
+ "2",
+ "-o",
+ "1",
+ "-f",
+ "JK2782.sai",
+ "bwa",
+ "samse",
+ "-r",
+ "\"@RGtID:ILLUMINA-JK2782tSM:JK2782tPL:illuminatPU:ILLUMINA-JK2782-PE\"",
+ "BWAIndex/Mammoth_MT_Krause.fasta",
+ "JK2782.sai",
+ "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz",
+ "|",
+ "samtools",
+ "sort",
+ "-@",
+ "1",
+ "-O",
+ "bam",
+ "-",
+ ">",
+ "\"JK2782\"_\"PE\".mapped.bam",
+ "samtools",
+ "index",
+ "\"JK2782\"_\"PE\".mapped.bam"
+ ]
+ },
+ "parents": [
+ "fastqc_after_clipping_jk2782_l1",
+ "fastqc_after_clipping_jk2802_l2"
+ ],
+ "children": [
+ "samtools_flagstat_jk2782",
+ "samtools_flagstat_jk2802"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000015",
+ "category": "bwa",
+ "avgCPU": 69.8,
+ "bytesRead": 3705,
+ "bytesWritten": 3355,
+ "memory": 12876
+ },
+ {
+ "name": "samtools_flagstat_jk2782",
+ "type": "compute",
+ "runtime": 13.011,
+ "command": {
+ "program": "samtools_flagstat",
+ "arguments": [
+ "samtools",
+ "flagstat",
+ "JK2782_PE.mapped.bam",
+ ">",
+ "JK2782_flagstat.stats"
+ ]
+ },
+ "parents": [
+ "bwa_jk2802",
+ "bwa_jk2782"
+ ],
+ "children": [
+ "markduplicates_jk2782",
+ "markduplicates_jk2802"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000026",
+ "category": "samtools_flagstat",
+ "avgCPU": 30.1,
+ "bytesRead": 478,
+ "bytesWritten": 5,
+ "memory": 6468
+ },
+ {
+ "name": "samtools_flagstat_jk2802",
+ "type": "compute",
+ "runtime": 13.129,
+ "command": {
+ "program": "samtools_flagstat",
+ "arguments": [
+ "samtools",
+ "flagstat",
+ "JK2802_SE.mapped.bam",
+ ">",
+ "JK2802_flagstat.stats"
+ ]
+ },
+ "parents": [
+ "bwa_jk2802",
+ "bwa_jk2782"
+ ],
+ "children": [
+ "markduplicates_jk2782",
+ "markduplicates_jk2802"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000024",
+ "category": "samtools_flagstat",
+ "avgCPU": 118.5,
+ "bytesRead": 551,
+ "bytesWritten": 5
+ },
+ {
+ "name": "markduplicates_jk2782",
+ "type": "compute",
+ "runtime": 22.655,
+ "command": {
+ "program": "markduplicates",
+ "arguments": [
+ "mv",
+ "JK2782_PE.mapped.bam",
+ "JK2782.bam",
+ "picard",
+ "-Xmx4096M",
+ "MarkDuplicates",
+ "INPUT=JK2782.bam",
+ "OUTPUT=JK2782_rmdup.bam",
+ "REMOVE_DUPLICATES=TRUE",
+ "AS=TRUE",
+ "METRICS_FILE=\"JK2782_rmdup.metrics\"",
+ "VALIDATION_STRINGENCY=SILENT",
+ "samtools",
+ "index",
+ "JK2782_rmdup.bam"
+ ]
+ },
+ "parents": [
+ "samtools_flagstat_jk2782",
+ "samtools_flagstat_jk2802"
+ ],
+ "children": [
+ "preseq_jk2782",
+ "preseq_jk2802"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000021",
+ "category": "markduplicates",
+ "avgCPU": 173.6,
+ "bytesRead": 24055,
+ "bytesWritten": 2319,
+ "memory": 1400048
+ },
+ {
+ "name": "markduplicates_jk2802",
+ "type": "compute",
+ "runtime": 21.545,
+ "command": {
+ "program": "markduplicates",
+ "arguments": [
+ "mv",
+ "JK2802_SE.mapped.bam",
+ "JK2802.bam",
+ "picard",
+ "-Xmx4096M",
+ "MarkDuplicates",
+ "INPUT=JK2802.bam",
+ "OUTPUT=JK2802_rmdup.bam",
+ "REMOVE_DUPLICATES=TRUE",
+ "AS=TRUE",
+ "METRICS_FILE=\"JK2802_rmdup.metrics\"",
+ "VALIDATION_STRINGENCY=SILENT",
+ "samtools",
+ "index",
+ "JK2802_rmdup.bam"
+ ]
+ },
+ "parents": [
+ "samtools_flagstat_jk2782",
+ "samtools_flagstat_jk2802"
+ ],
+ "children": [
+ "preseq_jk2782",
+ "preseq_jk2802"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000020",
+ "category": "markduplicates",
+ "avgCPU": 182.6,
+ "bytesRead": 24242,
+ "bytesWritten": 2466,
+ "memory": 1404624
+ },
+ {
+ "name": "preseq_jk2782",
+ "type": "compute",
+ "runtime": 12.299,
+ "command": {
+ "program": "preseq",
+ "arguments": [
+ "preseq",
+ "c_curve",
+ "-s",
+ "1000",
+ "-o",
+ "JK2782_PE.mapped.ccurve",
+ "-B",
+ "JK2782_PE.mapped.bam"
+ ]
+ },
+ "parents": [
+ "markduplicates_jk2782",
+ "markduplicates_jk2802"
+ ],
+ "children": [
+ "endorspy_jk2782",
+ "endorspy_jk2802"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000030",
+ "category": "preseq",
+ "avgCPU": 81.9,
+ "bytesRead": 473,
+ "bytesWritten": 4,
+ "memory": 12032
+ },
+ {
+ "name": "preseq_jk2802",
+ "type": "compute",
+ "runtime": 10.188,
+ "command": {
+ "program": "preseq",
+ "arguments": [
+ "preseq",
+ "c_curve",
+ "-s",
+ "1000",
+ "-o",
+ "JK2802_SE.mapped.ccurve",
+ "-B",
+ "JK2802_SE.mapped.bam"
+ ]
+ },
+ "parents": [
+ "markduplicates_jk2782",
+ "markduplicates_jk2802"
+ ],
+ "children": [
+ "endorspy_jk2782",
+ "endorspy_jk2802"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000027",
+ "category": "preseq",
+ "avgCPU": 77.6,
+ "bytesRead": 548,
+ "bytesWritten": 4,
+ "memory": 11972
+ },
+ {
+ "name": "endorspy_jk2782",
+ "type": "compute",
+ "runtime": 7.537,
+ "command": {
+ "program": "endorspy",
+ "arguments": [
+ "endorS.py",
+ "-o",
+ "json",
+ "-n",
+ "JK2782",
+ "JK2782_flagstat.stats"
+ ]
+ },
+ "parents": [
+ "preseq_jk2782",
+ "preseq_jk2802"
+ ],
+ "children": [
+ "damageprofiler_jk2802",
+ "damageprofiler_jk2782"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000031",
+ "category": "endorspy",
+ "avgCPU": 44.7,
+ "bytesRead": 623,
+ "bytesWritten": 4,
+ "memory": 12264
+ },
+ {
+ "name": "endorspy_jk2802",
+ "type": "compute",
+ "runtime": 8.0,
+ "command": {
+ "program": "endorspy",
+ "arguments": [
+ "endorS.py",
+ "-o",
+ "json",
+ "-n",
+ "JK2802",
+ "JK2802_flagstat.stats"
+ ]
+ },
+ "parents": [
+ "preseq_jk2782",
+ "preseq_jk2802"
+ ],
+ "children": [
+ "damageprofiler_jk2802",
+ "damageprofiler_jk2782"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000032",
+ "category": "endorspy",
+ "avgCPU": 54.0,
+ "bytesRead": 623,
+ "bytesWritten": 4,
+ "memory": 12224
+ },
+ {
+ "name": "damageprofiler_jk2802",
+ "type": "compute",
+ "runtime": 18.596,
+ "command": {
+ "program": "damageprofiler",
+ "arguments": [
+ "damageprofiler",
+ "-Xmx4g",
+ "-i",
+ "JK2802_rmdup.bam",
+ "-r",
+ "Mammoth_MT_Krause.fasta",
+ "-l",
+ "100",
+ "-t",
+ "15",
+ "-o",
+ ".",
+ "-yaxis_damageplot",
+ "0.30"
+ ]
+ },
+ "parents": [
+ "endorspy_jk2782",
+ "endorspy_jk2802"
+ ],
+ "children": [
+ "qualimap_jk2802",
+ "qualimap_jk2782"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000033",
+ "category": "damageprofiler",
+ "avgCPU": 88.6,
+ "bytesRead": 25744,
+ "bytesWritten": 391,
+ "memory": 242940
+ },
+ {
+ "name": "damageprofiler_jk2782",
+ "type": "compute",
+ "runtime": 16.736,
+ "command": {
+ "program": "damageprofiler",
+ "arguments": [
+ "damageprofiler",
+ "-Xmx4g",
+ "-i",
+ "JK2782_rmdup.bam",
+ "-r",
+ "Mammoth_MT_Krause.fasta",
+ "-l",
+ "100",
+ "-t",
+ "15",
+ "-o",
+ ".",
+ "-yaxis_damageplot",
+ "0.30"
+ ]
+ },
+ "parents": [
+ "endorspy_jk2782",
+ "endorspy_jk2802"
+ ],
+ "children": [
+ "qualimap_jk2802",
+ "qualimap_jk2782"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000036",
+ "category": "damageprofiler",
+ "avgCPU": 88.3,
+ "bytesRead": 25661,
+ "bytesWritten": 327,
+ "memory": 198276
+ },
+ {
+ "name": "qualimap_jk2802",
+ "type": "compute",
+ "runtime": 15.368,
+ "command": {
+ "program": "qualimap",
+ "arguments": [
+ "qualimap",
+ "bamqc",
+ "-bam",
+ "JK2802_rmdup.bam",
+ "-nt",
+ "2",
+ "-outdir",
+ ".",
+ "-outformat",
+ "\"HTML\"",
+ "--java-mem-size=4G"
+ ]
+ },
+ "parents": [
+ "damageprofiler_jk2802",
+ "damageprofiler_jk2782"
+ ],
+ "children": [
+ "multiqc_1"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000053",
+ "category": "qualimap",
+ "avgCPU": 177.7,
+ "bytesRead": 35038,
+ "bytesWritten": 1712,
+ "memory": 209440
+ },
+ {
+ "name": "qualimap_jk2782",
+ "type": "compute",
+ "runtime": 14.223,
+ "command": {
+ "program": "qualimap",
+ "arguments": [
+ "qualimap",
+ "bamqc",
+ "-bam",
+ "JK2782_rmdup.bam",
+ "-nt",
+ "2",
+ "-outdir",
+ ".",
+ "-outformat",
+ "\"HTML\"",
+ "--java-mem-size=4G"
+ ]
+ },
+ "parents": [
+ "damageprofiler_jk2802",
+ "damageprofiler_jk2782"
+ ],
+ "children": [
+ "multiqc_1"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000054",
+ "category": "qualimap",
+ "avgCPU": 181.9,
+ "bytesRead": 34954,
+ "bytesWritten": 1937,
+ "memory": 232196
+ },
+ {
+ "name": "multiqc_1",
+ "type": "compute",
+ "runtime": 46.376,
+ "command": {
+ "program": "multiqc",
+ "arguments": [
+ "multiqc",
+ "-f",
+ "multiqc_config.yaml",
+ "."
+ ]
+ },
+ "parents": [
+ "qualimap_jk2802",
+ "qualimap_jk2782"
+ ],
+ "children": [],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000056",
+ "category": "multiqc",
+ "avgCPU": 93.0,
+ "bytesRead": 1215169,
+ "bytesWritten": 22599,
+ "memory": 139496
+ }
+ ]
+ }
+}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt
index be26f540..74202718 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt
@@ -35,21 +35,18 @@ internal class WtfTaskTable(private val path: Path) : Table {
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_ID -> true
- TASK_WORKFLOW_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_WAIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_PARENTS -> true
- TASK_CHILDREN -> true
- TASK_GROUP_ID -> true
- TASK_USER_ID -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ TASK_ID,
+ TASK_WORKFLOW_ID,
+ TASK_SUBMIT_TIME,
+ TASK_WAIT_TIME,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_PARENTS,
+ TASK_CHILDREN,
+ TASK_GROUP_ID,
+ TASK_USER_ID
+ )
override fun newReader(): TableReader {
val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0"))
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index b6789542..5e2463f8 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -25,6 +25,8 @@ package org.opendc.trace.wtf
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
/**
* A [TableReader] implementation for the WTF format.
@@ -61,14 +63,14 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
@Suppress("UNCHECKED_CAST")
val res: Any = when (column) {
- TASK_ID -> record["id"]
- TASK_WORKFLOW_ID -> record["workflow_id"]
- TASK_SUBMIT_TIME -> record["ts_submit"]
- TASK_WAIT_TIME -> record["wait_time"]
- TASK_RUNTIME -> record["runtime"]
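+ // Expose identifiers as strings and timestamps as java.time values so columns are format-agnostic.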
+ TASK_ID -> (record["id"] as Long).toString()
+ TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString()
+ TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long)
+ TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long)
+ TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long)
TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt()
- TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet()
- TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet()
+ TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
+ TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
TASK_GROUP_ID -> record["group_id"]
TASK_USER_ID -> record["user_id"]
else -> throw IllegalArgumentException("Invalid column")
@@ -94,16 +96,7 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
}
override fun getLong(column: TableColumn<Long>): Long {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (column) {
- TASK_ID -> record["id"] as Long
- TASK_WORKFLOW_ID -> record["workflow_id"] as Long
- TASK_SUBMIT_TIME -> record["ts_submit"] as Long
- TASK_WAIT_TIME -> record["wait_time"] as Long
- TASK_RUNTIME -> record["runtime"] as Long
- else -> throw IllegalArgumentException("Invalid column")
- }
+ throw IllegalArgumentException("Invalid column")
}
override fun getDouble(column: TableColumn<Double>): Double {
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt
index 7eff0f5a..a755a107 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt
@@ -43,5 +43,5 @@ public class WtfTrace internal constructor(private val path: Path) : Trace {
return WtfTaskTable(path)
}
- override fun toString(): String = "SwfTrace[$path]"
+ override fun toString(): String = "WtfTrace[$path]"
}
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index a05a523e..b155f265 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -28,6 +28,8 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.trace.*
import java.io.File
import java.net.URL
+import java.time.Duration
+import java.time.Instant
/**
* Test suite for the [WtfTraceFormat] class.
@@ -91,20 +93,20 @@ class WtfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(362334516345962206, reader.getLong(TASK_ID)) },
- { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) },
- { assertEquals(245604, reader.getLong(TASK_SUBMIT_TIME)) },
- { assertEquals(8163, reader.getLong(TASK_RUNTIME)) },
- { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) },
+ { assertEquals("362334516345962206", reader.get(TASK_ID)) },
+ { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) },
+ { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) },
+ { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) },
)
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(502010169100446658, reader.getLong(TASK_ID)) },
- { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) },
- { assertEquals(251325, reader.getLong(TASK_SUBMIT_TIME)) },
- { assertEquals(8216, reader.getLong(TASK_RUNTIME)) },
- { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) },
+ { assertEquals("502010169100446658", reader.get(TASK_ID)) },
+ { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) },
+ { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) },
+ { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) },
)
reader.close()
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt
index a390fe08..9ee3736e 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt
@@ -81,17 +81,17 @@ internal class TraceReplayer(private val trace: Trace) {
try {
while (reader.nextRow()) {
// Bag of tasks without workflow ID all share the same workflow
- val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.getLong(TASK_WORKFLOW_ID) else 0L
+ val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L
val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
- val id = reader.getLong(TASK_ID)
+ val id = reader.get(TASK_ID).toLong()
val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS))
reader.getInt(TASK_ALLOC_NCPUS)
else
reader.getInt(TASK_REQ_NCPUS)
- val submitTime = reader.getLong(TASK_SUBMIT_TIME)
- val runtime = reader.getLong(TASK_RUNTIME)
- val flops: Long = 4000 * runtime * grantedCpus
+ val submitTime = reader.get(TASK_SUBMIT_TIME)
+ val runtime = reader.get(TASK_RUNTIME)
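+ // Derive total work from runtime, assuming 4000 floating-point operations per CPU-second.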
+ val flops: Long = 4000 * runtime.seconds * grantedCpus
val workload = SimFlopsWorkload(flops)
val task = Task(
UUID(0L, id),
@@ -100,14 +100,14 @@ internal class TraceReplayer(private val trace: Trace) {
mapOf(
"workload" to workload,
WORKFLOW_TASK_CORES to grantedCpus,
- WORKFLOW_TASK_DEADLINE to (runtime * 1000)
+ WORKFLOW_TASK_DEADLINE to runtime.toMillis()
),
)
tasks[id] = task
- taskDependencies[task] = reader.get(TASK_PARENTS)
+ taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet()
- (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime) { a, b -> min(a as Long, b as Long) }
+ (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) }
(workflow.tasks as MutableSet<Task>).add(task)
}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 427cdb52..60e67e2b 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -49,6 +49,7 @@ include(":opendc-trace:opendc-trace-api")
include(":opendc-trace:opendc-trace-gwf")
include(":opendc-trace:opendc-trace-swf")
include(":opendc-trace:opendc-trace-wtf")
+include(":opendc-trace:opendc-trace-wfformat")
include(":opendc-trace:opendc-trace-bitbrains")
include(":opendc-trace:opendc-trace-parquet")
include(":opendc-harness:opendc-harness-api")