summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-02 11:50:43 +0200
committerGitHub <noreply@github.com>2021-09-02 11:50:43 +0200
commit05f80bd9fb7caf765e3ebbb70d48d0d5e185bd42 (patch)
tree5fa9501621ad327028c2f2e12c9c367f44f6aebe
parent99f391d11db57c3db3f326958de8f66502969cdb (diff)
parent5935531137a22fdb920921580d491f86adec65c9 (diff)
merge: Add generic trace reading library
This pull request adds a generic trace reading library to OpenDC. The library has been designed to support a wide range of trace formats and uses a streaming approach to improve performance of reading large traces. * Add trace reading API * Implement API for GWF format * Implement API for SWF format * Implement API for WTF format * Implement API for Bitbrains format * Implement API for Bitbrains Parquet format **Breaking API Changes** * `opendc-format` has been removed in favour of `opendc-trace-*`
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts6
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt (renamed from opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt)4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt38
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt64
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt6
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt263
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt (renamed from opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt)2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt (renamed from opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt)4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt1
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt56
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt103
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt56
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt103
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt49
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt47
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt140
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt230
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt (renamed from opendc-format/src/test/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReaderTest.kt)30
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt47
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt4
-rw-r--r--opendc-experiments/opendc-experiments-energy21/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-tf20/build.gradle.kts5
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt3
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt17
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt (renamed from opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt)5
-rw-r--r--opendc-format/build.gradle.kts26
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt66
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt89
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt100
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt127
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt176
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt176
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt126
-rw-r--r--opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt45
-rw-r--r--opendc-trace/build.gradle.kts21
-rw-r--r--opendc-trace/opendc-trace-api/build.gradle.kts32
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt56
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt129
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt53
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt68
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt59
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt70
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt44
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt98
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt43
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt62
-rw-r--r--opendc-trace/opendc-trace-bitbrains/build.gradle.kts36
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt139
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt218
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt46
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt56
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat1
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt100
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/resources/bitbrains.csv (renamed from opendc-format/src/test/resources/bitbrains.csv)0
-rw-r--r--opendc-trace/opendc-trace-gwf/build.gradle.kts37
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt59
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt211
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt46
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt56
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat1
-rw-r--r--opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt109
-rw-r--r--opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf71
-rw-r--r--opendc-trace/opendc-trace-parquet/build.gradle.kts60
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt (renamed from opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt)2
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt (renamed from opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt)2
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt (renamed from opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt)2
-rw-r--r--opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt (renamed from opendc-format/src/test/kotlin/org/opendc/format/util/ParquetTest.kt)2
-rw-r--r--opendc-trace/opendc-trace-swf/build.gradle.kts35
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt63
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt162
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt46
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt (renamed from opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt)32
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat1
-rw-r--r--opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt107
-rw-r--r--opendc-trace/opendc-trace-swf/src/test/resources/trace.swf (renamed from opendc-format/src/test/resources/swf_trace.txt)0
-rw-r--r--opendc-trace/opendc-trace-wtf/build.gradle.kts37
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt64
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt116
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt47
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt41
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat1
-rw-r--r--opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt121
-rw-r--r--opendc-trace/opendc-trace-wtf/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet (renamed from opendc-format/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet)bin87475 -> 87475 bytes
-rw-r--r--opendc-web/opendc-web-runner/build.gradle.kts1
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt4
-rw-r--r--opendc-workflow/opendc-workflow-service/build.gradle.kts6
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt127
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt (renamed from opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt)75
-rw-r--r--settings.gradle.kts7
93 files changed, 4039 insertions, 1268 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 53643aba..65cebe1b 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -31,7 +31,8 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcHarness.opendcHarnessApi)
- implementation(projects.opendcFormat)
+ implementation(projects.opendcTrace.opendcTraceParquet)
+ implementation(projects.opendcTrace.opendcTraceBitbrains)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcSimulator.opendcSimulatorFailures)
@@ -43,6 +44,9 @@ dependencies {
implementation(libs.config)
implementation(libs.progressbar)
implementation(libs.clikt)
+ implementation(libs.jackson.module.kotlin) {
+ exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect")
+ }
implementation(libs.parquet)
testImplementation(libs.log4j.slf4j)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 7f428b2a..46e11056 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -45,10 +45,10 @@ import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
import org.opendc.compute.service.scheduler.weights.RamWeigher
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
+import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.trace.TraceReader
+import org.opendc.experiments.capelin.trace.TraceReader
import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.power.SimplePowerDriver
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
index d73d14f5..babd8ada 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
@@ -22,8 +22,6 @@
package org.opendc.experiments.capelin.env
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt
index 97d6f239..a968b043 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.format.environment
+package org.opendc.experiments.capelin.env
import java.io.Closeable
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt
new file mode 100644
index 00000000..b0c0318f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.env
+
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.power.PowerModel
+import java.util.*
+
+/**
+ * A definition of a machine in a cluster.
+ */
+public data class MachineDef(
+ val uid: UUID,
+ val name: String,
+ val meta: Map<String, Any>,
+ val model: MachineModel,
+ val powerModel: PowerModel
+)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
index d8f7ff75..897a6692 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
@@ -28,7 +28,7 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.experiments.capelin.telemetry.Event
-import org.opendc.format.util.LocalOutputFile
+import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.Closeable
import java.io.File
import java.util.concurrent.ArrayBlockingQueue
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
index 0f49ecd2..0bf4ada6 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
@@ -24,8 +24,6 @@ package org.opendc.experiments.capelin.trace
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimWorkload
/**
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
index 16ad6816..fa4e9ed8 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
@@ -22,12 +22,10 @@
package org.opendc.experiments.capelin.trace
-import org.apache.avro.generic.GenericData
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.format.util.LocalParquetReader
+import org.opendc.experiments.capelin.trace.bp.BPTraceFormat
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.trace.*
import java.io.File
import java.util.UUID
@@ -38,26 +36,29 @@ import java.util.UUID
*/
class RawParquetTraceReader(private val path: File) {
/**
+ * The [Trace] that represents this trace.
+ */
+ private val trace = BPTraceFormat().open(path.toURI().toURL())
+
+ /**
* Read the fragments into memory.
*/
- private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
- val reader = LocalParquetReader<GenericData.Record>(File(path, "trace.parquet"))
+ private fun parseFragments(): Map<String, List<SimTraceWorkload.Fragment>> {
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
return try {
- while (true) {
- val record = reader.read() ?: break
-
- val id = record["id"].toString()
- val time = record["time"] as Long
- val duration = record["duration"] as Long
- val cores = record["cores"] as Int
- val cpuUsage = record["cpuUsage"] as Double
+ while (reader.nextRow()) {
+ val id = reader.get(RESOURCE_STATE_ID)
+ val time = reader.get(RESOURCE_STATE_TIMESTAMP)
+ val duration = reader.get(RESOURCE_STATE_DURATION)
+ val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
val fragment = SimTraceWorkload.Fragment(
- time,
- duration,
+ time.toEpochMilli(),
+ duration.toMillis(),
cpuUsage,
cores
)
@@ -74,25 +75,24 @@ class RawParquetTraceReader(private val path: File) {
/**
* Read the metadata into a workload.
*/
- private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
- val metaReader = LocalParquetReader<GenericData.Record>(File(path, "meta.parquet"))
+ private fun parseMeta(fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
var counter = 0
val entries = mutableListOf<TraceEntry<SimWorkload>>()
return try {
- while (true) {
- val record = metaReader.read() ?: break
+ while (reader.nextRow()) {
- val id = record["id"].toString()
+ val id = reader.get(RESOURCE_ID)
if (!fragments.containsKey(id)) {
continue
}
- val submissionTime = record["submissionTime"] as Long
- val endTime = record["endTime"] as Long
- val maxCores = record["maxCores"] as Int
- val requiredMemory = record["requiredMemory"] as Long
+ val submissionTime = reader.get(RESOURCE_START_TIME)
+ val endTime = reader.get(RESOURCE_END_TIME)
+ val maxCores = reader.getInt(RESOURCE_NCPUS)
+ val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY)
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
val vmFragments = fragments.getValue(id).asSequence()
@@ -100,13 +100,13 @@ class RawParquetTraceReader(private val path: File) {
val workload = SimTraceWorkload(vmFragments)
entries.add(
TraceEntry(
- uid, id, submissionTime, workload,
+ uid, id, submissionTime.toEpochMilli(), workload,
mapOf(
- "submit-time" to submissionTime,
- "end-time" to endTime,
+ "submit-time" to submissionTime.toEpochMilli(),
+ "end-time" to endTime.toEpochMilli(),
"total-load" to totalLoad,
"cores" to maxCores,
- "required-memory" to requiredMemory,
+ "required-memory" to requiredMemory.toLong(),
"workload" to workload
)
)
@@ -118,7 +118,7 @@ class RawParquetTraceReader(private val path: File) {
e.printStackTrace()
throw e
} finally {
- metaReader.close()
+ reader.close()
}
}
@@ -128,8 +128,8 @@ class RawParquetTraceReader(private val path: File) {
private val entries: List<TraceEntry<SimWorkload>>
init {
- val fragments = parseFragments(path)
- entries = parseMeta(path, fragments)
+ val fragments = parseFragments()
+ entries = parseMeta(fragments)
}
/**
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
index 35f4c5b8..61e4cab5 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
@@ -30,11 +30,9 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.format.util.LocalInputFile
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.trace.util.parquet.LocalInputFile
import java.io.File
import java.io.Serializable
import java.util.SortedSet
@@ -187,7 +185,7 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = e
assert(uid !in takenIds)
takenIds += uid
- logger.info("Processing VM $id")
+ logger.info { "Processing VM $id" }
val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index d7daa35b..a021de8d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -36,9 +36,11 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.format.trace.bitbrains.BitbrainsTraceReader
-import org.opendc.format.util.LocalOutputFile
-import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.experiments.capelin.trace.sv.SvTraceFormat
+import org.opendc.trace.*
+import org.opendc.trace.bitbrains.BitbrainsTraceFormat
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
@@ -156,220 +158,101 @@ sealed class TraceConversion(name: String) : OptionGroup(name) {
): MutableList<Fragment>
}
-class SolvinityConversion : TraceConversion("Solvinity") {
- private val clusters by option()
- .split(",")
-
- private val vmPlacements by option("--vm-placements", help = "file containing the VM placements")
- .file(canBeDir = false)
- .convert { VmPlacementReader(it.inputStream()).use { reader -> reader.read() } }
- .required()
+/**
+ * A [TraceConversion] that uses the Trace API to perform the conversion.
+ */
+abstract class AbstractConversion(name: String) : TraceConversion(name) {
+ abstract val format: TraceFormat
override fun read(
traceDirectory: File,
metaSchema: Schema,
metaWriter: ParquetWriter<GenericData.Record>
): MutableList<Fragment> {
- val clusters = clusters?.toSet() ?: emptySet()
- val timestampCol = 0
- val cpuUsageCol = 1
- val coreCol = 12
- val provisionedMemoryCol = 20
- val traceInterval = 5 * 60 * 1000L
-
- // Identify start time of the entire trace
- var minTimestamp = Long.MAX_VALUE
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach file@{ vmFile ->
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val vmId = vmFile.name
+ val fragments = mutableListOf<Fragment>()
+ val trace = format.open(traceDirectory.toURI().toURL())
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null || !clusters.contains(clusterName)) {
- continue
- }
+ var lastId: String? = null
+ var maxCores = Int.MIN_VALUE
+ var requiredMemory = Long.MIN_VALUE
+ var minTime = Long.MAX_VALUE
+ var maxTime = Long.MIN_VALUE
+ var lastTimestamp = Long.MIN_VALUE
- val values = line.split("\t")
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ while (reader.nextRow()) {
+ val id = reader.get(RESOURCE_STATE_ID)
- if (timestamp < minTimestamp) {
- minTimestamp = timestamp
- }
- return@file
- }
- }
- }
+ if (lastId != null && lastId != id) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", lastId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
}
+ lastId = id
- println("Start of trace at $minTimestamp")
+ val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP)
+ val timestampMs = timestamp.toEpochMilli()
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
+ val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+ val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)
- val allFragments = mutableListOf<Fragment>()
+ maxCores = max(maxCores, cores)
+ requiredMemory = max(requiredMemory, (memCapacity / 1000).toLong())
- val begin = 15 * 24 * 60 * 60 * 1000L
- val end = 45 * 24 * 60 * 60 * 1000L
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach { vmFile ->
- println(vmFile)
-
- var vmId = ""
- var maxCores = -1
- var requiredMemory = -1L
- var cores: Int
- var minTime = Long.MAX_VALUE
-
- val flopsFragments = sequence {
- var last: Fragment? = null
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split("\t")
-
- vmId = vmFile.name
-
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null || !clusters.contains(clusterName)) {
- continue
- }
-
- val timestamp =
- (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
- if (begin > timestamp || timestamp > end) {
- continue
- }
-
- cores = values[coreCol].trim().toInt()
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
- minTime = min(minTime, timestamp)
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- last = if (last != null && last!!.flops == 0L && flops == 0L) {
- val oldFragment = last!!
- Fragment(
- vmId,
- oldFragment.tick,
- oldFragment.flops + flops,
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- cores
- )
- if (last != null) {
- yield(last!!)
- }
- fragment
- }
- }
- }
- }
-
- if (last != null) {
- yield(last!!)
- }
- }
-
- var maxTime = Long.MIN_VALUE
- flopsFragments.filter { it.tick in begin until end }.forEach { fragment ->
- allFragments.add(fragment)
- maxTime = max(maxTime, fragment.tick)
- }
-
- if (minTime in begin until end) {
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", vmId)
- metaRecord.put("submissionTime", minTime)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
- metaWriter.write(metaRecord)
- }
+ if (lastTimestamp < 0) {
+ lastTimestamp = timestampMs - 5 * 60 * 1000L
+ minTime = min(minTime, lastTimestamp)
}
- return allFragments
- }
-}
-
-/**
- * Conversion of the Bitbrains public trace.
- */
-class BitbrainsConversion : TraceConversion("Bitbrains") {
- override fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment> {
- val fragments = mutableListOf<Fragment>()
- BitbrainsTraceReader(traceDirectory).use { reader ->
- reader.forEach { entry ->
- val trace = (entry.workload as SimTraceWorkload).trace
- var maxTime = Long.MIN_VALUE
- trace.forEach { fragment ->
- val flops: Long = (fragment.usage * fragment.duration / 1000).toLong()
+ if (fragments.isEmpty()) {
+ val duration = 5 * 60 * 1000L
+ val flops: Long = (cpuUsage * duration / 1000).toLong()
+ fragments.add(Fragment(id, lastTimestamp, flops, duration, cpuUsage, cores))
+ } else {
+ val last = fragments.last()
+ val duration = timestampMs - lastTimestamp
+ val flops: Long = (cpuUsage * duration / 1000).toLong()
+
+ // Perform run-length encoding
+ if (last.id == id && (duration == 0L || last.usage == cpuUsage)) {
+ fragments[fragments.size - 1] = last.copy(duration = last.duration + duration)
+ } else {
fragments.add(
Fragment(
- entry.name,
- fragment.timestamp,
+ id,
+ lastTimestamp,
flops,
- fragment.duration,
- fragment.usage,
- fragment.cores
+ duration,
+ cpuUsage,
+ cores
)
)
- maxTime = max(maxTime, fragment.timestamp + fragment.duration)
}
-
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", entry.name)
- metaRecord.put("submissionTime", entry.start)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", entry.meta["cores"])
- metaRecord.put("requiredMemory", entry.meta["required-memory"])
- metaWriter.write(metaRecord)
}
+
+ val last = fragments.last()
+ maxTime = max(maxTime, last.tick + last.duration)
+ lastTimestamp = timestampMs
}
return fragments
}
}
+class SolvinityConversion : AbstractConversion("Solvinity") {
+ override val format: TraceFormat = SvTraceFormat()
+}
+
+/**
+ * Conversion of the Bitbrains public trace.
+ */
+class BitbrainsConversion : AbstractConversion("Bitbrains") {
+ override val format: TraceFormat = BitbrainsTraceFormat()
+}
+
/**
* Conversion of the Azure public VM trace.
*/
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt
index 3ce79d69..303a6a8c 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package org.opendc.format.trace
+package org.opendc.experiments.capelin.trace
import java.util.UUID
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt
index 797a88d5..08304edc 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.format.trace
+package org.opendc.experiments.capelin.trace
/**
* An interface for reading workloads into memory.
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
index 6de3f265..cb32ce88 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
@@ -26,7 +26,6 @@ import mu.KotlinLogging
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.SamplingStrategy
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.format.trace.TraceEntry
import org.opendc.simulator.compute.workload.SimWorkload
import java.util.*
import kotlin.random.Random
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
new file mode 100644
index 00000000..35bfd5ef
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.nio.file.Path
+
+/**
+ * The resource state [Table] in the Bitbrains Parquet format.
+ */
+internal class BPResourceStateTable(private val path: Path) : Table {
+ override val name: String = TABLE_RESOURCE_STATES
+ override val isSynthetic: Boolean = false
+
+ override fun isSupported(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_DURATION -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ else -> false
+ }
+ }
+
+ override fun newReader(): TableReader {
+ val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
+ return BPResourceStateTableReader(reader)
+ }
+
+ override fun newReader(partition: String): TableReader {
+ throw IllegalArgumentException("Unknown partition $partition")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt
new file mode 100644
index 00000000..0e7ee555
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A [TableReader] implementation for the Bitbrains Parquet format.
+ */
+internal class BPResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+ /**
+ * The current record.
+ */
+ private var record: GenericRecord? = null
+
+ override fun nextRow(): Boolean {
+ record = reader.read()
+ return record != null
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_DURATION -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ @Suppress("UNCHECKED_CAST")
+ val res: Any = when (column) {
+ RESOURCE_STATE_ID -> record["id"].toString()
+ RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long)
+ RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long)
+ RESOURCE_STATE_NCPUS -> record["cores"]
+ RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (column) {
+ RESOURCE_STATE_NCPUS -> record["cores"] as Int
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (column) {
+ RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun close() {
+ reader.close()
+ }
+
+ override fun toString(): String = "BPResourceStateTableReader"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
new file mode 100644
index 00000000..74d1e574
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.nio.file.Path
+
+/**
+ * The resource [Table] in the Bitbrains Parquet format.
+ */
+internal class BPResourceTable(private val path: Path) : Table {
+ override val name: String = TABLE_RESOURCES
+ override val isSynthetic: Boolean = false
+
+ override fun isSupported(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_ID -> true
+ RESOURCE_START_TIME -> true
+ RESOURCE_END_TIME -> true
+ RESOURCE_NCPUS -> true
+ RESOURCE_MEM_CAPACITY -> true
+ else -> false
+ }
+ }
+
+ override fun newReader(): TableReader {
+ val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet"))
+ return BPResourceTableReader(reader)
+ }
+
+ override fun newReader(partition: String): TableReader {
+ throw IllegalArgumentException("Unknown partition $partition")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
new file mode 100644
index 00000000..0a105783
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Instant
+
+/**
+ * A [TableReader] implementation for the Bitbrains Parquet format.
+ */
+internal class BPResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+ /**
+ * The current record.
+ */
+ private var record: GenericRecord? = null
+
+ override fun nextRow(): Boolean {
+ record = reader.read()
+ return record != null
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_ID -> true
+ RESOURCE_START_TIME -> true
+ RESOURCE_END_TIME -> true
+ RESOURCE_NCPUS -> true
+ RESOURCE_MEM_CAPACITY -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ @Suppress("UNCHECKED_CAST")
+ val res: Any = when (column) {
+ RESOURCE_ID -> record["id"].toString()
+ RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long)
+ RESOURCE_END_TIME -> Instant.ofEpochMilli(record["endTime"] as Long)
+ RESOURCE_NCPUS -> record["maxCores"]
+ RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (column) {
+ RESOURCE_NCPUS -> record["maxCores"] as Int
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (column) {
+ RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun close() {
+ reader.close()
+ }
+
+ override fun toString(): String = "BPResourceTableReader"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt
new file mode 100644
index 00000000..486587b1
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.opendc.trace.TABLE_RESOURCES
+import org.opendc.trace.TABLE_RESOURCE_STATES
+import org.opendc.trace.Table
+import org.opendc.trace.Trace
+import java.nio.file.Path
+
+/**
+ * A [Trace] in the Bitbrains Parquet format.
+ */
+public class BPTrace internal constructor(private val path: Path) : Trace {
+ override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+ override fun containsTable(name: String): Boolean =
+ name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES
+
+ override fun getTable(name: String): Table? {
+ return when (name) {
+ TABLE_RESOURCES -> BPResourceTable(path)
+ TABLE_RESOURCE_STATES -> BPResourceStateTable(path)
+ else -> null
+ }
+ }
+
+ override fun toString(): String = "BPTrace[$path]"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt
new file mode 100644
index 00000000..49d5b4c5
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A format implementation for the GWF trace format.
+ */
+public class BPTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "bitbrains-parquet"
+
+ /**
+ * Open a Bitbrains Parquet trace.
+ */
+ override fun open(url: URL): BPTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return BPTrace(path)
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
new file mode 100644
index 00000000..71c9d52e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.sv
+
+import org.opendc.trace.*
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.bufferedReader
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * The resource state [Table] in the Bitbrains format.
+ */
+internal class SvResourceStateTable(path: Path) : Table {
+ /**
+ * The partitions that belong to the table.
+ */
+ private val partitions = Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "txt" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+
+ override val name: String = TABLE_RESOURCE_STATES
+
+ override val isSynthetic: Boolean = false
+
+ override fun isSupported(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_CLUSTER_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_CAPACITY -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ RESOURCE_STATE_CPU_USAGE_PCT -> true
+ RESOURCE_STATE_CPU_DEMAND -> true
+ RESOURCE_STATE_CPU_READY_PCT -> true
+ RESOURCE_STATE_MEM_CAPACITY -> true
+ RESOURCE_STATE_DISK_READ -> true
+ RESOURCE_STATE_DISK_WRITE -> true
+ else -> false
+ }
+ }
+
+ override fun newReader(): TableReader {
+ val it = partitions.iterator()
+
+ return object : TableReader {
+ var delegate: TableReader? = nextDelegate()
+
+ override fun nextRow(): Boolean {
+ var delegate = delegate
+
+ while (delegate != null) {
+ if (delegate.nextRow()) {
+ break
+ }
+
+ delegate.close()
+ delegate = nextDelegate()
+ }
+
+ this.delegate = delegate
+ return delegate != null
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.get(column)
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getBoolean(column)
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getInt(column)
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getLong(column)
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getDouble(column)
+ }
+
+ override fun close() {
+ delegate?.close()
+ }
+
+ private fun nextDelegate(): TableReader? {
+ return if (it.hasNext()) {
+ val (partition, path) = it.next()
+ val reader = path.bufferedReader()
+ return SvResourceStateTableReader(partition, reader)
+ } else {
+ null
+ }
+ }
+
+ override fun toString(): String = "BitbrainsCompositeTableReader"
+ }
+ }
+
+ override fun newReader(partition: String): TableReader {
+ val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
+ val reader = path.bufferedReader()
+ return SvResourceStateTableReader(partition, reader)
+ }
+
+ override fun toString(): String = "SvResourceStateTable"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
new file mode 100644
index 00000000..adcdb2ea
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.sv
+
+import org.opendc.trace.*
+import java.io.BufferedReader
+import java.time.Instant
+
+/**
+ * A [TableReader] for the Bitbrains resource state table.
+ */
+internal class SvResourceStateTableReader(partition: String, private val reader: BufferedReader) : TableReader {
+ /**
+ * The current parser state.
+ */
+ private val state = RowState()
+
+ override fun nextRow(): Boolean {
+ state.reset()
+
+ var line: String
+ var num = 0
+
+ while (true) {
+ line = reader.readLine() ?: return false
+ num++
+
+ if (line[0] == '#' || line.isBlank()) {
+ // Ignore empty lines or comments
+ continue
+ }
+
+ break
+ }
+
+ line = line.trim()
+
+ val length = line.length
+ var col = 0
+ var start = 0
+ var end = 0
+
+ while (end < length) {
+ // Trim all whitespace before the field
+ start = end
+ while (start < length && line[start].isWhitespace()) {
+ start++
+ }
+
+ end = line.indexOf(' ', start)
+
+ if (end < 0) {
+ break
+ }
+
+ val field = line.subSequence(start, end) as String
+ when (col++) {
+ COL_TIMESTAMP -> state.timestamp = Instant.ofEpochSecond(field.toLong(10))
+ COL_CPU_USAGE -> state.cpuUsage = field.toDouble()
+ COL_CPU_DEMAND -> state.cpuDemand = field.toDouble()
+ COL_DISK_READ -> state.diskRead = field.toDouble()
+ COL_DISK_WRITE -> state.diskWrite = field.toDouble()
+ COL_CLUSTER_ID -> state.cluster = field.trim()
+ COL_NCPUS -> state.cpuCores = field.toInt(10)
+ COL_CPU_READY_PCT -> state.cpuReadyPct = field.toDouble()
+ COL_POWERED_ON -> state.poweredOn = field.toInt(10) == 1
+ COL_CPU_CAPACITY -> state.cpuCapacity = field.toDouble()
+ COL_ID -> state.id = field.trim()
+ COL_MEM_CAPACITY -> state.memCapacity = field.toDouble()
+ }
+ }
+
+ return true
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_CLUSTER_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_CAPACITY -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ RESOURCE_STATE_CPU_USAGE_PCT -> true
+ RESOURCE_STATE_CPU_DEMAND -> true
+ RESOURCE_STATE_CPU_READY_PCT -> true
+ RESOURCE_STATE_MEM_CAPACITY -> true
+ RESOURCE_STATE_DISK_READ -> true
+ RESOURCE_STATE_DISK_WRITE -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any? = when (column) {
+ RESOURCE_STATE_ID -> state.id
+ RESOURCE_STATE_CLUSTER_ID -> state.cluster
+ RESOURCE_STATE_TIMESTAMP -> state.timestamp
+ RESOURCE_STATE_NCPUS -> state.cpuCores
+ RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity
+ RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
+ RESOURCE_STATE_DISK_READ -> state.diskRead
+ RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_POWERED_ON -> state.poweredOn
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ return when (column) {
+ RESOURCE_STATE_NCPUS -> state.cpuCores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ return when (column) {
+ RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity
+ RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
+ RESOURCE_STATE_DISK_READ -> state.diskRead
+ RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun close() {
+ reader.close()
+ }
+
+ /**
+ * The current row state.
+ */
+ private class RowState {
+ @JvmField
+ var id: String? = null
+ @JvmField
+ var cluster: String? = null
+ @JvmField
+ var timestamp: Instant? = null
+ @JvmField
+ var cpuCores = -1
+ @JvmField
+ var cpuCapacity = Double.NaN
+ @JvmField
+ var cpuUsage = Double.NaN
+ @JvmField
+ var cpuDemand = Double.NaN
+ @JvmField
+ var cpuReadyPct = Double.NaN
+ @JvmField
+ var memCapacity = Double.NaN
+ @JvmField
+ var diskRead = Double.NaN
+ @JvmField
+ var diskWrite = Double.NaN
+ @JvmField
+ var poweredOn: Boolean = false
+
+ /**
+ * Reset the state.
+ */
+ fun reset() {
+ id = null
+ timestamp = null
+ cluster = null
+ cpuCores = -1
+ cpuCapacity = Double.NaN
+ cpuUsage = Double.NaN
+ cpuDemand = Double.NaN
+ cpuReadyPct = Double.NaN
+ memCapacity = Double.NaN
+ diskRead = Double.NaN
+ diskWrite = Double.NaN
+ poweredOn = false
+ }
+ }
+
+ /**
+ * Default column indices for the extended Bitbrains format.
+ */
+ private val COL_TIMESTAMP = 0
+ private val COL_CPU_USAGE = 1
+ private val COL_CPU_DEMAND = 2
+ private val COL_DISK_READ = 4
+ private val COL_DISK_WRITE = 6
+ private val COL_CLUSTER_ID = 10
+ private val COL_NCPUS = 12
+ private val COL_CPU_READY_PCT = 13
+ private val COL_POWERED_ON = 14
+ private val COL_CPU_CAPACITY = 18
+ private val COL_ID = 19
+ private val COL_MEM_CAPACITY = 20
+}
diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt
index 48b4a2e3..dbd63de5 100644
--- a/opendc-format/src/test/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReaderTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt
@@ -20,26 +20,26 @@
* SOFTWARE.
*/
-package org.opendc.format.trace.bitbrains
+package org.opendc.experiments.capelin.trace.sv
-import org.junit.jupiter.api.Assertions.assertAll
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
+import org.opendc.trace.*
+import java.nio.file.Path
/**
- * Test suite for the [BitbrainsTraceReader] class.
+ * [Trace] implementation for the extended Bitbrains format.
*/
-class BitbrainsTraceReaderTest {
- @Test
- fun testSmoke() {
- val file = BitbrainsTraceReaderTest::class.java.getResourceAsStream("/bitbrains.csv")!!
- BitbrainsRawTraceReader(file).use { reader ->
- val entry = reader.next()
+public class SvTrace internal constructor(private val path: Path) : Trace {
+ override val tables: List<String> = listOf(TABLE_RESOURCE_STATES)
- assertAll(
- { assertEquals(1376314846, entry.timestamp) },
- { assertEquals(19.066, entry.cpuUsage, 0.01) }
- )
+ override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name
+
+ override fun getTable(name: String): Table? {
+ if (!containsTable(name)) {
+ return null
}
+
+ return SvResourceStateTable(path)
}
+
+ override fun toString(): String = "SvTrace[$path]"
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt
new file mode 100644
index 00000000..0cce8559
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.sv
+
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A format implementation for the extended Bitbrains trace format.
+ */
+public class SvTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "sv"
+
+ /**
+ * Open the trace file.
+ */
+ override fun open(url: URL): SvTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return SvTrace(path)
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index e4d3fed3..2934bbe6 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -36,13 +36,13 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
+import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.trace.TraceReader
+import org.opendc.experiments.capelin.trace.TraceReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
index bc05f09b..7d34d098 100644
--- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
@@ -31,7 +31,6 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcHarness.opendcHarnessApi)
- implementation(projects.opendcFormat)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcSimulator.opendcSimulatorFailures)
diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
index b088045b..882c4894 100644
--- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
@@ -34,8 +34,11 @@ dependencies {
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
- implementation(projects.opendcFormat)
implementation(projects.opendcUtils)
implementation(libs.kotlin.logging)
+ implementation(libs.jackson.module.kotlin) {
+ exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect")
+ }
+ implementation("org.jetbrains.kotlin:kotlin-reflect:1.5.30")
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt
index 9a48aced..2153a862 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt
@@ -55,7 +55,8 @@ public class TensorFlowExperiment : Experiment(name = "tf20") {
.build()
val meter = meterProvider.get("opendc-tf20")
- val def = MLEnvironmentReader(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile)).read().first()
+ val envInput = checkNotNull(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile))
+ val def = MLEnvironmentReader().readEnvironment(envInput).first()
val device = SimTFDevice(
def.uid, def.meta["gpu"] as Boolean, coroutineContext, clock, meter, def.model.cpus[0],
def.model.memory[0], LinearPowerModel(250.0, 60.0)
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt
index 3e61f508..3cdf28fd 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt
@@ -25,8 +25,6 @@ package org.opendc.experiments.tf20.util
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -36,13 +34,16 @@ import java.io.InputStream
import java.util.*
/**
- * An [EnvironmentReader] for the TensorFlow experiments.
+ * An environment reader for the TensorFlow experiments.
*/
-public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader {
+public class MLEnvironmentReader {
+ /**
+ * The [ObjectMapper] to convert the format.
+ */
+ private val mapper = jacksonObjectMapper()
- private val setup: Setup = mapper.readValue(input)
-
- override fun read(): List<MachineDef> {
+ public fun readEnvironment(input: InputStream): List<MachineDef> {
+ val setup: Setup = mapper.readValue(input)
var counter = 0
return setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
@@ -109,6 +110,4 @@ public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jack
}
}
}
-
- override fun close() {}
}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt
index f65c4880..271f5923 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt
@@ -20,12 +20,15 @@
* SOFTWARE.
*/
-package org.opendc.format.environment
+package org.opendc.experiments.tf20.util
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.power.PowerModel
import java.util.*
+/**
+ * A definition of a machine in a cluster.
+ */
public data class MachineDef(
val uid: UUID,
val name: String,
diff --git a/opendc-format/build.gradle.kts b/opendc-format/build.gradle.kts
index c8e30846..0c7f2a51 100644
--- a/opendc-format/build.gradle.kts
+++ b/opendc-format/build.gradle.kts
@@ -39,31 +39,9 @@ dependencies {
exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect")
}
implementation(libs.jackson.dataformat.csv)
- implementation(kotlin("reflect"))
+ implementation("org.jetbrains.kotlin:kotlin-reflect:1.5.30")
- /* This configuration is necessary for a slim dependency on Apache Parquet */
- implementation(libs.parquet) {
- exclude(group = "org.apache.hadoop")
- }
- runtimeOnly(libs.hadoop.common) {
- exclude(group = "org.slf4j", module = "slf4j-log4j12")
- exclude(group = "log4j")
- exclude(group = "org.apache.hadoop")
- exclude(group = "org.apache.curator")
- exclude(group = "org.apache.zookeeper")
- exclude(group = "org.apache.kerby")
- exclude(group = "org.apache.httpcomponents")
- exclude(group = "org.apache.htrace")
- exclude(group = "commons-cli")
- exclude(group = "javax.servlet")
- exclude(group = "org.eclipse.jetty")
- exclude(group = "com.sun.jersey")
- exclude(group = "com.jcraft")
- exclude(group = "dnsjava")
- }
- runtimeOnly(libs.hadoop.mapreduce.client.core) {
- isTransitive = false
- }
+ implementation(projects.opendcTrace.opendcTraceParquet)
testRuntimeOnly(libs.slf4j.simple)
}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt
deleted file mode 100644
index c313467f..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.environment.sc18
-
-import com.fasterxml.jackson.annotation.JsonSubTypes
-import com.fasterxml.jackson.annotation.JsonTypeInfo
-
-/**
- * A topology setup.
- *
- * @property name The name of the setup.
- * @property rooms The rooms in the topology.
- */
-internal data class Setup(val name: String, val rooms: List<Room>)
-
-/**
- * A room in a topology.
- *
- * @property type The type of room in the topology.
- * @property objects The objects in the room.
- */
-internal data class Room(val type: String, val objects: List<RoomObject>)
-
-/**
- * An object in a [Room].
- *
- * @property type The type of the room object.
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
-@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)])
-internal sealed class RoomObject(val type: String) {
- /**
- * A rack in a server room.
- *
- * @property machines The machines in the rack.
- */
- internal data class Rack(val machines: List<Machine>) : RoomObject("RACK")
-}
-
-/**
- * A machine in the setup that consists of the specified CPU's represented as
- * integer identifiers and ethernet speed.
- *
- * @property cpus The CPUs in the machine represented as integer identifiers.
- */
-internal data class Machine(val cpus: List<Int>)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
deleted file mode 100644
index 7780a98e..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.environment.sc18
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import com.fasterxml.jackson.module.kotlin.readValue
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.MachineDef
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.power.ConstantPowerModel
-import java.io.InputStream
-import java.util.*
-
-/**
- * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Topology
- * Schedulers".
- *
- * @param input The input stream to read from.
- * @param mapper The Jackson object mapper to use.
- */
-public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader {
- /**
- * The environment that was read from the file.
- */
- private val setup: Setup = mapper.readValue(input)
-
- /**
- * Read the environment.
- */
- public override fun read(): List<MachineDef> {
- var counter = 0
- return setup.rooms.flatMap { room ->
- room.objects.flatMap { roomObject ->
- when (roomObject) {
- is RoomObject.Rack -> {
- roomObject.machines.map { machine ->
- val cores = machine.cpus.flatMap { id ->
- when (id) {
- 1 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
- List(node.coreCount) { ProcessingUnit(node, it, 4100.0) }
- }
- 2 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
- List(node.coreCount) { ProcessingUnit(node, it, 3500.0) }
- }
- else -> throw IllegalArgumentException("The cpu id $id is not recognized")
- }
- }
- MachineDef(
- UUID(0L, counter++.toLong()),
- "node-$counter",
- emptyMap(),
- MachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))),
- ConstantPowerModel(0.0)
- )
- }
- }
- }
- }
- }
- }
-
- override fun close() {}
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt
deleted file mode 100644
index ff6cdd02..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.bitbrains
-
-import com.fasterxml.jackson.annotation.JsonProperty
-import com.fasterxml.jackson.databind.MappingIterator
-import com.fasterxml.jackson.dataformat.csv.CsvMapper
-import com.fasterxml.jackson.dataformat.csv.CsvSchema
-import java.io.InputStream
-
-/**
- * A trace reader that enables the user to read Bitbrains specific trace data.
- */
-public class BitbrainsRawTraceReader(input: InputStream) : Iterator<BitbrainsRawTraceReader.Entry>, AutoCloseable {
- /**
- * The [CsvSchema] that is used to parse the trace.
- */
- private val schema = CsvSchema.builder()
- .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .setAllowComments(true)
- .setUseHeader(true)
- .setColumnSeparator(';')
- .build()
-
- /**
- * The mapping iterator to use.
- */
- private val iterator: MappingIterator<Entry> = CsvMapper().readerFor(Entry::class.java).with(schema)
- .readValues(input)
-
- override fun hasNext(): Boolean {
- return iterator.hasNext()
- }
-
- override fun next(): Entry {
- return iterator.next()
- }
-
- override fun close() {
- iterator.close()
- }
-
- /**
- * A single entry in the trace.
- */
- public data class Entry(
- @JsonProperty("Timestamp [ms]")
- val timestamp: Long,
- @JsonProperty("CPU cores")
- val cpuCores: Int,
- @JsonProperty("CPU capacity provisioned [MHZ]")
- val cpuCapacity: Double,
- @JsonProperty("CPU usage [MHZ]")
- val cpuUsage: Double,
- @JsonProperty("CPU usage [%]")
- val cpuUsagePct: Double,
- @JsonProperty("Memory capacity provisioned [KB]")
- val memCapacity: Double,
- @JsonProperty("Memory usage [KB]")
- val memUsage: Double,
- @JsonProperty("Disk read throughput [KB/s]")
- val diskRead: Double,
- @JsonProperty("Disk write throughput [KB/s]")
- val diskWrite: Double,
- @JsonProperty("Network received throughput [KB/s]")
- val netReceived: Double,
- @JsonProperty("Network transmitted throughput [KB/s]")
- val netTransmitted: Double
- )
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
deleted file mode 100644
index 9e4876df..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.bitbrains
-
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.io.File
-import java.io.FileInputStream
-import java.util.*
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * A [TraceReader] for the public VM workload trace format.
- *
- * @param traceDirectory The directory of the traces.
- */
-public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkload> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>>
-
- /**
- * Initialize the reader.
- */
- init {
- val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>()
- val traceInterval = 5 * 60 * 1000L
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" }
- .forEach { vmFile ->
- val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>()
- var vmId = -1L
- var maxCores = Int.MIN_VALUE
- var requiredMemory = Long.MIN_VALUE
- var startTime = Long.MAX_VALUE
- var lastTimestamp = Long.MIN_VALUE
-
- BitbrainsRawTraceReader(FileInputStream(vmFile)).use { reader ->
- reader.forEach { entry ->
- val timestamp = entry.timestamp * 1000L
- val cpuUsage = entry.cpuUsage
- vmId = vmFile.nameWithoutExtension.trim().toLong()
- val cores = entry.cpuCores
- maxCores = max(maxCores, cores)
- requiredMemory = max(requiredMemory, (entry.memCapacity / 1000).toLong())
-
- if (lastTimestamp < 0) {
- lastTimestamp = timestamp - 5 * 60 * 1000L
- startTime = min(startTime, lastTimestamp)
- }
-
- if (flopsHistory.isEmpty()) {
- flopsHistory.add(SimTraceWorkload.Fragment(lastTimestamp, traceInterval, cpuUsage, cores))
- } else {
- val last = flopsHistory.last()
- val duration = timestamp - lastTimestamp
- // Perform run-length encoding
- if (duration == 0L || last.usage == cpuUsage) {
- flopsHistory[flopsHistory.size - 1] = last.copy(duration = last.duration + duration)
- } else {
- flopsHistory.add(
- SimTraceWorkload.Fragment(
- lastTimestamp,
- duration,
- cpuUsage,
- cores
- )
- )
- }
- }
-
- lastTimestamp = timestamp
- }
- }
-
- val uuid = UUID(0L, vmId)
-
- val workload = SimTraceWorkload(flopsHistory.asSequence())
- entries[vmId] = TraceEntry(
- uuid,
- vmId.toString(),
- startTime,
- workload,
- mapOf(
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
- )
- }
-
- // Create the entry iterator
- iterator = entries.values.sortedBy { it.start }.iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
deleted file mode 100644
index e68afeb7..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.gwf
-
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.workflow.api.Job
-import org.opendc.workflow.api.Task
-import org.opendc.workflow.api.WORKFLOW_TASK_CORES
-import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
-import java.io.BufferedReader
-import java.io.File
-import java.io.InputStream
-import java.util.*
-import kotlin.collections.HashSet
-import kotlin.collections.Iterator
-import kotlin.collections.List
-import kotlin.collections.MutableSet
-import kotlin.collections.component1
-import kotlin.collections.component2
-import kotlin.collections.filter
-import kotlin.collections.forEach
-import kotlin.collections.getOrPut
-import kotlin.collections.map
-import kotlin.collections.mapIndexed
-import kotlin.collections.mapOf
-import kotlin.collections.mutableMapOf
-import kotlin.collections.set
-import kotlin.collections.sortedBy
-import kotlin.collections.toMap
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * A [TraceReader] for the Grid Workload Format. See the Grid Workloads Archive (http://gwa.ewi.tudelft.nl/) for more
- * information about the format.
- *
- * Be aware that in the Grid Workload Format, workflows are not required to be ordered by submission time and therefore
- * this reader needs to read the whole trace into memory before an entry can be read. Consider converting the trace to a
- * different format for better performance.
- *
- * @param reader The buffered reader to read the trace with.
- */
-public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<Job>>
-
- /**
- * Create a [GwfTraceReader] instance from the specified [File].
- *
- * @param file The file to read from.
- */
- public constructor(file: File) : this(file.bufferedReader())
-
- /**
- * Create a [GwfTraceReader] instance from the specified [InputStream].
- *
- * @param input The input stream to read from.
- */
- public constructor(input: InputStream) : this(input.bufferedReader())
-
- /**
- * Initialize the reader.
- */
- init {
- val workflows = mutableMapOf<Long, Job>()
- val starts = mutableMapOf<Long, Long>()
- val tasks = mutableMapOf<Long, Task>()
- val taskDependencies = mutableMapOf<Task, List<Long>>()
-
- var workflowIdCol = 0
- var taskIdCol = 0
- var submitTimeCol = 0
- var runtimeCol = 0
- var coreCol = 0
- var dependencyCol = 0
-
- try {
- reader.lineSequence()
- .filter { line ->
- // Ignore comments in the trace
- !line.startsWith("#") && line.isNotBlank()
- }
- .forEachIndexed { idx, line ->
- val values = line.split(",")
-
- // Parse GWF header
- if (idx == 0) {
- val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
- workflowIdCol = header["WorkflowID"]!!
- taskIdCol = header["JobID"]!!
- submitTimeCol = header["SubmitTime"]!!
- runtimeCol = header["RunTime"]!!
- coreCol = header["NProcs"]!!
- dependencyCol = header["Dependencies"]!!
- return@forEachIndexed
- }
-
- val workflowId = values[workflowIdCol].trim().toLong()
- val taskId = values[taskIdCol].trim().toLong()
- val submitTime = values[submitTimeCol].trim().toLong() * 1000 // ms
- val runtime = max(0, values[runtimeCol].trim().toLong()) // s
- val cores = values[coreCol].trim().toInt()
- val dependencies = values[dependencyCol].split(" ")
- .filter { it.isNotEmpty() }
- .map { it.trim().toLong() }
-
- val flops: Long = 4000 * runtime * cores
-
- val workflow = workflows.getOrPut(workflowId) {
- Job(UUID(0L, workflowId), "<unnamed>", HashSet())
- }
- val workload = SimFlopsWorkload(flops)
- val task = Task(
- UUID(0L, taskId),
- "<unnamed>",
- HashSet(),
- mapOf(
- "workload" to workload,
- WORKFLOW_TASK_CORES to cores,
- WORKFLOW_TASK_DEADLINE to (runtime * 1000)
- ),
- )
- starts.merge(workflowId, submitTime, ::min)
- (workflow.tasks as MutableSet<Task>).add(task)
- tasks[taskId] = task
- taskDependencies[task] = dependencies
- }
- } finally {
- reader.close()
- }
-
- // Fix dependencies and dependents for all tasks
- taskDependencies.forEach { (task, dependencies) ->
- (task.dependencies as MutableSet<Task>).addAll(
- dependencies.map { taskId ->
- tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found")
- }
- )
- }
-
- // Create the entry iterator
- iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) }
- .sortedBy { it.start }
- .iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<Job> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
deleted file mode 100644
index bda392a9..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.swf
-
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.io.BufferedReader
-import java.io.File
-import java.io.FileReader
-import java.util.*
-
-/**
- * A [TraceReader] for reading SWF traces into VM-modeled workloads.
- *
- * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html
- *
- * @param file The trace file.
- */
-public class SwfTraceReader(
- file: File,
- maxNumCores: Int = -1
-) : TraceReader<SimWorkload> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>>
-
- /**
- * Initialize the reader.
- */
- init {
- val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>()
-
- val jobNumberCol = 0
- val submitTimeCol = 1 // seconds (begin of trace is 0)
- val waitTimeCol = 2 // seconds
- val runTimeCol = 3 // seconds
- val numAllocatedCoresCol = 4 // We assume that single-core processors were used at the time
- val requestedMemoryCol = 9 // KB per processor/core (-1 if not specified)
-
- val sliceDuration = 5 * 60L
-
- var jobNumber: Long
- var submitTime: Long
- var waitTime: Long
- var runTime: Long
- var cores: Int
- var memory: Long
- var slicedWaitTime: Long
- var runtimePartialSliceRemainder: Long
-
- BufferedReader(FileReader(file)).use { reader ->
- reader.lineSequence()
- .filter { line ->
- // Ignore comments in the trace
- !line.startsWith(";") && line.isNotBlank()
- }
- .forEach { line ->
- val values = line.trim().split("\\s+".toRegex())
-
- jobNumber = values[jobNumberCol].trim().toLong()
- submitTime = values[submitTimeCol].trim().toLong()
- waitTime = values[waitTimeCol].trim().toLong()
- runTime = values[runTimeCol].trim().toLong()
- cores = values[numAllocatedCoresCol].trim().toInt()
- memory = values[requestedMemoryCol].trim().toLong()
-
- if (maxNumCores != -1 && cores > maxNumCores) {
- println("Skipped a task due to processor count ($cores > $maxNumCores).")
- return@forEach
- }
-
- if (memory == -1L) {
- memory = 1000L * cores // assume 1GB of memory per processor if not specified
- } else {
- memory /= 1000 // convert KB to MB
- }
-
- val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>()
-
- // Insert waiting time slices
-
- // We ignore wait time remainders under one
- slicedWaitTime = 0L
- if (waitTime >= sliceDuration) {
- for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) {
- flopsHistory.add(
- SimTraceWorkload.Fragment(
- tick,
- sliceDuration * 1000,
- 0.0,
- cores
- )
- )
- slicedWaitTime += sliceDuration
- }
- }
-
- // Insert run time slices
-
- runtimePartialSliceRemainder = runTime % sliceDuration
-
- for (
- tick in (submitTime + slicedWaitTime)
- until (submitTime + slicedWaitTime + runTime - sliceDuration)
- step sliceDuration
- ) {
- flopsHistory.add(
- SimTraceWorkload.Fragment(
- tick,
- sliceDuration * 1000L,
- 1.0,
- cores
- )
- )
- }
-
- if (runtimePartialSliceRemainder > 0) {
- flopsHistory.add(
- SimTraceWorkload.Fragment(
- submitTime + slicedWaitTime + runTime,
- sliceDuration,
- runtimePartialSliceRemainder / sliceDuration.toDouble(),
- cores
- )
- )
- }
-
- val uuid = UUID(0L, jobNumber)
- val workload = SimTraceWorkload(flopsHistory.asSequence())
- entries[jobNumber] = TraceEntry(
- uuid,
- jobNumber.toString(),
- submitTime,
- workload,
- mapOf(
- "cores" to cores,
- "required-memory" to memory,
- "workload" to workload
- )
- )
- }
- }
-
- // Create the entry iterator
- iterator = entries.values.sortedBy { it.start }.iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
deleted file mode 100644
index dde1b340..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.wtf
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.format.util.LocalParquetReader
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.workflow.api.Job
-import org.opendc.workflow.api.Task
-import org.opendc.workflow.api.WORKFLOW_TASK_CORES
-import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
-import java.io.File
-import java.nio.file.Path
-import java.util.UUID
-import kotlin.math.min
-
-/**
- * A [TraceReader] for the Workflow Trace Format (WTF). See the Workflow Trace Archive
- * (https://wta.atlarge-research.com/) for more information about the format.
- *
- * @param path The path to the trace.
- */
-public class WtfTraceReader(path: Path) : TraceReader<Job> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<Job>>
-
- /**
- * Construct a [TraceReader] from the specified [path].
- *
- * @param path The path to the trace.
- */
- public constructor(path: File) : this(path.toPath())
-
- /**
- * Initialize the reader.
- */
- init {
- val workflows = mutableMapOf<Long, Job>()
- val starts = mutableMapOf<Long, Long>()
- val tasks = mutableMapOf<Long, Task>()
- val taskDependencies = mutableMapOf<Task, List<Long>>()
-
- LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")).use { reader ->
- while (true) {
- val nextRecord = reader.read() ?: break
-
- val workflowId = nextRecord.get("workflow_id") as Long
- val taskId = nextRecord.get("id") as Long
- val submitTime = nextRecord.get("ts_submit") as Long
- val runtime = nextRecord.get("runtime") as Long
- val cores = (nextRecord.get("resource_amount_requested") as Double).toInt()
-
- @Suppress("UNCHECKED_CAST")
- val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map {
- it.get("item") as Long
- }
-
- val flops: Long = 4100 * (runtime / 1000) * cores
-
- val workflow = workflows.getOrPut(workflowId) {
- Job(UUID(0L, workflowId), "<unnamed>", HashSet())
- }
- val workload = SimFlopsWorkload(flops)
- val task = Task(
- UUID(0L, taskId),
- "<unnamed>",
- HashSet(),
- mapOf(
- "workload" to workload,
- WORKFLOW_TASK_CORES to cores,
- WORKFLOW_TASK_DEADLINE to runtime
- )
- )
-
- starts.merge(workflowId, submitTime, ::min)
- (workflow.tasks as MutableSet<Task>).add(task)
- tasks[taskId] = task
- taskDependencies[task] = dependencies
- }
- }
-
- // Fix dependencies and dependents for all tasks
- taskDependencies.forEach { (task, dependencies) ->
- (task.dependencies as MutableSet<Task>).addAll(
- dependencies.map { taskId ->
- tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found")
- }
- )
- }
-
- // Create the entry iterator
- iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) }
- .sortedBy { it.start }
- .iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<Job> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt b/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
deleted file mode 100644
index e0e049cf..00000000
--- a/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.swf
-
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import java.io.File
-
-class SwfTraceReaderTest {
- @Test
- internal fun testParseSwf() {
- val reader = SwfTraceReader(File(SwfTraceReaderTest::class.java.getResource("/swf_trace.txt").toURI()))
- var entry = reader.next()
- assertEquals(0, entry.start)
- // 1961 slices for waiting, 3 full and 1 partial running slices
- assertEquals(1965, (entry.workload as SimTraceWorkload).trace.toList().size)
-
- entry = reader.next()
- assertEquals(164472, entry.start)
- // 1188 slices for waiting, 0 full and 1 partial running slices
- assertEquals(1189, (entry.workload as SimTraceWorkload).trace.toList().size)
- assertEquals(0.25, (entry.workload as SimTraceWorkload).trace.toList().last().usage)
- }
-}
diff --git a/opendc-trace/build.gradle.kts b/opendc-trace/build.gradle.kts
new file mode 100644
index 00000000..7edfd134
--- /dev/null
+++ b/opendc-trace/build.gradle.kts
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
diff --git a/opendc-trace/opendc-trace-api/build.gradle.kts b/opendc-trace/opendc-trace-api/build.gradle.kts
new file mode 100644
index 00000000..b2f91593
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/build.gradle.kts
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Workload trace library for OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
new file mode 100644
index 00000000..44dec95b
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ResourceColumns")
+package org.opendc.trace
+
+import java.time.Instant
+
+/**
+ * Identifier of the resource.
+ */
+@JvmField
+public val RESOURCE_ID: TableColumn<String> = stringColumn("resource:id")
+
+/**
+ * Start time for the resource.
+ */
+@JvmField
+public val RESOURCE_START_TIME: TableColumn<Instant> = TableColumn("resource:start_time", Instant::class.java)
+
+/**
+ * End time for the resource.
+ */
+@JvmField
+public val RESOURCE_END_TIME: TableColumn<Instant> = TableColumn("resource:end_time", Instant::class.java)
+
+/**
+ * Number of CPUs for the resource.
+ */
+@JvmField
+public val RESOURCE_NCPUS: TableColumn<Int> = intColumn("resource:num_cpus")
+
+/**
+ * Memory capacity for the resource.
+ */
+@JvmField
+public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource:mem_capacity")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
new file mode 100644
index 00000000..1933967e
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ResourceStateColumns")
+package org.opendc.trace
+
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * Identifier of the resource.
+ */
+@JvmField
+public val RESOURCE_STATE_ID: TableColumn<String> = stringColumn("resource_state:id")
+
+/**
+ * The cluster to which the resource belongs.
+ */
+@JvmField
+public val RESOURCE_STATE_CLUSTER_ID: TableColumn<String> = stringColumn("resource_state:cluster_id")
+
+/**
+ * Timestamp for the state.
+ */
+@JvmField
+public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = TableColumn("resource_state:timestamp", Instant::class.java)
+
+/**
+ * Duration for the state.
+ */
+@JvmField
+public val RESOURCE_STATE_DURATION: TableColumn<Duration> = TableColumn("resource_state:duration", Duration::class.java)
+
+/**
+ * A flag to indicate that the resource is powered on.
+ */
+@JvmField
+public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = booleanColumn("resource_state:powered_on")
+
+/**
+ * Number of CPUs for the resource.
+ */
+@JvmField
+public val RESOURCE_STATE_NCPUS: TableColumn<Int> = intColumn("resource_state:ncpus")
+
+/**
+ * Total CPU capacity of the resource in MHz.
+ */
+@JvmField
+public val RESOURCE_STATE_CPU_CAPACITY: TableColumn<Double> = doubleColumn("resource_state:cpu_capacity")
+
+/**
+ * Total CPU usage of the resource in MHz.
+ */
+@JvmField
+public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = doubleColumn("resource_state:cpu_usage")
+
+/**
+ * Total CPU usage of the resource in percentage.
+ */
+@JvmField
+public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = doubleColumn("resource_state:cpu_usage_pct")
+
+/**
+ * Total CPU demand of the resource in MHz.
+ */
+@JvmField
+public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = doubleColumn("resource_state:cpu_demand")
+
+/**
+ * CPU ready percentage.
+ */
+@JvmField
+public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = doubleColumn("resource_state:cpu_ready_pct")
+
+/**
+ * Memory capacity of the resource in KB.
+ */
+@JvmField
+public val RESOURCE_STATE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource_state:mem_capacity")
+
+/**
+ * Memory usage of the resource in KB.
+ */
+@JvmField
+public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = doubleColumn("resource_state:mem_usage")
+
+/**
+ * Disk read throughput of the resource in KB/s.
+ */
+@JvmField
+public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = doubleColumn("resource_state:disk_read")
+
+/**
+ * Disk write throughput of the resource in KB/s.
+ */
+@JvmField
+public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = doubleColumn("resource_state:disk_write")
+
+/**
+ * Network receive throughput of the resource in KB/s.
+ */
+@JvmField
+public val RESOURCE_STATE_NET_RX: TableColumn<Double> = doubleColumn("resource_state:net_rx")
+
+/**
+ * Network transmit throughput of the resource in KB/s.
+ */
+@JvmField
+public val RESOURCE_STATE_NET_TX: TableColumn<Double> = doubleColumn("resource_state:net_tx")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
new file mode 100644
index 00000000..11e5d6b7
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace
+
+/**
+ * A table is collection of rows consisting of typed columns.
+ */
+public interface Table {
+ /**
+ * The name of the table.
+ */
+ public val name: String
+
+ /**
+ * A flag to indicate that the table is synthetic (derived from another table).
+ */
+ public val isSynthetic: Boolean
+
+ /**
+ * Determine whether the specified [column] is supported by this table.
+ */
+ public fun isSupported(column: TableColumn<*>): Boolean
+
+ /**
+ * Open a [TableReader] for this table.
+ */
+ public fun newReader(): TableReader
+
+ /**
+ * Open a [TableReader] for [partition] of the table.
+ */
+ public fun newReader(partition: String): TableReader
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
new file mode 100644
index 00000000..776c40c0
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace
+
+import java.util.*
+
+/**
+ * A column in a trace table.
+ *
+ * @param name The universal name of this column.
+ */
+public class TableColumn<out T>(public val name: String, type: Class<T>) {
+ /**
+ * The type of the column.
+ */
+ private val type: Class<*> = type
+
+ /**
+ * Determine whether the type of the column is a subtype of [column].
+ */
+ public fun isAssignableTo(column: TableColumn<*>): Boolean {
+ return name == column.name && type.isAssignableFrom(column.type)
+ }
+
+ /**
+ * Compute a hash code for this column.
+ */
+ public override fun hashCode(): Int = Objects.hash(name, type)
+
+ /**
+ * Determine whether this column is equal to [other].
+ */
+ public override fun equals(other: Any?): Boolean {
+ // Fast-path: reference equality
+ if (this === other) {
+ return true
+ } else if (other == null || other !is TableColumn<*>) {
+ return false
+ }
+
+ return name == other.name && type == other.type
+ }
+
+ /**
+ * Return a string representation of this column.
+ */
+ public override fun toString(): String = "TableColumn[$name,$type]"
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt
new file mode 100644
index 00000000..64920498
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TableColumns")
+package org.opendc.trace
+
+/**
+ * Construct a [TableColumn] with [Any] type.
+ */
+public fun objectColumn(name: String): TableColumn<Any> = TableColumn(name, Any::class.java)
+
+/**
+ * Construct a [TableColumn] with a [String] type.
+ */
+public fun stringColumn(name: String): TableColumn<String> = TableColumn(name, String::class.java)
+
+/**
+ * Construct a [TableColumn] with a [Number] type.
+ */
+public fun numberColumn(name: String): TableColumn<Number> = TableColumn(name, Number::class.java)
+
+/**
+ * Construct a [TableColumn] with an [Int] type.
+ */
+public fun intColumn(name: String): TableColumn<Int> = TableColumn(name, Int::class.java)
+
+/**
+ * Construct a [TableColumn] with a [Long] type.
+ */
+public fun longColumn(name: String): TableColumn<Long> = TableColumn(name, Long::class.java)
+
+/**
+ * Construct a [TableColumn] with a [Double] type.
+ */
+public fun doubleColumn(name: String): TableColumn<Double> = TableColumn(name, Double::class.java)
+
+/**
+ * Construct a [TableColumn] with a [Boolean] type.
+ */
+public fun booleanColumn(name: String): TableColumn<Boolean> = TableColumn(name, Boolean::class.java)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
new file mode 100644
index 00000000..b5e7669f
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace
+
+/**
+ * Base class for reading entities from a workload trace table in streaming fashion.
+ */
+public interface TableReader : AutoCloseable {
+ /**
+ * Advance the stream until the next row is reached.
+ *
+ * @return `true` if the row is valid, `false` if there are no more rows.
+ */
+ public fun nextRow(): Boolean
+
+ /**
+ * Determine whether the [TableReader] supports the specified [column].
+ */
+ public fun hasColumn(column: TableColumn<*>): Boolean
+
+ /**
+ * Obtain the value of the current column with type [T].
+ */
+ public fun <T> get(column: TableColumn<T>): T
+
+ /**
+ * Read the specified [column] as boolean.
+ */
+ public fun getBoolean(column: TableColumn<Boolean>): Boolean
+
+ /**
+ * Read the specified [column] as integer.
+ */
+ public fun getInt(column: TableColumn<Int>): Int
+
+ /**
+ * Read the specified [column] as long.
+ */
+ public fun getLong(column: TableColumn<Long>): Long
+
+ /**
+ * Read the specified [column] as double.
+ */
+ public fun getDouble(column: TableColumn<Double>): Double
+
+ /**
+ * Closes the reader so that no further iteration or data access can be made.
+ */
+ public override fun close()
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt
new file mode 100644
index 00000000..bb9d93e2
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("Tables")
+package org.opendc.trace
+
+/**
+ * A table containing all workflows in a workload.
+ */
+public const val TABLE_WORKFLOWS: String = "workflows"
+
+/**
+ * A table containing all tasks in a workload.
+ */
+public const val TABLE_TASKS: String = "tasks"
+
+/**
+ * A table containing all resources in a workload.
+ */
+public const val TABLE_RESOURCES: String = "resources"
+
+/**
+ * A table containing all resource states in a workload.
+ */
+public const val TABLE_RESOURCE_STATES: String = "resource_states"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
new file mode 100644
index 00000000..88bbc623
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TaskColumns")
+package org.opendc.trace
+
+/**
+ * A column containing the task identifier.
+ */
+@JvmField
+public val TASK_ID: TableColumn<Long> = longColumn("task:id")
+
+/**
+ * A column containing the identifier of the workflow.
+ */
+@JvmField
+public val TASK_WORKFLOW_ID: TableColumn<Long> = longColumn("task:workflow_id")
+
+/**
+ * A column containing the submit time of the task.
+ */
+@JvmField
+public val TASK_SUBMIT_TIME: TableColumn<Long> = longColumn("task:submit_time")
+
+/**
+ * A column containing the wait time of the task.
+ */
+@JvmField
+public val TASK_WAIT_TIME: TableColumn<Long> = longColumn("task:wait_time")
+
+/**
+ * A column containing the runtime time of the task.
+ */
+@JvmField
+public val TASK_RUNTIME: TableColumn<Long> = longColumn("task:runtime")
+
+/**
+ * A column containing the parents of a task.
+ */
+@Suppress("UNCHECKED_CAST")
+@JvmField
+public val TASK_PARENTS: TableColumn<Set<Long>> = TableColumn("task:parents", type = Set::class.java as Class<Set<Long>>)
+
+/**
+ * A column containing the children of a task.
+ */
+@Suppress("UNCHECKED_CAST")
+@JvmField
+public val TASK_CHILDREN: TableColumn<Set<Long>> = TableColumn("task:children", type = Set::class.java as Class<Set<Long>>)
+
+/**
+ * A column containing the requested CPUs of a task.
+ */
+@JvmField
+public val TASK_REQ_NCPUS: TableColumn<Int> = intColumn("task:req_ncpus")
+
+/**
+ * A column containing the allocated CPUs of a task.
+ */
+@JvmField
+public val TASK_ALLOC_NCPUS: TableColumn<Int> = intColumn("task:alloc_ncpus")
+
+/**
+ * A column containing the status of a task.
+ */
+@JvmField
+public val TASK_STATUS: TableColumn<Int> = intColumn("task:status")
+
+/**
+ * A column containing the group id of a task.
+ */
+@JvmField
+public val TASK_GROUP_ID: TableColumn<Int> = intColumn("task:group_id")
+
+/**
+ * A column containing the user id of a task.
+ */
+@JvmField
+public val TASK_USER_ID: TableColumn<Int> = intColumn("task:user_id")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
new file mode 100644
index 00000000..36e93b52
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace
+
+/**
+ * A trace is a collection of related tables that characterize a workload.
+ */
+public interface Trace {
+ /**
+ * The list of table names in the workload trace.
+ */
+ public val tables: List<String>
+
+ /**
+ * Determine if the trace contains a table with the specified [name].
+ */
+ public fun containsTable(name: String): Boolean
+
+ /**
+ * Obtain a [Table] with the specified [name].
+ */
+ public fun getTable(name: String): Table?
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
new file mode 100644
index 00000000..54029fcf
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.spi
+
+import org.opendc.trace.Trace
+import java.net.URL
+import java.util.*
+
+/**
+ * A service-provider class for parsing trace formats.
+ */
+public interface TraceFormat {
+ /**
+ * The name of the trace format.
+ */
+ public val name: String
+
+ /**
+ * Open a new [Trace] with this provider.
+ *
+ * @param url A reference to the trace.
+ */
+ public fun open(url: URL): Trace
+
+ /**
+ * A helper object for resolving providers.
+ */
+ public companion object {
+ /**
+ * A list of [TraceFormat] that are available on this system.
+ */
+ public val installedProviders: List<TraceFormat> by lazy {
+ val loader = ServiceLoader.load(TraceFormat::class.java)
+ loader.toList()
+ }
+
+ /**
+ * Obtain a [TraceFormat] implementation by [name].
+ */
+ public fun byName(name: String): TraceFormat? = installedProviders.find { it.name == name }
+ }
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts
new file mode 100644
index 00000000..d195cbbb
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Support for GWF traces in OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcTrace.opendcTraceApi)
+ implementation(libs.jackson.dataformat.csv)
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
new file mode 100644
index 00000000..767ef919
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * The resource state [Table] in the Bitbrains format.
+ */
+internal class BitbrainsResourceStateTable(private val factory: CsvFactory, private val path: Path) : Table {
+ /**
+ * The partitions that belong to the table.
+ */
+ private val partitions =
+ Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "csv" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+
+ override val name: String = TABLE_RESOURCE_STATES
+
+ override val isSynthetic: Boolean = false
+
+ override fun isSupported(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_CAPACITY -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ RESOURCE_STATE_CPU_USAGE_PCT -> true
+ RESOURCE_STATE_MEM_CAPACITY -> true
+ RESOURCE_STATE_MEM_USAGE -> true
+ RESOURCE_STATE_DISK_READ -> true
+ RESOURCE_STATE_DISK_WRITE -> true
+ RESOURCE_STATE_NET_RX -> true
+ RESOURCE_STATE_NET_TX -> true
+ else -> false
+ }
+ }
+
+ override fun newReader(): TableReader {
+ val it = partitions.iterator()
+
+ return object : TableReader {
+ var delegate: TableReader? = nextDelegate()
+
+ override fun nextRow(): Boolean {
+ var delegate = delegate
+
+ while (delegate != null) {
+ if (delegate.nextRow()) {
+ break
+ }
+
+ delegate.close()
+ delegate = nextDelegate()
+ }
+
+ this.delegate = delegate
+ return delegate != null
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.get(column)
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getBoolean(column)
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getInt(column)
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getLong(column)
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getDouble(column)
+ }
+
+ override fun close() {
+ delegate?.close()
+ }
+
+ private fun nextDelegate(): TableReader? {
+ return if (it.hasNext()) {
+ val (partition, path) = it.next()
+ return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile()))
+ } else {
+ null
+ }
+ }
+
+ override fun toString(): String = "BitbrainsCompositeTableReader"
+ }
+ }
+
+ override fun newReader(partition: String): TableReader {
+ val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
+ return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile()))
+ }
+
+ override fun toString(): String = "BitbrainsResourceStateTable"
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
new file mode 100644
index 00000000..5687ac7f
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -0,0 +1,218 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.*
+import java.time.Instant
+
+/**
+ * A [TableReader] for the Bitbrains resource state table.
+ */
+internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader {
+ /**
+ * The current parser state.
+ */
+ private val state = RowState()
+
+ init {
+ parser.schema = schema
+ }
+
+ override fun nextRow(): Boolean {
+ // Reset the row state
+ state.reset()
+
+ if (!nextStart()) {
+ return false
+ }
+
+ while (true) {
+ val token = parser.nextValue()
+
+ if (token == null || token == JsonToken.END_OBJECT) {
+ break
+ }
+
+ when (parser.currentName) {
+ "Timestamp [ms]" -> state.timestamp = Instant.ofEpochSecond(parser.longValue)
+ "CPU cores" -> state.cpuCores = parser.intValue
+ "CPU capacity provisioned [MHZ]" -> state.cpuCapacity = parser.doubleValue
+ "CPU usage [MHZ]" -> state.cpuUsage = parser.doubleValue
+ "CPU usage [%]" -> state.cpuUsagePct = parser.doubleValue
+ "Memory capacity provisioned [KB]" -> state.memCapacity = parser.doubleValue
+ "Memory usage [KB]" -> state.memUsage = parser.doubleValue
+ "Disk read throughput [KB/s]" -> state.diskRead = parser.doubleValue
+ "Disk write throughput [KB/s]" -> state.diskWrite = parser.doubleValue
+ "Network received throughput [KB/s]" -> state.netReceived = parser.doubleValue
+ "Network transmitted throughput [KB/s]" -> state.netTransmitted = parser.doubleValue
+ }
+ }
+
+ return true
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_CAPACITY -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ RESOURCE_STATE_CPU_USAGE_PCT -> true
+ RESOURCE_STATE_MEM_CAPACITY -> true
+ RESOURCE_STATE_MEM_USAGE -> true
+ RESOURCE_STATE_DISK_READ -> true
+ RESOURCE_STATE_DISK_WRITE -> true
+ RESOURCE_STATE_NET_RX -> true
+ RESOURCE_STATE_NET_TX -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any? = when (column) {
+ RESOURCE_STATE_ID -> partition
+ RESOURCE_STATE_TIMESTAMP -> state.timestamp
+ RESOURCE_STATE_NCPUS -> state.cpuCores
+ RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct
+ RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
+ RESOURCE_STATE_MEM_USAGE -> state.memUsage
+ RESOURCE_STATE_DISK_READ -> state.diskRead
+ RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+ RESOURCE_STATE_NET_RX -> state.netReceived
+ RESOURCE_STATE_NET_TX -> state.netTransmitted
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ return when (column) {
+ RESOURCE_STATE_NCPUS -> state.cpuCores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ return when (column) {
+ RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct
+ RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
+ RESOURCE_STATE_MEM_USAGE -> state.memUsage
+ RESOURCE_STATE_DISK_READ -> state.diskRead
+ RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+ RESOURCE_STATE_NET_RX -> state.netReceived
+ RESOURCE_STATE_NET_TX -> state.netTransmitted
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * Advance the parser until the next object start.
+ */
+ private fun nextStart(): Boolean {
+ var token = parser.nextValue()
+
+ while (token != null && token != JsonToken.START_OBJECT) {
+ token = parser.nextValue()
+ }
+
+ return token != null
+ }
+
+ /**
+ * The current row state.
+ */
+ private class RowState {
+ var timestamp: Instant? = null
+ var cpuCores = -1
+ var cpuCapacity = Double.NaN
+ var cpuUsage = Double.NaN
+ var cpuUsagePct = Double.NaN
+ var memCapacity = Double.NaN
+ var memUsage = Double.NaN
+ var diskRead = Double.NaN
+ var diskWrite = Double.NaN
+ var netReceived = Double.NaN
+ var netTransmitted = Double.NaN
+
+ /**
+ * Reset the state.
+ */
+ fun reset() {
+ timestamp = null
+ cpuCores = -1
+ cpuCapacity = Double.NaN
+ cpuUsage = Double.NaN
+ cpuUsagePct = Double.NaN
+ memCapacity = Double.NaN
+ memUsage = Double.NaN
+ diskRead = Double.NaN
+ diskWrite = Double.NaN
+ netReceived = Double.NaN
+ netTransmitted = Double.NaN
+ }
+ }
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema = CsvSchema.builder()
+ .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .setUseHeader(true)
+ .setColumnSeparator(';')
+ .build()
+ }
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt
new file mode 100644
index 00000000..5a2d4243
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * [Trace] implementation for the Bitbrains format.
+ */
+public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace {
+ override val tables: List<String> = listOf(TABLE_RESOURCE_STATES)
+
+ override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name
+
+ override fun getTable(name: String): Table? {
+ if (!containsTable(name)) {
+ return null
+ }
+
+ return BitbrainsResourceStateTable(factory, path)
+ }
+
+ override fun toString(): String = "BitbrainsTrace[$path]"
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
new file mode 100644
index 00000000..55b11fe3
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A format implementation for the GWF trace format.
+ */
+public class BitbrainsTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "bitbrains"
+
+ /**
+ * The [CsvFactory] used to create the parser.
+ */
+ private val factory = CsvFactory()
+ .enable(CsvParser.Feature.ALLOW_COMMENTS)
+ .enable(CsvParser.Feature.TRIM_SPACES)
+
+ /**
+ * Open a Bitbrains trace.
+ */
+ override fun open(url: URL): BitbrainsTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return BitbrainsTrace(factory, path)
+ }
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
new file mode 100644
index 00000000..f18135d0
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
@@ -0,0 +1 @@
+org.opendc.trace.bitbrains.BitbrainsTraceFormat
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
new file mode 100644
index 00000000..550805d3
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.trace.RESOURCE_STATE_CPU_USAGE
+import org.opendc.trace.RESOURCE_STATE_TIMESTAMP
+import org.opendc.trace.TABLE_RESOURCE_STATES
+import java.net.URL
+
+/**
+ * Test suite for the [BitbrainsTraceFormat] class.
+ */
+class BitbrainsTraceFormatTest {
+ @Test
+ fun testTraceExists() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ assertDoesNotThrow {
+ format.open(url)
+ }
+ }
+
+ @Test
+ fun testTraceDoesNotExists() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ assertThrows<IllegalArgumentException> {
+ format.open(URL(url.toString() + "help"))
+ }
+ }
+
+ @Test
+ fun testTables() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ val trace = format.open(url)
+
+ assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables)
+ }
+
+ @Test
+ fun testTableExists() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ val table = format.open(url).getTable(TABLE_RESOURCE_STATES)
+
+ assertNotNull(table)
+ assertDoesNotThrow { table!!.newReader() }
+ }
+
+ @Test
+ fun testTableDoesNotExist() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ val trace = format.open(url)
+
+ assertFalse(trace.containsTable("test"))
+ assertNull(trace.getTable("test"))
+ }
+
+ @Test
+ fun testSmoke() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ val trace = format.open(url)
+
+ val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
+ { assertEquals(19.066, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
+ )
+
+ reader.close()
+ }
+}
diff --git a/opendc-format/src/test/resources/bitbrains.csv b/opendc-trace/opendc-trace-bitbrains/src/test/resources/bitbrains.csv
index f5e300e8..f5e300e8 100644
--- a/opendc-format/src/test/resources/bitbrains.csv
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/resources/bitbrains.csv
diff --git a/opendc-trace/opendc-trace-gwf/build.gradle.kts b/opendc-trace/opendc-trace-gwf/build.gradle.kts
new file mode 100644
index 00000000..f3dfd6ef
--- /dev/null
+++ b/opendc-trace/opendc-trace-gwf/build.gradle.kts
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Support for GWF traces in OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcTrace.opendcTraceApi)
+
+ implementation(libs.jackson.dataformat.csv)
+}
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt
new file mode 100644
index 00000000..80a99d10
--- /dev/null
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.gwf
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.net.URL
+
+/**
+ * A [Table] containing the tasks in a GWF trace.
+ */
+internal class GwfTaskTable(private val factory: CsvFactory, private val url: URL) : Table {
+ override val name: String = TABLE_TASKS
+
+ override val isSynthetic: Boolean = false
+
+ override fun isSupported(column: TableColumn<*>): Boolean {
+ return when (column) {
+ TASK_WORKFLOW_ID -> true
+ TASK_ID -> true
+ TASK_SUBMIT_TIME -> true
+ TASK_RUNTIME -> true
+ TASK_REQ_NCPUS -> true
+ TASK_ALLOC_NCPUS -> true
+ TASK_PARENTS -> true
+ else -> false
+ }
+ }
+
+ override fun newReader(): TableReader {
+ return GwfTaskTableReader(factory.createParser(url))
+ }
+
+ override fun newReader(partition: String): TableReader {
+ throw IllegalArgumentException("Invalid partition $partition")
+ }
+
+ override fun toString(): String = "GwfTaskTable"
+}
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
new file mode 100644
index 00000000..64b7d465
--- /dev/null
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
@@ -0,0 +1,211 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.gwf
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.*
+import java.util.regex.Pattern
+
+/**
+ * A [TableReader] implementation for the GWF format.
+ */
+internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
+ /**
+ * The current parser state.
+ */
+ private val state = RowState()
+
+ init {
+ parser.schema = schema
+ }
+
+ override fun nextRow(): Boolean {
+ // Reset the row state
+ state.reset()
+
+ if (!nextStart()) {
+ return false
+ }
+
+ while (true) {
+ val token = parser.nextValue()
+
+ if (token == null || token == JsonToken.END_OBJECT) {
+ break
+ }
+
+ when (parser.currentName) {
+ "WorkflowID" -> state.workflowId = parser.longValue
+ "JobID" -> state.jobId = parser.longValue
+ "SubmitTime" -> state.submitTime = parser.longValue
+ "RunTime" -> state.runtime = parser.longValue
+ "NProcs" -> state.nProcs = parser.intValue
+ "ReqNProcs" -> state.reqNProcs = parser.intValue
+ "Dependencies" -> parseParents(parser.valueAsString)
+ }
+ }
+
+ return true
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ TASK_WORKFLOW_ID -> true
+ TASK_ID -> true
+ TASK_SUBMIT_TIME -> true
+ TASK_RUNTIME -> true
+ TASK_REQ_NCPUS -> true
+ TASK_ALLOC_NCPUS -> true
+ TASK_PARENTS -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any = when (column) {
+ TASK_WORKFLOW_ID -> state.workflowId
+ TASK_ID -> state.jobId
+ TASK_SUBMIT_TIME -> state.submitTime
+ TASK_RUNTIME -> state.runtime
+ TASK_REQ_NCPUS -> state.nProcs
+ TASK_ALLOC_NCPUS -> state.reqNProcs
+ TASK_PARENTS -> state.dependencies
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ return when (column) {
+ TASK_REQ_NCPUS -> state.nProcs
+ TASK_ALLOC_NCPUS -> state.reqNProcs
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ return when (column) {
+ TASK_WORKFLOW_ID -> state.workflowId
+ TASK_ID -> state.jobId
+ TASK_SUBMIT_TIME -> state.submitTime
+ TASK_RUNTIME -> state.runtime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * The pattern used to parse the parents.
+ */
+ private val pattern = Pattern.compile("\\s+")
+
+ /**
+ * Parse the parents into a set of longs.
+ */
+ private fun parseParents(value: String): Set<Long> {
+ val result = mutableSetOf<Long>()
+ val deps = value.split(pattern)
+
+ for (dep in deps) {
+ if (dep.isBlank()) {
+ continue
+ }
+
+ result.add(dep.toLong(10))
+ }
+
+ return result
+ }
+
+ /**
+ * Advance the parser until the next object start.
+ */
+ private fun nextStart(): Boolean {
+ var token = parser.nextValue()
+
+ while (token != null && token != JsonToken.START_OBJECT) {
+ token = parser.nextValue()
+ }
+
+ return token != null
+ }
+
+ /**
+ * The current row state.
+ */
+ private class RowState {
+ var workflowId = -1L
+ var jobId = -1L
+ var submitTime = -1L
+ var runtime = -1L
+ var nProcs = -1
+ var reqNProcs = -1
+ var dependencies = emptySet<Long>()
+
+ /**
+ * Reset the state.
+ */
+ fun reset() {
+ workflowId = -1
+ jobId = -1
+ submitTime = -1
+ runtime = -1
+ nProcs = -1
+ reqNProcs = -1
+ dependencies = emptySet()
+ }
+ }
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema = CsvSchema.builder()
+ .addColumn("WorkflowID", CsvSchema.ColumnType.NUMBER)
+ .addColumn("JobID", CsvSchema.ColumnType.NUMBER)
+ .addColumn("SubmitTime", CsvSchema.ColumnType.NUMBER)
+ .addColumn("RunTime", CsvSchema.ColumnType.NUMBER)
+ .addColumn("NProcs", CsvSchema.ColumnType.NUMBER)
+ .addColumn("ReqNProcs", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Dependencies", CsvSchema.ColumnType.STRING)
+ .setAllowComments(true)
+ .setUseHeader(true)
+ .setColumnSeparator(',')
+ .build()
+ }
+}
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt
new file mode 100644
index 00000000..166c1e56
--- /dev/null
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.gwf
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.net.URL
+
+/**
+ * [Trace] implementation for the GWF format.
+ */
+public class GwfTrace internal constructor(private val factory: CsvFactory, private val url: URL) : Trace {
+ override val tables: List<String> = listOf(TABLE_TASKS)
+
+ override fun containsTable(name: String): Boolean = TABLE_TASKS == name
+
+ override fun getTable(name: String): Table? {
+ if (!containsTable(name)) {
+ return null
+ }
+
+ return GwfTaskTable(factory, url)
+ }
+
+ override fun toString(): String = "GwfTrace[$url]"
+}
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
new file mode 100644
index 00000000..6d542503
--- /dev/null
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.gwf
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A [TraceFormat] implementation for the GWF trace format.
+ */
+public class GwfTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "gwf"
+
+ /**
+ * The [CsvFactory] used to create the parser.
+ */
+ private val factory = CsvFactory()
+ .enable(CsvParser.Feature.ALLOW_COMMENTS)
+ .enable(CsvParser.Feature.TRIM_SPACES)
+
+ /**
+ * Read the tasks in the GWF trace.
+ */
+ public override fun open(url: URL): GwfTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return GwfTrace(factory, url)
+ }
+}
diff --git a/opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
new file mode 100644
index 00000000..99a874c8
--- /dev/null
+++ b/opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
@@ -0,0 +1 @@
+org.opendc.trace.gwf.GwfTraceFormat
diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
new file mode 100644
index 00000000..6b0568fe
--- /dev/null
+++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.gwf
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
+import org.opendc.trace.*
+import java.net.URL
+
+/**
+ * Test suite for the [GwfTraceFormat] class.
+ */
+internal class GwfTraceFormatTest {
+ @Test
+ fun testTraceExists() {
+ val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
+ val format = GwfTraceFormat()
+ assertDoesNotThrow {
+ format.open(input)
+ }
+ }
+
+ @Test
+ fun testTraceDoesNotExists() {
+ val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
+ val format = GwfTraceFormat()
+ assertThrows<IllegalArgumentException> {
+ format.open(URL(input.toString() + "help"))
+ }
+ }
+
+ @Test
+ fun testTables() {
+ val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
+ val format = GwfTraceFormat()
+ val trace = format.open(input)
+
+ assertEquals(listOf(TABLE_TASKS), trace.tables)
+ }
+
+ @Test
+ fun testTableExists() {
+ val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
+ val format = GwfTraceFormat()
+ val table = format.open(input).getTable(TABLE_TASKS)
+
+ assertNotNull(table)
+ assertDoesNotThrow { table!!.newReader() }
+ }
+
+ @Test
+ fun testTableDoesNotExist() {
+ val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
+ val format = GwfTraceFormat()
+ val trace = format.open(input)
+
+ assertFalse(trace.containsTable("test"))
+ assertNull(trace.getTable("test"))
+ }
+
+ @Test
+ fun testTableReader() {
+ val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
+ val format = GwfTraceFormat()
+ val table = format.open(input).getTable(TABLE_TASKS)!!
+ val reader = table.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals(0L, reader.getLong(TASK_WORKFLOW_ID)) },
+ { assertEquals(1L, reader.getLong(TASK_ID)) },
+ { assertEquals(16, reader.getLong(TASK_SUBMIT_TIME)) },
+ { assertEquals(11, reader.getLong(TASK_RUNTIME)) },
+ { assertEquals(setOf<Long>(), reader.get(TASK_PARENTS)) },
+ )
+ }
+
+ @Test
+ fun testTableReaderPartition() {
+ val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
+ val format = GwfTraceFormat()
+ val table = format.open(input).getTable(TABLE_TASKS)!!
+
+ assertThrows<IllegalArgumentException> { table.newReader("test") }
+ }
+}
diff --git a/opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf b/opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf
new file mode 100644
index 00000000..2f99616d
--- /dev/null
+++ b/opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf
@@ -0,0 +1,71 @@
+WorkflowID, JobID , SubmitTime, RunTime , NProcs , ReqNProcs , Dependencies
+0 , 1 , 16 , 11 , 1 , 1 ,
+0 , 2 , 40 , 11 , 1 , 1 , 1
+0 , 3 , 40 , 11 , 1 , 1 , 1
+0 , 4 , 64 , 11 , 1 , 1 , 2
+0 , 5 , 63 , 11 , 1 , 1 , 3
+0 , 6 , 64 , 11 , 1 , 1 , 3
+0 , 7 , 87 , 11 , 1 , 1 , 4 5 6
+1 , 8 , 4 , 11 , 1 , 1 ,
+1 , 9 , 15 , 11 , 1 , 1 , 8
+1 , 10 , 15 , 11 , 1 , 1 , 8
+1 , 11 , 27 , 11 , 1 , 1 , 9
+1 , 12 , 27 , 11 , 1 , 1 , 10
+1 , 13 , 27 , 11 , 1 , 1 , 10
+1 , 14 , 38 , 11 , 1 , 1 , 12 11 13
+2 , 15 , 3 , 11 , 1 , 1 ,
+2 , 16 , 27 , 11 , 1 , 1 , 15
+2 , 17 , 27 , 11 , 1 , 1 , 15
+2 , 18 , 52 , 11 , 1 , 1 , 16
+2 , 19 , 51 , 11 , 1 , 1 , 17
+2 , 20 , 51 , 11 , 1 , 1 , 17
+2 , 21 , 75 , 11 , 1 , 1 , 20 18 19
+3 , 22 , 3 , 11 , 1 , 1 ,
+3 , 23 , 27 , 11 , 1 , 1 , 22
+3 , 24 , 27 , 11 , 1 , 1 , 22
+3 , 25 , 51 , 11 , 1 , 1 , 23
+3 , 26 , 50 , 11 , 1 , 1 , 24
+3 , 27 , 51 , 11 , 1 , 1 , 24
+3 , 28 , 75 , 11 , 1 , 1 , 25 27 26
+4 , 29 , 3 , 11 , 1 , 1 ,
+4 , 30 , 27 , 11 , 1 , 1 , 29
+4 , 31 , 27 , 11 , 1 , 1 , 29
+4 , 32 , 50 , 11 , 1 , 1 , 30
+4 , 33 , 50 , 11 , 1 , 1 , 31
+4 , 34 , 51 , 11 , 1 , 1 , 31
+4 , 35 , 74 , 11 , 1 , 1 , 33 32 34
+5 , 36 , 3 , 11 , 1 , 1 ,
+5 , 37 , 27 , 11 , 1 , 1 , 36
+5 , 38 , 26 , 11 , 1 , 1 , 36
+5 , 39 , 51 , 11 , 1 , 1 , 37
+5 , 40 , 50 , 11 , 1 , 1 , 38
+5 , 41 , 50 , 11 , 1 , 1 , 38
+5 , 42 , 74 , 11 , 1 , 1 , 39 40 41
+6 , 43 , 4 , 11 , 1 , 1 ,
+6 , 44 , 27 , 11 , 1 , 1 , 43
+6 , 45 , 27 , 11 , 1 , 1 , 43
+6 , 46 , 51 , 11 , 1 , 1 , 44
+6 , 47 , 51 , 11 , 1 , 1 , 45
+6 , 48 , 51 , 11 , 1 , 1 , 45
+6 , 49 , 75 , 11 , 1 , 1 , 46 47 48
+7 , 50 , 3 , 0 , 1 , 1 ,
+7 , 51 , 17 , 0 , 1 , 1 , 50
+7 , 52 , 17 , 0 , 1 , 1 , 50
+7 , 53 , 30 , 0 , 1 , 1 , 51
+7 , 54 , 30 , 0 , 1 , 1 , 52
+7 , 55 , 31 , 0 , 1 , 1 , 52
+7 , 56 , 44 , 0 , 1 , 1 , 55 54 53
+8 , 57 , 3 , 11 , 1 , 1 ,
+8 , 58 , 26 , 11 , 1 , 1 , 57
+8 , 59 , 27 , 11 , 1 , 1 , 57
+8 , 60 , 50 , 11 , 1 , 1 , 58
+8 , 61 , 51 , 11 , 1 , 1 , 59
+8 , 62 , 50 , 11 , 1 , 1 , 59
+8 , 63 , 74 , 11 , 1 , 1 , 62 61 60
+9 , 64 , 3 , 11 , 1 , 1 ,
+9 , 65 , 27 , 11 , 1 , 1 , 64
+9 , 66 , 27 , 11 , 1 , 1 , 64
+9 , 67 , 51 , 11 , 1 , 1 , 65
+9 , 68 , 50 , 11 , 1 , 1 , 66
+9 , 69 , 51 , 11 , 1 , 1 , 66
+9 , 70 , 74 , 11 , 1 , 1 , 68 69 67
diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts
new file mode 100644
index 00000000..75378509
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Parquet helpers for traces in OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+
+ /* This configuration is necessary for a slim dependency on Apache Parquet */
+ api(libs.parquet) {
+ exclude(group = "org.apache.hadoop")
+ }
+ runtimeOnly(libs.hadoop.common) {
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ exclude(group = "log4j")
+ exclude(group = "org.apache.hadoop")
+ exclude(group = "org.apache.curator")
+ exclude(group = "org.apache.zookeeper")
+ exclude(group = "org.apache.kerby")
+ exclude(group = "org.apache.httpcomponents")
+ exclude(group = "org.apache.htrace")
+ exclude(group = "commons-cli")
+ exclude(group = "javax.servlet")
+ exclude(group = "org.eclipse.jetty")
+ exclude(group = "com.sun.jersey")
+ exclude(group = "com.jcraft")
+ exclude(group = "dnsjava")
+ }
+ runtimeOnly(libs.hadoop.mapreduce.client.core) {
+ isTransitive = false
+ }
+
+ testRuntimeOnly(libs.slf4j.simple)
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt
index 92319ace..fd2e00cd 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.format.util
+package org.opendc.trace.util.parquet
import org.apache.parquet.io.InputFile
import org.apache.parquet.io.SeekableInputStream
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt
index 657bca5a..1b17ae5d 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.format.util
+package org.opendc.trace.util.parquet
import org.apache.parquet.io.OutputFile
import org.apache.parquet.io.PositionOutputStream
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index 5083f3e1..ef9eaeb3 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.format.util
+package org.opendc.trace.util.parquet
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.ParquetReader
diff --git a/opendc-format/src/test/kotlin/org/opendc/format/util/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
index e496dd96..8ef4d1fb 100644
--- a/opendc-format/src/test/kotlin/org/opendc/format/util/ParquetTest.kt
+++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.format.util
+package org.opendc.trace.util.parquet
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
diff --git a/opendc-trace/opendc-trace-swf/build.gradle.kts b/opendc-trace/opendc-trace-swf/build.gradle.kts
new file mode 100644
index 00000000..c9eaa78d
--- /dev/null
+++ b/opendc-trace/opendc-trace-swf/build.gradle.kts
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Support for Standard Workload Format (SWF) traces in OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcTrace.opendcTraceApi)
+}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt
new file mode 100644
index 00000000..12a51a2f
--- /dev/null
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.swf
+
+import org.opendc.trace.*
+import java.nio.file.Path
+import kotlin.io.path.bufferedReader
+
+/**
+ * A [Table] containing the tasks in a SWF trace.
+ */
+internal class SwfTaskTable(private val path: Path) : Table {
+ override val name: String = TABLE_TASKS
+
+ override val isSynthetic: Boolean = false
+
+ override fun isSupported(column: TableColumn<*>): Boolean {
+ return when (column) {
+ TASK_ID -> true
+ TASK_SUBMIT_TIME -> true
+ TASK_WAIT_TIME -> true
+ TASK_RUNTIME -> true
+ TASK_REQ_NCPUS -> true
+ TASK_ALLOC_NCPUS -> true
+ TASK_PARENTS -> true
+ TASK_STATUS -> true
+ TASK_GROUP_ID -> true
+ TASK_USER_ID -> true
+ else -> false
+ }
+ }
+
+ override fun newReader(): TableReader {
+ val reader = path.bufferedReader()
+ return SwfTaskTableReader(reader)
+ }
+
+ override fun newReader(partition: String): TableReader {
+ throw IllegalArgumentException("Invalid partition $partition")
+ }
+
+ override fun toString(): String = "SwfTaskTable"
+}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
new file mode 100644
index 00000000..5f879a54
--- /dev/null
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
@@ -0,0 +1,162 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.swf
+
+import org.opendc.trace.*
+import java.io.BufferedReader
+
+/**
+ * A [TableReader] implementation for the SWF format.
+ */
+internal class SwfTaskTableReader(private val reader: BufferedReader) : TableReader {
+ /**
+ * The current row.
+ */
+ private var fields = emptyList<String>()
+
+ /**
+ * A [Regex] object to match whitespace.
+ */
+ private val whitespace = "\\s+".toRegex()
+
+ override fun nextRow(): Boolean {
+ var line: String
+ var num = 0
+
+ while (true) {
+ line = reader.readLine() ?: return false
+ num++
+
+ if (line.isBlank()) {
+ // Ignore empty lines
+ continue
+ } else if (line.startsWith(";")) {
+ // Ignore comments for now
+ continue
+ }
+
+ break
+ }
+
+ fields = line.trim().split(whitespace)
+
+ if (fields.size < 18) {
+ throw IllegalArgumentException("Invalid format at line $line")
+ }
+
+ return true
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ TASK_ID -> true
+ TASK_SUBMIT_TIME -> true
+ TASK_WAIT_TIME -> true
+ TASK_RUNTIME -> true
+ TASK_REQ_NCPUS -> true
+ TASK_ALLOC_NCPUS -> true
+ TASK_PARENTS -> true
+ TASK_STATUS -> true
+ TASK_GROUP_ID -> true
+ TASK_USER_ID -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any = when (column) {
+ TASK_ID -> getLong(TASK_ID)
+ TASK_SUBMIT_TIME -> getLong(TASK_SUBMIT_TIME)
+ TASK_WAIT_TIME -> getLong(TASK_WAIT_TIME)
+ TASK_RUNTIME -> getLong(TASK_RUNTIME)
+ TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS)
+ TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS)
+ TASK_PARENTS -> {
+ val parent = fields[COL_PARENT_JOB].toLong(10)
+ if (parent < 0) emptySet() else setOf(parent)
+ }
+ TASK_STATUS -> getInt(TASK_STATUS)
+ TASK_GROUP_ID -> getInt(TASK_GROUP_ID)
+ TASK_USER_ID -> getInt(TASK_USER_ID)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ return when (column) {
+ TASK_REQ_NCPUS -> fields[COL_REQ_NCPUS].toInt(10)
+ TASK_ALLOC_NCPUS -> fields[COL_ALLOC_NCPUS].toInt(10)
+ TASK_STATUS -> fields[COL_STATUS].toInt(10)
+ TASK_GROUP_ID -> fields[COL_GROUP_ID].toInt(10)
+ TASK_USER_ID -> fields[COL_USER_ID].toInt(10)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ return when (column) {
+ TASK_ID -> fields[COL_JOB_ID].toLong(10)
+ TASK_SUBMIT_TIME -> fields[COL_SUBMIT_TIME].toLong(10)
+ TASK_WAIT_TIME -> fields[COL_WAIT_TIME].toLong(10)
+ TASK_RUNTIME -> fields[COL_RUN_TIME].toLong(10)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ reader.close()
+ }
+
+ /**
+ * Default column indices for the SWF format.
+ */
+ private val COL_JOB_ID = 0
+ private val COL_SUBMIT_TIME = 1
+ private val COL_WAIT_TIME = 2
+ private val COL_RUN_TIME = 3
+ private val COL_ALLOC_NCPUS = 4
+ private val COL_AVG_CPU_TIME = 5
+ private val COL_USED_MEM = 6
+ private val COL_REQ_NCPUS = 7
+ private val COL_REQ_TIME = 8
+ private val COL_REQ_MEM = 9
+ private val COL_STATUS = 10
+ private val COL_USER_ID = 11
+ private val COL_GROUP_ID = 12
+ private val COL_EXEC_NUM = 13
+ private val COL_QUEUE_NUM = 14
+ private val COL_PART_NUM = 15
+ private val COL_PARENT_JOB = 16
+ private val COL_PARENT_THINK_TIME = 17
+}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt
new file mode 100644
index 00000000..d4da735e
--- /dev/null
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.swf
+
+import org.opendc.trace.TABLE_TASKS
+import org.opendc.trace.Table
+import org.opendc.trace.Trace
+import java.nio.file.Path
+
+/**
+ * [Trace] implementation for the SWF format.
+ */
+public class SwfTrace internal constructor(private val path: Path) : Trace {
+ override val tables: List<String> = listOf(TABLE_TASKS)
+
+ override fun containsTable(name: String): Boolean = TABLE_TASKS == name
+
+ override fun getTable(name: String): Table? {
+ if (!containsTable(name)) {
+ return null
+ }
+ return SwfTaskTable(path)
+ }
+
+ override fun toString(): String = "SwfTrace[$path]"
+}
diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
index 31ae03e0..36c3122e 100644
--- a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
@@ -20,28 +20,24 @@
* SOFTWARE.
*/
-package org.opendc.format.trace.wtf
+package org.opendc.trace.swf
-import org.junit.jupiter.api.Assertions.*
-import org.junit.jupiter.api.Test
-import java.io.File
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
/**
- * Test suite for the [WtfTraceReader] class.
+ * Support for the Standard Workload Format (SWF) in OpenDC.
+ *
+ * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html
*/
-class WtfTraceReaderTest {
- /**
- * Smoke test for parsing WTF traces.
- */
- @Test
- fun testParseWtf() {
- val reader = WtfTraceReader(File("src/test/resources/wtf-trace"))
- var entry = reader.next()
- assertEquals(0, entry.start)
- assertEquals(23, entry.workload.tasks.size)
+public class SwfTraceFormat : TraceFormat {
+ override val name: String = "swf"
- entry = reader.next()
- assertEquals(333387, entry.start)
- assertEquals(23, entry.workload.tasks.size)
+ override fun open(url: URL): SwfTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return SwfTrace(path)
}
}
diff --git a/opendc-trace/opendc-trace-swf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-swf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
new file mode 100644
index 00000000..6c6b0eb2
--- /dev/null
+++ b/opendc-trace/opendc-trace-swf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
@@ -0,0 +1 @@
+org.opendc.trace.swf.SwfTraceFormat
diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
new file mode 100644
index 00000000..9686891b
--- /dev/null
+++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.swf
+
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.*
+import org.opendc.trace.TABLE_TASKS
+import org.opendc.trace.TASK_ALLOC_NCPUS
+import org.opendc.trace.TASK_ID
+import java.net.URL
+
+/**
+ * Test suite for the [SwfTraceFormat] class.
+ */
+internal class SwfTraceFormatTest {
+ @Test
+ fun testTraceExists() {
+ val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
+ val format = SwfTraceFormat()
+ assertDoesNotThrow {
+ format.open(input)
+ }
+ }
+
+ @Test
+ fun testTraceDoesNotExists() {
+ val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
+ val format = SwfTraceFormat()
+ assertThrows<IllegalArgumentException> {
+ format.open(URL(input.toString() + "help"))
+ }
+ }
+
+ @Test
+ fun testTables() {
+ val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
+ val trace = SwfTraceFormat().open(input)
+
+ assertEquals(listOf(TABLE_TASKS), trace.tables)
+ }
+
+ @Test
+ fun testTableExists() {
+ val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
+ val table = SwfTraceFormat().open(input).getTable(TABLE_TASKS)
+
+ assertNotNull(table)
+ assertDoesNotThrow { table!!.newReader() }
+ }
+
+ @Test
+ fun testTableDoesNotExist() {
+ val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
+ val trace = SwfTraceFormat().open(input)
+
+ assertFalse(trace.containsTable("test"))
+ assertNull(trace.getTable("test"))
+ }
+
+ @Test
+ fun testReader() {
+ val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
+ val trace = SwfTraceFormat().open(input)
+ val reader = trace.getTable(TABLE_TASKS)!!.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals(1, reader.getLong(TASK_ID)) },
+ { assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) },
+ { assertTrue(reader.nextRow()) },
+ { assertEquals(2, reader.getLong(TASK_ID)) },
+ { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) },
+ )
+
+ reader.close()
+ }
+
+ @Test
+ fun testReaderPartition() {
+ val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
+ val trace = SwfTraceFormat().open(input)
+
+ assertThrows<IllegalArgumentException> {
+ trace.getTable(TABLE_TASKS)!!.newReader("test")
+ }
+ }
+}
diff --git a/opendc-format/src/test/resources/swf_trace.txt b/opendc-trace/opendc-trace-swf/src/test/resources/trace.swf
index c3ecf890..c3ecf890 100644
--- a/opendc-format/src/test/resources/swf_trace.txt
+++ b/opendc-trace/opendc-trace-swf/src/test/resources/trace.swf
diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts
new file mode 100644
index 00000000..5051c7b0
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Support for Workflow Trace Format (WTF) traces in OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcTrace.opendcTraceApi)
+
+ implementation(projects.opendcTrace.opendcTraceParquet)
+}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt
new file mode 100644
index 00000000..be26f540
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.nio.file.Path
+
+/**
+ * A [Table] containing the tasks in a GWF trace.
+ */
+internal class WtfTaskTable(private val path: Path) : Table {
+ override val name: String = TABLE_TASKS
+
+ override val isSynthetic: Boolean = false
+
+ override fun isSupported(column: TableColumn<*>): Boolean {
+ return when (column) {
+ TASK_ID -> true
+ TASK_WORKFLOW_ID -> true
+ TASK_SUBMIT_TIME -> true
+ TASK_WAIT_TIME -> true
+ TASK_RUNTIME -> true
+ TASK_REQ_NCPUS -> true
+ TASK_PARENTS -> true
+ TASK_CHILDREN -> true
+ TASK_GROUP_ID -> true
+ TASK_USER_ID -> true
+ else -> false
+ }
+ }
+
+ override fun newReader(): TableReader {
+ val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0"))
+ return WtfTaskTableReader(reader)
+ }
+
+ override fun newReader(partition: String): TableReader {
+ throw IllegalArgumentException("Invalid partition $partition")
+ }
+
+ override fun toString(): String = "WtfTaskTable"
+}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
new file mode 100644
index 00000000..b6789542
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+
+/**
+ * A [TableReader] implementation for the WTF format.
+ */
+internal class WtfTaskTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+ /**
+ * The current record.
+ */
+ private var record: GenericRecord? = null
+
+ override fun nextRow(): Boolean {
+ record = reader.read()
+ return record != null
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ TASK_ID -> true
+ TASK_WORKFLOW_ID -> true
+ TASK_SUBMIT_TIME -> true
+ TASK_WAIT_TIME -> true
+ TASK_RUNTIME -> true
+ TASK_REQ_NCPUS -> true
+ TASK_PARENTS -> true
+ TASK_CHILDREN -> true
+ TASK_GROUP_ID -> true
+ TASK_USER_ID -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ @Suppress("UNCHECKED_CAST")
+ val res: Any = when (column) {
+ TASK_ID -> record["id"]
+ TASK_WORKFLOW_ID -> record["workflow_id"]
+ TASK_SUBMIT_TIME -> record["ts_submit"]
+ TASK_WAIT_TIME -> record["wait_time"]
+ TASK_RUNTIME -> record["runtime"]
+ TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt()
+ TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet()
+ TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet()
+ TASK_GROUP_ID -> record["group_id"]
+ TASK_USER_ID -> record["user_id"]
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (column) {
+ TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt()
+ TASK_GROUP_ID -> record["group_id"] as Int
+ TASK_USER_ID -> record["user_id"] as Int
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (column) {
+ TASK_ID -> record["id"] as Long
+ TASK_WORKFLOW_ID -> record["workflow_id"] as Long
+ TASK_SUBMIT_TIME -> record["ts_submit"] as Long
+ TASK_WAIT_TIME -> record["wait_time"] as Long
+ TASK_RUNTIME -> record["runtime"] as Long
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ reader.close()
+ }
+}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt
new file mode 100644
index 00000000..7eff0f5a
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf
+
+import org.opendc.trace.TABLE_TASKS
+import org.opendc.trace.Table
+import org.opendc.trace.Trace
+import java.nio.file.Path
+
+/**
+ * [Trace] implementation for the WTF format.
+ */
+public class WtfTrace internal constructor(private val path: Path) : Trace {
+ override val tables: List<String> = listOf(TABLE_TASKS)
+
+ override fun containsTable(name: String): Boolean = TABLE_TASKS == name
+
+ override fun getTable(name: String): Table? {
+ if (!containsTable(name)) {
+ return null
+ }
+
+ return WtfTaskTable(path)
+ }
+
+ override fun toString(): String = "SwfTrace[$path]"
+}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
new file mode 100644
index 00000000..781cb335
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf
+
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A [TraceFormat] implementation for the Workflow Trace Format (WTF).
+ */
+public class WtfTraceFormat : TraceFormat {
+ override val name: String = "wtf"
+
+ override fun open(url: URL): WtfTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return WtfTrace(path)
+ }
+}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
new file mode 100644
index 00000000..32da52ff
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
@@ -0,0 +1 @@
+org.opendc.trace.wtf.WtfTraceFormat
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
new file mode 100644
index 00000000..a05a523e
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.trace.*
+import java.io.File
+import java.net.URL
+
+/**
+ * Test suite for the [WtfTraceFormat] class.
+ */
+class WtfTraceFormatTest {
+ @Test
+ fun testTraceExists() {
+ val input = File("src/test/resources/wtf-trace").toURI().toURL()
+ val format = WtfTraceFormat()
+ org.junit.jupiter.api.assertDoesNotThrow {
+ format.open(input)
+ }
+ }
+
+ @Test
+ fun testTraceDoesNotExists() {
+ val input = File("src/test/resources/wtf-trace").toURI().toURL()
+ val format = WtfTraceFormat()
+ assertThrows<IllegalArgumentException> {
+ format.open(URL(input.toString() + "help"))
+ }
+ }
+
+ @Test
+ fun testTables() {
+ val input = File("src/test/resources/wtf-trace").toURI().toURL()
+ val format = WtfTraceFormat()
+ val trace = format.open(input)
+
+ assertEquals(listOf(TABLE_TASKS), trace.tables)
+ }
+
+ @Test
+ fun testTableExists() {
+ val input = File("src/test/resources/wtf-trace").toURI().toURL()
+ val format = WtfTraceFormat()
+ val table = format.open(input).getTable(TABLE_TASKS)
+
+ assertNotNull(table)
+ org.junit.jupiter.api.assertDoesNotThrow { table!!.newReader() }
+ }
+
+ @Test
+ fun testTableDoesNotExist() {
+ val input = File("src/test/resources/wtf-trace").toURI().toURL()
+ val format = WtfTraceFormat()
+ val trace = format.open(input)
+
+ assertFalse(trace.containsTable("test"))
+ assertNull(trace.getTable("test"))
+ }
+
+ /**
+ * Smoke test for parsing WTF traces.
+ */
+ @Test
+ fun testTableReader() {
+ val input = File("src/test/resources/wtf-trace")
+ val trace = WtfTraceFormat().open(input.toURI().toURL())
+ val reader = trace.getTable(TABLE_TASKS)!!.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals(362334516345962206, reader.getLong(TASK_ID)) },
+ { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) },
+ { assertEquals(245604, reader.getLong(TASK_SUBMIT_TIME)) },
+ { assertEquals(8163, reader.getLong(TASK_RUNTIME)) },
+ { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) },
+ )
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals(502010169100446658, reader.getLong(TASK_ID)) },
+ { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) },
+ { assertEquals(251325, reader.getLong(TASK_SUBMIT_TIME)) },
+ { assertEquals(8216, reader.getLong(TASK_RUNTIME)) },
+ { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) },
+ )
+
+ reader.close()
+ }
+
+ @Test
+ fun testTableReaderPartition() {
+ val input = File("src/test/resources/wtf-trace").toURI().toURL()
+ val format = WtfTraceFormat()
+ val table = format.open(input).getTable(TABLE_TASKS)!!
+
+ assertThrows<IllegalArgumentException> { table.newReader("test") }
+ }
+}
diff --git a/opendc-format/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet b/opendc-trace/opendc-trace-wtf/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet
index d2044038..d2044038 100644
--- a/opendc-format/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet
+++ b/opendc-trace/opendc-trace-wtf/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet
Binary files differ
diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts
index 1f705b79..ec4a4673 100644
--- a/opendc-web/opendc-web-runner/build.gradle.kts
+++ b/opendc-web/opendc-web-runner/build.gradle.kts
@@ -36,7 +36,6 @@ application {
dependencies {
api(platform(projects.opendcPlatform))
implementation(projects.opendcCompute.opendcComputeSimulator)
- implementation(projects.opendcFormat)
implementation(projects.opendcExperiments.opendcExperimentsCapelin)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index c5f5cd03..53d50357 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -34,12 +34,12 @@ import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
import org.opendc.compute.service.scheduler.weights.*
import org.opendc.experiments.capelin.*
+import org.opendc.experiments.capelin.env.EnvironmentReader
+import org.opendc.experiments.capelin.env.MachineDef
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts
index bc082dbc..941202d2 100644
--- a/opendc-workflow/opendc-workflow-service/build.gradle.kts
+++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts
@@ -39,11 +39,7 @@ dependencies {
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testImplementation(projects.opendcCompute.opendcComputeSimulator)
- testImplementation(projects.opendcFormat)
+ testImplementation(projects.opendcTrace.opendcTraceGwf)
testImplementation(projects.opendcTelemetry.opendcTelemetrySdk)
- testImplementation(libs.jackson.module.kotlin) {
- exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect")
- }
- testImplementation(kotlin("reflect"))
testRuntimeOnly(libs.log4j.slf4j)
}
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt
new file mode 100644
index 00000000..a390fe08
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service
+
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.trace.*
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import java.time.Clock
+import java.util.*
+import kotlin.collections.HashMap
+import kotlin.collections.HashSet
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Helper tool to replay workflow trace.
+ */
+internal class TraceReplayer(private val trace: Trace) {
+ /**
+ * Replay the workload.
+ */
+ public suspend fun replay(clock: Clock, service: WorkflowService) {
+ val jobs = parseTrace(trace)
+
+ // Sort jobs by their arrival time
+ (jobs as MutableList<Job>).sortBy { it.metadata["WORKFLOW_SUBMIT_TIME"] as Long }
+
+ // Wait until the trace is started
+ val startTime = jobs[0].metadata["WORKFLOW_SUBMIT_TIME"] as Long
+ delay(min(0L, startTime - clock.millis()))
+
+ val offset = startTime - clock.millis()
+
+ coroutineScope {
+ for (job in jobs) {
+ val submitTime = job.metadata["WORKFLOW_SUBMIT_TIME"] as Long
+ delay(max(0, (submitTime - offset) - clock.millis()))
+
+ launch { service.run(job) }
+ }
+ }
+ }
+
+ /**
+ * Convert [trace] into a list of [Job]s that can be submitted to the workflow service.
+ */
+ public fun parseTrace(trace: Trace): List<Job> {
+ val table = checkNotNull(trace.getTable(TABLE_TASKS))
+ val reader = table.newReader()
+
+ val jobs = mutableMapOf<Long, Job>()
+ val tasks = mutableMapOf<Long, Task>()
+ val taskDependencies = mutableMapOf<Task, Set<Long>>()
+
+ try {
+ while (reader.nextRow()) {
+ // Bag of tasks without workflow ID all share the same workflow
+ val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.getLong(TASK_WORKFLOW_ID) else 0L
+ val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
+
+ val id = reader.getLong(TASK_ID)
+ val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS))
+ reader.getInt(TASK_ALLOC_NCPUS)
+ else
+ reader.getInt(TASK_REQ_NCPUS)
+ val submitTime = reader.getLong(TASK_SUBMIT_TIME)
+ val runtime = reader.getLong(TASK_RUNTIME)
+ val flops: Long = 4000 * runtime * grantedCpus
+ val workload = SimFlopsWorkload(flops)
+ val task = Task(
+ UUID(0L, id),
+ "<unnamed>",
+ HashSet(),
+ mapOf(
+ "workload" to workload,
+ WORKFLOW_TASK_CORES to grantedCpus,
+ WORKFLOW_TASK_DEADLINE to (runtime * 1000)
+ ),
+ )
+
+ tasks[id] = task
+ taskDependencies[task] = reader.get(TASK_PARENTS)
+
+ (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime) { a, b -> min(a as Long, b as Long) }
+ (workflow.tasks as MutableSet<Task>).add(task)
+ }
+
+ // Resolve dependencies for all tasks
+ for ((task, deps) in taskDependencies) {
+ for (dep in deps) {
+ val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" }
+ (task.dependencies as MutableSet<Task>).add(parent)
+ }
+ }
+ } finally {
+ reader.close()
+ }
+
+ return jobs.values.toList()
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index d82959e7..07433d1f 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -25,9 +25,6 @@ package org.opendc.workflow.service
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
@@ -39,25 +36,28 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
-import org.opendc.format.environment.sc18.Sc18EnvironmentReader
-import org.opendc.format.trace.gwf.GwfTraceReader
import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.telemetry.sdk.toOtelClock
+import org.opendc.trace.gwf.GwfTraceFormat
import org.opendc.workflow.service.internal.WorkflowServiceImpl
import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
-import kotlin.math.max
+import java.util.*
/**
* Integration test suite for the [WorkflowServiceImpl].
*/
@DisplayName("WorkflowService")
-internal class WorkflowServiceIntegrationTest {
+internal class WorkflowServiceTest {
/**
* A large integration test where we check whether all tasks in some trace are executed correctly.
*/
@@ -69,20 +69,20 @@ internal class WorkflowServiceIntegrationTest {
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val hosts = Sc18EnvironmentReader(checkNotNull(object {}.javaClass.getResourceAsStream("/environment.json")))
- .use { it.read() }
- .map { def ->
- SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- coroutineContext,
- interpreter,
- MeterProvider.noop().get("opendc-compute-simulator"),
- SimSpaceSharedHypervisorProvider()
- )
- }
+ val machineModel = createMachineModel()
+ val hvProvider = SimSpaceSharedHypervisorProvider()
+ val hosts = List(4) { id ->
+ SimHost(
+ UUID(0, id.toLong()),
+ "node-$id",
+ machineModel,
+ emptyMap(),
+ coroutineContext,
+ interpreter,
+ meterProvider.get("opendc-compute-simulator"),
+ hvProvider,
+ )
+ }
val meter = MeterProvider.noop().get("opendc-compute")
val computeScheduler = FilterScheduler(
@@ -105,23 +105,10 @@ internal class WorkflowServiceIntegrationTest {
taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
)
- val reader = GwfTraceReader(checkNotNull(object {}.javaClass.getResourceAsStream("/trace.gwf")))
- var offset = Long.MIN_VALUE
-
- coroutineScope {
- while (reader.hasNext()) {
- val entry = reader.next()
+ val trace = GwfTraceFormat().open(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")))
+ val replayer = TraceReplayer(trace)
- if (offset < 0) {
- offset = entry.start - clock.millis()
- }
-
- delay(max(0, (entry.start - offset) - clock.millis()))
- launch {
- scheduler.run(entry.workload)
- }
- }
- }
+ replayer.replay(clock, scheduler)
hosts.forEach(SimHost::close)
scheduler.close()
@@ -134,10 +121,22 @@ internal class WorkflowServiceIntegrationTest {
{ assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") },
{ assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") },
{ assertEquals(0, metrics.tasksActive, "Not all started tasks finished") },
- { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }
+ { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
+ { assertEquals(33213237L, clock.millis()) }
)
}
+ /**
+ * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html
+ */
+ private fun createMachineModel(): MachineModel {
+ val node = ProcessingNode("AMD", "am64", "EPYC 7742", 32)
+ val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) }
+ val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) }
+
+ return MachineModel(cpus, memory)
+ }
+
class WorkflowMetrics {
var jobsSubmitted = 0L
var jobsActive = 0L
diff --git a/settings.gradle.kts b/settings.gradle.kts
index d85c08f8..5cae0a31 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -30,7 +30,6 @@ include(":opendc-workflow:opendc-workflow-service")
include(":opendc-faas:opendc-faas-api")
include(":opendc-faas:opendc-faas-service")
include(":opendc-faas:opendc-faas-simulator")
-include(":opendc-format")
include(":opendc-experiments:opendc-experiments-capelin")
include(":opendc-experiments:opendc-experiments-energy21")
include(":opendc-experiments:opendc-experiments-serverless20")
@@ -46,6 +45,12 @@ include(":opendc-simulator:opendc-simulator-compute")
include(":opendc-simulator:opendc-simulator-failures")
include(":opendc-telemetry:opendc-telemetry-api")
include(":opendc-telemetry:opendc-telemetry-sdk")
+include(":opendc-trace:opendc-trace-api")
+include(":opendc-trace:opendc-trace-gwf")
+include(":opendc-trace:opendc-trace-swf")
+include(":opendc-trace:opendc-trace-wtf")
+include(":opendc-trace:opendc-trace-bitbrains")
+include(":opendc-trace:opendc-trace-parquet")
include(":opendc-harness:opendc-harness-api")
include(":opendc-harness:opendc-harness-engine")
include(":opendc-harness:opendc-harness-cli")