summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-workload
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-17 17:48:02 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-19 14:02:26 +0200
commit859ce303f0b9110c7110b918e5957c2156fa8b26 (patch)
tree67b19ac5bd807f4b7b8d67810448630f542017a9 /opendc-compute/opendc-compute-workload
parente26b81568db1b08c87dd43d416e129e32d5de26b (diff)
refactor(capelin): Extract common code out of Capelin experiments
This change creates a new module for doing simulations with virtual machine workloads. We have found that a lot of code in the Capelin experiments code is being re-used by non-experiment modules.
Diffstat (limited to 'opendc-compute/opendc-compute-workload')
-rw-r--r--opendc-compute/opendc-compute-workload/build.gradle.kts49
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt222
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt38
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt69
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/env/MachineDef.kt38
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt145
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt67
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt104
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt97
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt66
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt139
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt261
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt260
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt42
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt32
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTable.kt127
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTableReader.kt149
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTable.kt54
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTableReader.kt168
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTrace.kt46
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTraceFormat.kt56
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTable.kt53
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTableReader.kt103
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTable.kt53
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTableReader.kt103
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTrace.kt49
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTraceFormat.kt47
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/Schemas.kt55
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTable.kt138
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTableReader.kt212
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTrace.kt45
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTraceFormat.kt47
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt68
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt45
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json22
35 files changed, 3269 insertions, 0 deletions
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts
new file mode 100644
index 00000000..390c7455
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/build.gradle.kts
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+// Human-readable description of this Gradle module.
+description = "Support library for simulating VM-based workloads with OpenDC"
+
+/* Build configuration */
+plugins {
+ // NOTE(review): these look like project-local convention plugins (not defined in this file) — confirm where they live.
+ `kotlin-library-conventions`
+ `testing-conventions`
+}
+
+dependencies {
+ // `api` dependencies are part of this module's public surface and are exposed to modules that depend on it.
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcCompute.opendcComputeSimulator)
+
+ // `implementation` dependencies are internal to this module.
+ // NOTE(review): FailureModels.kt imports org.apache.commons.math3 and the Parquet writers import org.apache.avro /
+ // org.apache.parquet, but none of these are declared directly here — presumably they arrive transitively; confirm.
+ implementation(projects.opendcTrace.opendcTraceParquet)
+ implementation(projects.opendcTrace.opendcTraceBitbrains)
+ implementation(projects.opendcSimulator.opendcSimulatorCore)
+ implementation(projects.opendcSimulator.opendcSimulatorCompute)
+ implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcTelemetry.opendcTelemetryCompute)
+ implementation(libs.opentelemetry.semconv)
+
+ implementation(libs.kotlin.logging)
+ implementation(libs.clikt)
+ implementation(libs.jackson.databind)
+ implementation(libs.jackson.module.kotlin)
+ implementation(libs.jackson.dataformat.csv)
+ implementation(kotlin("reflect"))
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
new file mode 100644
index 00000000..cc9f2705
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload
+
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.workload.env.MachineDef
+import org.opendc.compute.workload.trace.TraceReader
+import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.kernel.SimHypervisorProvider
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.telemetry.compute.*
+import org.opendc.telemetry.sdk.toOtelClock
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.max
+
+/**
+ * Helper class to simulate VM-based workloads in OpenDC.
+ *
+ * On construction, a [ComputeService] is created for the given scheduler, and one [SimHost] is created and registered
+ * with that service for every machine definition. The meter providers of the service and of every host are collected
+ * in [producers].
+ *
+ * @param context The [CoroutineContext] passed to the service, the hosts, the interpreter and the fault injector.
+ * @param clock The [Clock] used to read the current time.
+ * @param scheduler The [ComputeScheduler] handed to the [ComputeService].
+ * @param machines The machine definitions for which hosts are created.
+ * @param failureModel The optional [FailureModel]; when present, [run] uses it to create and start a fault injector.
+ * @param interferenceModel The optional [VmInterferenceModel]; when present, every host gets its own new domain.
+ * @param hypervisorProvider The [SimHypervisorProvider] passed to every created host.
+ */
+public class ComputeWorkloadRunner(
+    private val context: CoroutineContext,
+    private val clock: Clock,
+    scheduler: ComputeScheduler,
+    machines: List<MachineDef>,
+    private val failureModel: FailureModel? = null,
+    interferenceModel: VmInterferenceModel? = null,
+    hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider()
+) : AutoCloseable {
+    /**
+     * The [ComputeService] that has been configured by the manager.
+     */
+    public val service: ComputeService
+
+    /**
+     * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts.
+     */
+    public val producers: List<MetricProducer>
+        get() = _metricProducers
+    private val _metricProducers = mutableListOf<MetricProducer>()
+
+    /**
+     * The [SimResourceInterpreter] to simulate the hosts.
+     */
+    private val interpreter = SimResourceInterpreter(context, clock)
+
+    /**
+     * The hosts that belong to this class.
+     */
+    private val hosts = mutableSetOf<SimHost>()
+
+    init {
+        val (service, serviceMeterProvider) = createService(scheduler)
+        this._metricProducers.add(serviceMeterProvider)
+        this.service = service
+
+        for (def in machines) {
+            val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel)
+            this._metricProducers.add(hostMeterProvider)
+            hosts.add(host)
+            this.service.addHost(host)
+        }
+    }
+
+    /**
+     * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader].
+     *
+     * Every trace entry is submitted as a server once its (offset-corrected) start time is reached, and the server is
+     * deleted again after its end time. The function returns after all launched servers have been deleted. The
+     * [reader], the client and the fault injector (if any) are closed before returning, also on failure.
+     *
+     * @param reader The trace to replay; its entries are expected to be ordered by start time.
+     */
+    public suspend fun run(reader: TraceReader<SimWorkload>) {
+        val injector = failureModel?.createInjector(context, clock, service)
+        val client = service.newClient()
+
+        // Create new image for the virtual machine
+        val image = client.newImage("vm-image")
+
+        try {
+            coroutineScope {
+                // Start the fault injector
+                injector?.start()
+
+                // Long.MIN_VALUE is the "not yet computed" sentinel for the offset between trace time and clock time.
+                var offset = Long.MIN_VALUE
+
+                while (reader.hasNext()) {
+                    val entry = reader.next()
+
+                    // Compute the offset exactly once, from the first entry. Comparing against the sentinel (instead of
+                    // testing for a negative value) keeps a legitimately negative offset from being recomputed for
+                    // every entry, which would discard the relative timing of the trace.
+                    if (offset == Long.MIN_VALUE) {
+                        offset = entry.start - clock.millis()
+                    }
+
+                    // Make sure the trace entries are ordered by submission time
+                    assert(entry.start - offset >= 0) { "Invalid trace order" }
+                    delay(max(0, (entry.start - offset) - clock.millis()))
+
+                    launch {
+                        // NOTE(review): 300001 ms (5 minutes + 1 ms) is an unexplained constant — confirm its meaning.
+                        val workloadOffset = -offset + 300001
+                        val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
+
+                        val server = client.newServer(
+                            entry.name,
+                            image,
+                            client.newFlavor(
+                                entry.name,
+                                entry.meta["cores"] as Int,
+                                entry.meta["required-memory"] as Long
+                            ),
+                            meta = entry.meta + mapOf("workload" to workload)
+                        )
+
+                        // Wait for the server to reach its end time
+                        val endTime = entry.meta["end-time"] as Long
+                        delay(endTime + workloadOffset - clock.millis() + 1)
+
+                        // Delete the server after reaching the end-time of the virtual machine
+                        server.delete()
+                    }
+                }
+            }
+
+            yield()
+        } finally {
+            injector?.close()
+            reader.close()
+            client.close()
+        }
+    }
+
+    /**
+     * Close the [ComputeService] and all hosts created by this runner.
+     */
+    override fun close() {
+        service.close()
+
+        for (host in hosts) {
+            host.close()
+        }
+
+        hosts.clear()
+    }
+
+    /**
+     * Construct a [ComputeService] instance.
+     *
+     * @param scheduler The [ComputeScheduler] to pass to the service.
+     * @return The service together with the [SdkMeterProvider] it was constructed with.
+     */
+    private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> {
+        val resource = Resource.builder()
+            .put(ResourceAttributes.SERVICE_NAME, "opendc-compute")
+            .build()
+
+        val meterProvider = SdkMeterProvider.builder()
+            .setClock(clock.toOtelClock())
+            .setResource(resource)
+            .build()
+
+        val service = ComputeService(context, clock, meterProvider, scheduler)
+        return service to meterProvider
+    }
+
+    /**
+     * Construct a [SimHost] instance for the specified [MachineDef].
+     *
+     * @param def The definition of the machine to create a host for.
+     * @param hypervisorProvider The [SimHypervisorProvider] passed to the host.
+     * @param interferenceModel The optional [VmInterferenceModel] from which a new domain is created for the host.
+     * @return The host together with the [SdkMeterProvider] it was constructed with.
+     */
+    private fun createHost(
+        def: MachineDef,
+        hypervisorProvider: SimHypervisorProvider,
+        interferenceModel: VmInterferenceModel? = null
+    ): Pair<SimHost, SdkMeterProvider> {
+        val resource = Resource.builder()
+            .put(HOST_ID, def.uid.toString())
+            .put(HOST_NAME, def.name)
+            .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+            .put(HOST_NCPUS, def.model.cpus.size)
+            .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size })
+            .build()
+
+        val meterProvider = SdkMeterProvider.builder()
+            .setClock(clock.toOtelClock())
+            .setResource(resource)
+            .build()
+
+        val host = SimHost(
+            def.uid,
+            def.name,
+            def.model,
+            def.meta,
+            context,
+            interpreter,
+            meterProvider,
+            hypervisorProvider,
+            powerDriver = SimplePowerDriver(def.powerModel),
+            interferenceDomain = interferenceModel?.newDomain()
+        )
+
+        return host to meterProvider
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
new file mode 100644
index 00000000..43dd8321
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload
+
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts.
+ */
+public interface FailureModel {
+ /**
+ * Construct a [HostFaultInjector] for the specified [service].
+ *
+ * @param context The [CoroutineContext] to construct the injector with.
+ * @param clock The [Clock] to construct the injector with.
+ * @param service The [ComputeService] whose hosts are subject to failures.
+ * @return The [HostFaultInjector] created for [service].
+ */
+ public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
new file mode 100644
index 00000000..55c61be1
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("FailureModels")
+package org.opendc.compute.workload
+
+import org.apache.commons.math3.distribution.LogNormalDistribution
+import org.apache.commons.math3.random.Well19937c
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import org.opendc.compute.simulator.failure.StartStopHostFault
+import org.opendc.compute.simulator.failure.StochasticVictimSelector
+import java.time.Clock
+import java.time.Duration
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.ln
+import kotlin.random.Random
+
+/**
+ * Create a [FailureModel] whose parameters are taken from the GRID'5000 failure trace.
+ *
+ * The parameters are those described in
+ * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
+ *
+ * NOTE(review): [Duration.toHours] truncates to whole hours, so an interval shorter than one hour results in
+ * `ln(0.0)`, which is negative infinity — confirm that callers always pass at least one hour.
+ *
+ * @param failureInterval The interval whose number of whole hours (as a natural logarithm) parameterizes the
+ * distribution of the failure inter-arrival time.
+ * @param seed The seed used for both random number generators of the injector.
+ * @return A [FailureModel] that creates a [HostFaultInjector] over all hosts of the service it is given.
+ */
+public fun grid5000(failureInterval: Duration, seed: Int): FailureModel = object : FailureModel {
+    override fun createInjector(
+        context: CoroutineContext,
+        clock: Clock,
+        service: ComputeService
+    ): HostFaultInjector {
+        val generator = Well19937c(seed)
+        val simHosts = service.hosts.map { it as SimHost }.toSet()
+
+        // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+        // GRID'5000
+        val interArrivalTime = LogNormalDistribution(generator, ln(failureInterval.toHours().toDouble()), 1.03)
+        val victimSelector = StochasticVictimSelector(LogNormalDistribution(generator, 1.88, 1.25), Random(seed))
+        val hostFault = StartStopHostFault(LogNormalDistribution(generator, 8.89, 2.71))
+
+        return HostFaultInjector(
+            context,
+            clock,
+            simHosts,
+            iat = interArrivalTime,
+            selector = victimSelector,
+            fault = hostFault
+        )
+    }
+
+    override fun toString(): String = "Grid5000FailureModel"
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/env/MachineDef.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/env/MachineDef.kt
new file mode 100644
index 00000000..c1695696
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/env/MachineDef.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.env
+
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.power.PowerModel
+import java.util.*
+
+/**
+ * A definition of a machine in a cluster.
+ *
+ * @property uid The unique identifier of the machine.
+ * @property name The name of the machine.
+ * @property meta Additional metadata attached to the machine.
+ * @property model The [MachineModel] of the machine.
+ * @property powerModel The [PowerModel] of the machine.
+ */
+public data class MachineDef(
+ val uid: UUID,
+ val name: String,
+ val meta: Map<String, Any>,
+ val model: MachineModel,
+ val powerModel: PowerModel
+)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
new file mode 100644
index 00000000..4172d729
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import mu.KotlinLogging
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.trace.util.parquet.LocalOutputFile
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
+
+/**
+ * A writer that writes data in Parquet format.
+ *
+ * Values passed to [write] are placed on a bounded queue and are converted and written to the Parquet file by a
+ * dedicated writer thread, which is started when the instance is constructed and stopped by [close].
+ *
+ * @param path The file to write the Parquet records to; an existing file is overwritten.
+ * @param schema The Avro [Schema] of the records that are written.
+ * @param bufferSize The capacity of the queue between the callers of [write] and the writer thread.
+ */
+public abstract class ParquetDataWriter<in T>(
+    path: File,
+    private val schema: Schema,
+    bufferSize: Int = 4096
+) : AutoCloseable {
+    /**
+     * The logging instance to use.
+     */
+    private val logger = KotlinLogging.logger {}
+
+    /**
+     * The queue of commands to process.
+     */
+    private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
+
+    /**
+     * An exception to be propagated to the actual writer.
+     *
+     * It is assigned by the writer thread and read by the threads calling [write], so it is marked volatile to make
+     * the failure visible across threads.
+     */
+    @Volatile
+    private var exception: Throwable? = null
+
+    /**
+     * The thread that is responsible for writing the Parquet records.
+     */
+    private val writerThread = thread(start = false, name = this.toString()) {
+        val writer = let {
+            val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
+                .withSchema(schema)
+                .withCompressionCodec(CompressionCodecName.ZSTD)
+                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+            buildWriter(builder)
+        }
+
+        val queue = queue
+        val buf = mutableListOf<T>()
+        var shouldStop = false
+
+        try {
+            while (!shouldStop) {
+                try {
+                    process(writer, queue.take())
+                } catch (e: InterruptedException) {
+                    // Interruption is the stop signal sent by close(); finish after one last drain of the queue.
+                    shouldStop = true
+                }
+
+                // Write everything that is currently queued in one go.
+                if (queue.drainTo(buf) > 0) {
+                    for (data in buf) {
+                        process(writer, data)
+                    }
+                    buf.clear()
+                }
+            }
+        } catch (e: Throwable) {
+            logger.error(e) { "Failure in Parquet data writer" }
+            exception = e
+        } finally {
+            writer.close()
+        }
+    }
+
+    /**
+     * Build the [ParquetWriter] used to write the Parquet files.
+     *
+     * @param builder The builder, pre-configured with the schema, compression codec and write mode.
+     * @return The [ParquetWriter] used by the writer thread.
+     */
+    protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+        return builder.build()
+    }
+
+    /**
+     * Convert the specified [data] into a Parquet record.
+     *
+     * @param builder The record builder to populate with the fields of [data].
+     * @param data The value to convert.
+     */
+    protected abstract fun convert(builder: GenericRecordBuilder, data: T)
+
+    /**
+     * Queue the specified [data] to be written to the Parquet file by the writer thread.
+     *
+     * This call blocks while the queue is full.
+     *
+     * @param data The value to write.
+     * @throws IllegalStateException if the writer thread has failed.
+     */
+    public fun write(data: T) {
+        val exception = exception
+        if (exception != null) {
+            throw IllegalStateException("Writer thread failed", exception)
+        }
+
+        queue.put(data)
+    }
+
+    /**
+     * Signal the writer to stop and wait for the writer thread to finish.
+     */
+    override fun close() {
+        writerThread.interrupt()
+        writerThread.join()
+    }
+
+    init {
+        writerThread.start()
+    }
+
+    /**
+     * Process the specified [data] to be written to the Parquet file.
+     */
+    private fun process(writer: ParquetWriter<GenericData.Record>, data: T) {
+        val builder = GenericRecordBuilder(schema)
+        convert(builder, data)
+        writer.write(builder.build())
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt
new file mode 100644
index 00000000..f41a2241
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.telemetry.compute.table.ServiceData
+import java.io.File
+
+/**
+ * A [ComputeMonitor] that logs the events to a Parquet file.
+ *
+ * Server, host and service data are written to separate files below [base]:
+ * `server/<partition>/data.parquet`, `host/<partition>/data.parquet` and `service/<partition>/data.parquet`.
+ * The parent directories of these files are created on construction.
+ *
+ * @param base The base directory of the exported files.
+ * @param partition The name of the partition directory below each of the `server`, `host` and `service` directories.
+ * @param bufferSize The buffer size passed to each of the underlying writers.
+ */
+public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+    private val serverWriter = ParquetServerDataWriter(
+        File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
+        bufferSize
+    )
+
+    private val hostWriter = ParquetHostDataWriter(
+        File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
+        bufferSize
+    )
+
+    private val serviceWriter = ParquetServiceDataWriter(
+        File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
+        bufferSize
+    )
+
+    override fun record(data: ServerData) {
+        serverWriter.write(data)
+    }
+
+    override fun record(data: HostData) {
+        hostWriter.write(data)
+    }
+
+    override fun record(data: ServiceData) {
+        serviceWriter.write(data)
+    }
+
+    /**
+     * Close all underlying writers.
+     *
+     * Every writer is closed even if closing an earlier one fails, so that no writer is left open.
+     */
+    override fun close() {
+        try {
+            hostWriter.close()
+        } finally {
+            try {
+                serviceWriter.close()
+            } finally {
+                serverWriter.close()
+            }
+        }
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
new file mode 100644
index 00000000..37066a0d
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.parquet.UUID_SCHEMA
+import org.opendc.trace.util.parquet.optional
+import java.io.File
+
+/**
+ * A Parquet event writer for [HostData]s.
+ *
+ * @param path The file that is handed to [ParquetDataWriter] as the destination of the records.
+ * @param bufferSize The buffer size that is handed to [ParquetDataWriter].
+ */
+public class ParquetHostDataWriter(path: File, bufferSize: Int) :
+    ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) {
+
+    /**
+     * Finish the construction of the Parquet writer, enabling dictionary encoding for the `host_id` column.
+     */
+    override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+        return builder
+            .withDictionaryEncoding("host_id", true)
+            .build()
+    }
+
+    /**
+     * Copy the fields of [data] into [builder] using the column names of [SCHEMA].
+     */
+    override fun convert(builder: GenericRecordBuilder, data: HostData) {
+        builder["timestamp"] = data.timestamp.toEpochMilli()
+
+        builder["host_id"] = data.host.id
+
+        builder["uptime"] = data.uptime
+        builder["downtime"] = data.downtime
+        // The schema declares no default for this optional column, so it is always assigned (null when the
+        // boot time is unknown) instead of being left unset.
+        builder["boot_time"] = data.bootTime?.toEpochMilli()
+
+        builder["cpu_count"] = data.host.cpuCount
+        builder["cpu_limit"] = data.cpuLimit
+        builder["cpu_time_active"] = data.cpuActiveTime
+        builder["cpu_time_idle"] = data.cpuIdleTime
+        builder["cpu_time_steal"] = data.cpuStealTime
+        builder["cpu_time_lost"] = data.cpuLostTime
+
+        builder["mem_limit"] = data.host.memCapacity
+
+        builder["power_total"] = data.powerTotal
+
+        builder["guests_terminated"] = data.guestsTerminated
+        builder["guests_running"] = data.guestsRunning
+        builder["guests_error"] = data.guestsError
+        builder["guests_invalid"] = data.guestsInvalid
+    }
+
+    override fun toString(): String = "host-writer"
+
+    private companion object {
+        /**
+         * The Avro schema of a host record. `boot_time` is the only optional column.
+         */
+        private val SCHEMA: Schema = SchemaBuilder
+            .record("host")
+            .namespace("org.opendc.telemetry.compute")
+            .fields()
+            .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+            .name("host_id").type(UUID_SCHEMA).noDefault()
+            .requiredLong("uptime")
+            .requiredLong("downtime")
+            .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
+            .requiredInt("cpu_count")
+            .requiredDouble("cpu_limit")
+            .requiredLong("cpu_time_active")
+            .requiredLong("cpu_time_idle")
+            .requiredLong("cpu_time_steal")
+            .requiredLong("cpu_time_lost")
+            .requiredLong("mem_limit")
+            .requiredDouble("power_total")
+            .requiredInt("guests_terminated")
+            .requiredInt("guests_running")
+            .requiredInt("guests_error")
+            .requiredInt("guests_invalid")
+            .endRecord()
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
new file mode 100644
index 00000000..bea23d32
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.parquet.UUID_SCHEMA
+import org.opendc.trace.util.parquet.optional
+import java.io.File
+
+/**
+ * A Parquet event writer for [ServerData]s.
+ *
+ * @param path The file that is handed to [ParquetDataWriter] as the destination of the records.
+ * @param bufferSize The buffer size that is handed to [ParquetDataWriter].
+ */
+public class ParquetServerDataWriter(path: File, bufferSize: Int) :
+    ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) {
+
+    /**
+     * Finish the construction of the Parquet writer, enabling dictionary encoding for the `server_id` and
+     * `host_id` columns.
+     */
+    override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+        return builder
+            .withDictionaryEncoding("server_id", true)
+            .withDictionaryEncoding("host_id", true)
+            .build()
+    }
+
+    /**
+     * Copy the fields of [data] into [builder] using the column names of [SCHEMA].
+     */
+    override fun convert(builder: GenericRecordBuilder, data: ServerData) {
+        builder["timestamp"] = data.timestamp.toEpochMilli()
+
+        builder["server_id"] = data.server.id
+        // Optional column: null when the server has no host.
+        builder["host_id"] = data.host?.id
+
+        builder["uptime"] = data.uptime
+        builder["downtime"] = data.downtime
+        // The schema declares no default for this optional column, so it is always assigned (null when the
+        // boot time is unknown) instead of being left unset, just like `host_id` above.
+        builder["boot_time"] = data.bootTime?.toEpochMilli()
+        builder["scheduling_latency"] = data.schedulingLatency
+
+        builder["cpu_count"] = data.server.cpuCount
+        builder["cpu_limit"] = data.cpuLimit
+        builder["cpu_time_active"] = data.cpuActiveTime
+        builder["cpu_time_idle"] = data.cpuIdleTime
+        builder["cpu_time_steal"] = data.cpuStealTime
+        builder["cpu_time_lost"] = data.cpuLostTime
+
+        builder["mem_limit"] = data.server.memCapacity
+    }
+
+    override fun toString(): String = "server-writer"
+
+    private companion object {
+        /**
+         * The Avro schema of a server record. `host_id` and `boot_time` are the optional columns.
+         */
+        private val SCHEMA: Schema = SchemaBuilder
+            .record("server")
+            .namespace("org.opendc.telemetry.compute")
+            .fields()
+            .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+            .name("server_id").type(UUID_SCHEMA).noDefault()
+            .name("host_id").type(UUID_SCHEMA.optional()).noDefault()
+            .requiredLong("uptime")
+            .requiredLong("downtime")
+            .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
+            .requiredLong("scheduling_latency")
+            .requiredInt("cpu_count")
+            .requiredDouble("cpu_limit")
+            .requiredLong("cpu_time_active")
+            .requiredLong("cpu_time_idle")
+            .requiredLong("cpu_time_steal")
+            .requiredLong("cpu_time_lost")
+            .requiredLong("mem_limit")
+            .endRecord()
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
new file mode 100644
index 00000000..47824b29
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericRecordBuilder
+import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import java.io.File
+
+/**
+ * A Parquet event writer for [ServiceData]s.
+ *
+ * @param path The file that is handed to [ParquetDataWriter] as the destination of the records.
+ * @param bufferSize The buffer size that is handed to [ParquetDataWriter].
+ */
+public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
+    ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) {
+
+    /**
+     * Copy the timestamp and the counters of [data] into [builder] using the column names of [SCHEMA].
+     */
+    override fun convert(builder: GenericRecordBuilder, data: ServiceData) {
+        builder
+            .set("timestamp", data.timestamp.toEpochMilli())
+            .set("hosts_up", data.hostsUp)
+            .set("hosts_down", data.hostsDown)
+            .set("servers_pending", data.serversPending)
+            .set("servers_active", data.serversActive)
+            .set("attempts_success", data.attemptsSuccess)
+            .set("attempts_failure", data.attemptsFailure)
+            .set("attempts_error", data.attemptsError)
+    }
+
+    override fun toString(): String {
+        return "service-writer"
+    }
+
+    private companion object {
+        /**
+         * The Avro schema of a service record: a timestamp followed by seven required integer counters.
+         */
+        private val SCHEMA: Schema = run {
+            val timestampOnly = SchemaBuilder
+                .record("service")
+                .namespace("org.opendc.telemetry.compute")
+                .fields()
+                .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+
+            val counters = listOf(
+                "hosts_up",
+                "hosts_down",
+                "servers_pending",
+                "servers_active",
+                "attempts_success",
+                "attempts_failure",
+                "attempts_error"
+            )
+
+            // Append the counters in declaration order, then seal the record.
+            counters
+                .fold(timestampOnly) { fields, column -> fields.requiredInt(column) }
+                .endRecord()
+        }
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt
new file mode 100644
index 00000000..ae20482d
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace
+
+import org.opendc.compute.workload.trace.bp.BPTraceFormat
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.trace.*
+import java.io.File
+import java.util.UUID
+
+/**
+ * A reader for the internal VM workload trace format that loads the whole trace into memory.
+ *
+ * Both tables of the trace are parsed while the reader is constructed; [read] only hands out the result.
+ *
+ * @param path The directory of the traces.
+ */
+public class RawParquetTraceReader(private val path: File) {
+    /**
+     * The [Trace] that represents this trace.
+     */
+    private val trace = BPTraceFormat().open(path.toURI().toURL())
+
+    /**
+     * The entries in the trace, parsed once during construction.
+     */
+    private val entries: List<TraceEntry<SimWorkload>> = parseMeta(parseFragments())
+
+    /**
+     * Read the entries in the trace.
+     */
+    public fun read(): List<TraceEntry<SimWorkload>> {
+        return entries
+    }
+
+    /**
+     * Load every row of the resource states table and group the resulting fragments by resource identifier.
+     */
+    private fun parseFragments(): Map<String, List<SimTraceWorkload.Fragment>> {
+        val states = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+        val fragmentsByVm = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
+
+        try {
+            while (states.nextRow()) {
+                val vmId = states.get(RESOURCE_STATE_ID)
+                val timestamp = states.get(RESOURCE_STATE_TIMESTAMP)
+                val length = states.get(RESOURCE_STATE_DURATION)
+                val coreCount = states.getInt(RESOURCE_STATE_NCPUS)
+                val usage = states.getDouble(RESOURCE_STATE_CPU_USAGE)
+
+                fragmentsByVm
+                    .getOrPut(vmId) { mutableListOf() }
+                    .add(SimTraceWorkload.Fragment(timestamp.toEpochMilli(), length.toMillis(), usage, coreCount))
+            }
+        } finally {
+            states.close()
+        }
+
+        return fragmentsByVm
+    }
+
+    /**
+     * Turn the rows of the resources table into trace entries.
+     *
+     * @param fragments The fragments per resource identifier; resources without fragments are skipped.
+     */
+    private fun parseMeta(fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
+        val resources = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
+        val result = mutableListOf<TraceEntry<SimWorkload>>()
+
+        try {
+            while (resources.nextRow()) {
+                val id = resources.get(RESOURCE_ID)
+                val recorded = fragments[id] ?: continue
+
+                val submissionTime = resources.get(RESOURCE_START_TIME)
+                val endTime = resources.get(RESOURCE_STOP_TIME)
+                val maxCores = resources.getInt(RESOURCE_NCPUS)
+                val requiredMemory = resources.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB
+                // The suffix is the number of entries accepted so far, i.e. a running index.
+                val uid = UUID.nameUUIDFromBytes("$id-${result.size}".toByteArray())
+
+                val vmFragments = recorded.asSequence()
+                val totalLoad = vmFragments.sumOf { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
+                val workload = SimTraceWorkload(vmFragments)
+                val meta: Map<String, Any> = mapOf(
+                    "submit-time" to submissionTime.toEpochMilli(),
+                    "end-time" to endTime.toEpochMilli(),
+                    "total-load" to totalLoad,
+                    "cores" to maxCores,
+                    "required-memory" to requiredMemory.toLong(),
+                    "workload" to workload
+                )
+
+                result.add(TraceEntry(uid, id, submissionTime.toEpochMilli(), workload, meta))
+            }
+        } catch (e: Exception) {
+            e.printStackTrace()
+            throw e
+        } finally {
+            resources.close()
+        }
+
+        return result
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt
new file mode 100644
index 00000000..36cd0a7d
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt
@@ -0,0 +1,261 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace
+
+import mu.KotlinLogging
+import org.apache.avro.generic.GenericData
+import org.apache.parquet.avro.AvroParquetReader
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.filter2.predicate.Statistics
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate
+import org.apache.parquet.io.api.Binary
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.trace.util.parquet.LocalInputFile
+import java.io.File
+import java.io.Serializable
+import java.util.SortedSet
+import java.util.TreeSet
+import java.util.UUID
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
+
+/**
+ * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
+ *
+ * The metadata (`meta.parquet`) is read completely while the reader is constructed. The fragments
+ * (`trace.parquet`) are read by a background thread that is started during construction and hands them over
+ * through a bounded queue; [close] interrupts that thread.
+ *
+ * @param traceFile The directory of the traces.
+ * @param selectedVms The list of VMs to read from the trace. An empty list selects every VM in the metadata.
+ */
+public class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> {
+    private val logger = KotlinLogging.logger {}
+
+    /**
+     * The internal iterator to use for this reader.
+     */
+    private val iterator: Iterator<TraceEntry<SimWorkload>>
+
+    /**
+     * The intermediate buffer to store the read records in.
+     */
+    private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024)
+
+    /**
+     * An optional filter for filtering the selected VMs
+     */
+    private val filter =
+        if (selectedVms.isEmpty())
+            null
+        else
+            FilterCompat.get(
+                FilterApi.userDefined(
+                    FilterApi.binaryColumn("id"),
+                    SelectedVmFilter(
+                        TreeSet(selectedVms)
+                    )
+                )
+            )
+
+    /**
+     * A poisonous fragment that the reader thread enqueues to signal the end of the trace.
+     */
+    private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0))
+
+    /**
+     * The thread to read the records in.
+     *
+     * It is started right away, so it must be declared after [queue], [filter] and [poison], which it uses.
+     */
+    private val readerThread = thread(start = true, name = "sc20-reader") {
+        val reader = AvroParquetReader
+            .builder<GenericData.Record>(LocalInputFile(File(traceFile, "trace.parquet")))
+            .disableCompatibility()
+            .withFilter(filter)
+            .build()
+
+        try {
+            while (true) {
+                val record = reader.read()
+
+                if (record == null) {
+                    // End of the file: tell the consumer that no more fragments will follow.
+                    queue.put(poison)
+                    break
+                }
+
+                val id = record["id"].toString()
+                val time = record["time"] as Long
+                val duration = record["duration"] as Long
+                val cores = record["cores"] as Int
+                val cpuUsage = record["cpuUsage"] as Double
+
+                val fragment = SimTraceWorkload.Fragment(
+                    time,
+                    duration,
+                    cpuUsage,
+                    cores
+                )
+
+                // Blocks while the queue is full.
+                queue.put(id to fragment)
+            }
+        } catch (e: InterruptedException) {
+            // Do not rethrow this
+        } finally {
+            reader.close()
+        }
+    }
+
+    /**
+     * Fill the buffers with the VMs
+     *
+     * Moves everything that is currently queued into the buffers registered for the respective VM, without
+     * waiting for new fragments. Fragments of VMs without a registered buffer are discarded.
+     */
+    private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) {
+        if (!hasNext) {
+            return
+        }
+
+        val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>()
+        queue.drainTo(fragments)
+
+        for ((id, fragment) in fragments) {
+            if (id == poison.first) {
+                hasNext = false
+                return
+            }
+            buffers[id]?.forEach { it.add(fragment) }
+        }
+    }
+
+    /**
+     * A flag to indicate whether the reader has more entries.
+     */
+    private var hasNext: Boolean = true
+
+    /**
+     * Initialize the reader.
+     */
+    init {
+        val takenIds = mutableSetOf<UUID>()
+        val entries = mutableMapOf<String, GenericData.Record>()
+        val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
+
+        val metaReader = AvroParquetReader
+            .builder<GenericData.Record>(LocalInputFile(File(traceFile, "meta.parquet")))
+            .disableCompatibility()
+            .withFilter(filter)
+            .build()
+
+        // Close the metadata reader even when reading a record fails.
+        try {
+            while (true) {
+                val record = metaReader.read() ?: break
+                val id = record["id"].toString()
+                entries[id] = record
+            }
+        } finally {
+            metaReader.close()
+        }
+
+        val selection = selectedVms.ifEmpty { entries.keys }
+
+        // Create the entry iterator
+        iterator = selection.asSequence()
+            .mapNotNull { entries[it] }
+            .mapIndexed { index, record ->
+                val id = record["id"].toString()
+                val submissionTime = record["submissionTime"] as Long
+                val endTime = record["endTime"] as Long
+                val maxCores = record["maxCores"] as Int
+                val requiredMemory = record["requiredMemory"] as Long
+                val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
+
+                assert(uid !in takenIds)
+                takenIds += uid
+
+                logger.info { "Processing VM $id" }
+
+                val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
+                val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
+                buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
+                val fragments = sequence {
+                    var time = submissionTime
+                    repeat@ while (true) {
+                        if (externalBuffer.isEmpty()) {
+                            if (hasNext) {
+                                // pull() does not wait for data, so this polls until fragments for this VM
+                                // show up or the end of the trace is reached.
+                                pull(buffers)
+                                continue
+                            } else {
+                                break
+                            }
+                        }
+
+                        // Take a private copy, so the external buffer can be refilled by pull().
+                        internalBuffer.addAll(externalBuffer)
+                        externalBuffer.clear()
+
+                        for (fragment in internalBuffer) {
+                            yield(fragment)
+
+                            time += fragment.duration
+                            if (time >= endTime) {
+                                break@repeat
+                            }
+                        }
+
+                        internalBuffer.clear()
+                    }
+
+                    buffers.remove(id)
+                }
+                val workload = SimTraceWorkload(fragments)
+                val meta = mapOf(
+                    "cores" to maxCores,
+                    "required-memory" to requiredMemory,
+                    "workload" to workload
+                )
+
+                TraceEntry(uid, id, submissionTime, workload, meta)
+            }
+            .sortedBy { it.start }
+            .toList()
+            .iterator()
+    }
+
+    override fun hasNext(): Boolean = iterator.hasNext()
+
+    override fun next(): TraceEntry<SimWorkload> = iterator.next()
+
+    /**
+     * Stop the background reader by interrupting its thread.
+     */
+    override fun close() {
+        readerThread.interrupt()
+    }
+
+    /**
+     * A predicate on the `id` column that keeps only the rows of the selected VMs.
+     */
+    private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
+        override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
+
+        /**
+         * A group of rows can be skipped when no selected VM lies within its `[min, max]` range of identifiers.
+         */
+        override fun canDrop(statistics: Statistics<Binary>): Boolean {
+            val min = statistics.min
+            val max = statistics.max
+
+            // Appending NUL turns the exclusive upper bound of subSet into an inclusive one.
+            return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
+        }
+
+        /**
+         * Decide whether a group of rows can be skipped for the negation of this predicate.
+         *
+         * That is only safe when every row in the group is a selected VM. The statistics prove this only when
+         * the group holds a single identifier (`min == max`) that is selected; in every other case the rows
+         * must be inspected one by one.
+         */
+        override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
+            val min = statistics.min.toStringUsingUTF8()
+            val max = statistics.max.toStringUsingUTF8()
+
+            return min == max && min in selectedVms
+        }
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt
new file mode 100644
index 00000000..bae7cb22
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt
@@ -0,0 +1,260 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workload.vm.trace
+
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.arguments.argument
+import com.github.ajalt.clikt.parameters.groups.OptionGroup
+import com.github.ajalt.clikt.parameters.groups.cooccurring
+import com.github.ajalt.clikt.parameters.options.*
+import com.github.ajalt.clikt.parameters.types.*
+import mu.KotlinLogging
+import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.compute.workload.trace.azure.AzureTraceFormat
+import org.opendc.compute.workload.trace.bp.BP_RESOURCES_SCHEMA
+import org.opendc.compute.workload.trace.bp.BP_RESOURCE_STATES_SCHEMA
+import org.opendc.compute.workload.trace.sv.SvTraceFormat
+import org.opendc.trace.*
+import org.opendc.trace.bitbrains.BitbrainsTraceFormat
+import org.opendc.trace.util.parquet.LocalOutputFile
+import java.io.File
+import java.util.*
+import kotlin.math.max
+import kotlin.math.min
+import kotlin.math.roundToLong
+
+/**
+ * Entry point of the trace converter, which turns a trace in text format into a Parquet trace.
+ *
+ * @param args The command-line arguments, handed to [TraceConverterCli] unchanged.
+ */
+public fun main(args: Array<String>) {
+    TraceConverterCli().main(args)
+}
+
+/**
+ * Represents the command for converting traces
+ *
+ * The command reads a trace in one of the supported input formats and writes two Parquet files into the
+ * output directory: `meta.parquet` (one row per selected VM) and `trace.parquet` (the resource states of the
+ * selected VMs).
+ */
+internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
+    /**
+     * The logger instance for the converter.
+     */
+    private val logger = KotlinLogging.logger {}
+
+    /**
+     * The directory where the trace should be stored.
+     */
+    private val output by option("-O", "--output", help = "path to store the trace")
+        .file(canBeFile = false, mustExist = false)
+        .defaultLazy { File("output") }
+
+    /**
+     * The directory where the input trace is located.
+     */
+    private val input by argument("input", help = "path to the input trace")
+        .file(canBeFile = false)
+
+    /**
+     * The input format of the trace.
+     */
+    private val format by option("-f", "--format", help = "input format of trace")
+        .choice(
+            "solvinity" to SvTraceFormat(),
+            "bitbrains" to BitbrainsTraceFormat(),
+            "azure" to AzureTraceFormat()
+        )
+        .required()
+
+    /**
+     * The sampling options.
+     */
+    private val samplingOptions by SamplingOptions().cooccurring()
+
+    /**
+     * Convert the input trace: first the resources table, then the resource states of the selected VMs.
+     *
+     * Existing `meta.parquet` and `trace.parquet` files in the output directory are deleted first.
+     */
+    override fun run() {
+        val metaParquet = File(output, "meta.parquet")
+        val traceParquet = File(output, "trace.parquet")
+
+        if (metaParquet.exists()) {
+            metaParquet.delete()
+        }
+        if (traceParquet.exists()) {
+            traceParquet.delete()
+        }
+
+        val trace = format.open(input.toURI().toURL())
+
+        logger.info { "Building resources table" }
+
+        val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet))
+            .withSchema(BP_RESOURCES_SCHEMA)
+            .withCompressionCodec(CompressionCodecName.ZSTD)
+            .enablePageWriteChecksum()
+            .build()
+
+        val selectedVms = metaWriter.use { convertResources(trace, it) }
+
+        logger.info { "Wrote ${selectedVms.size} rows" }
+        logger.info { "Building resource states table" }
+
+        val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
+            .withSchema(BP_RESOURCE_STATES_SCHEMA)
+            .withCompressionCodec(CompressionCodecName.ZSTD)
+            .enableDictionaryEncoding()
+            .enablePageWriteChecksum()
+            .withBloomFilterEnabled("id", true)
+            .withBloomFilterNDV("id", selectedVms.size.toLong())
+            .build()
+
+        val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) }
+        logger.info { "Wrote $statesCount rows" }
+    }
+
+    /**
+     * Convert the resources table for the trace.
+     *
+     * The resources are derived from the resource states table: consecutive rows with the same identifier are
+     * aggregated into one resource. When sampling is enabled, only a random fraction of them is written.
+     *
+     * @param trace The trace to read the resource states from.
+     * @param writer The writer that receives one record per selected VM.
+     * @return The identifiers of the VMs that have been written.
+     */
+    private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> {
+        val random = samplingOptions?.let { Random(it.seed) }
+        val samplingFraction = samplingOptions?.fraction ?: 1.0
+        val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+        val selectedVms = mutableSetOf<String>()
+
+        // Release the table reader even when the conversion fails halfway.
+        try {
+            var hasNextRow = reader.nextRow()
+
+            while (hasNextRow) {
+                var id: String
+                var numCpus = Int.MIN_VALUE
+                // NOTE(review): Double.MIN_VALUE is the smallest positive double, not the most negative one;
+                // as the start of a running maximum this presumably relies on non-negative values - confirm.
+                var memCapacity = Double.MIN_VALUE
+                var memUsage = Double.MIN_VALUE
+                var startTime = Long.MAX_VALUE
+                var stopTime = Long.MIN_VALUE
+
+                // Aggregate the run of consecutive rows that share the same identifier.
+                do {
+                    id = reader.get(RESOURCE_STATE_ID)
+
+                    val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
+                    startTime = min(startTime, timestamp)
+                    stopTime = max(stopTime, timestamp)
+
+                    numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS))
+
+                    memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY))
+                    if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) {
+                        memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE))
+                    }
+
+                    hasNextRow = reader.nextRow()
+                } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
+
+                // Sample only a fraction of the VMs
+                if (random != null && random.nextDouble() > samplingFraction) {
+                    continue
+                }
+
+                val builder = GenericRecordBuilder(BP_RESOURCES_SCHEMA)
+
+                builder["id"] = id
+                builder["submissionTime"] = startTime
+                builder["endTime"] = stopTime
+                builder["maxCores"] = numCpus
+                builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong()
+
+                logger.info { "Selecting VM $id" }
+
+                writer.write(builder.build())
+                selectedVms.add(id)
+            }
+        } finally {
+            reader.close()
+        }
+
+        return selectedVms
+    }
+
+    /**
+     * Convert the resource states table for the trace.
+     *
+     * @param trace The trace to read the resource states from.
+     * @param writer The writer that receives one record per resource state of a selected VM.
+     * @param selectedVms The identifiers of the VMs whose states should be written.
+     * @return The number of rows that have been written.
+     */
+    private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int {
+        val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+        var count = 0
+
+        // Release the table reader even when the conversion fails halfway.
+        try {
+            var hasNextRow = reader.nextRow()
+
+            while (hasNextRow) {
+                // Reset for every run of rows with the same identifier; negative means "no previous sample".
+                var lastTimestamp = Long.MIN_VALUE
+
+                do {
+                    val id = reader.get(RESOURCE_STATE_ID)
+
+                    if (id !in selectedVms) {
+                        hasNextRow = reader.nextRow()
+                        continue
+                    }
+
+                    val builder = GenericRecordBuilder(BP_RESOURCE_STATES_SCHEMA)
+                    builder["id"] = id
+
+                    val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
+                    if (lastTimestamp < 0) {
+                        // The first sample of a run has no predecessor: it is given a duration of five minutes.
+                        lastTimestamp = timestamp - 5 * 60 * 1000L
+                    }
+
+                    val duration = timestamp - lastTimestamp
+                    val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+                    val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
+                    val flops = (cpuUsage * duration / 1000.0).roundToLong()
+
+                    builder["time"] = timestamp
+                    builder["duration"] = duration
+                    builder["cores"] = cores
+                    builder["cpuUsage"] = cpuUsage
+                    builder["flops"] = flops
+
+                    writer.write(builder.build())
+                    // Count what is actually written; the caller reports this number as rows.
+                    count++
+
+                    lastTimestamp = timestamp
+                    hasNextRow = reader.nextRow()
+                } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
+            }
+        } finally {
+            reader.close()
+        }
+
+        return count
+    }
+
+    /**
+     * Options for sampling the workload trace.
+     */
+    private class SamplingOptions : OptionGroup() {
+        /**
+         * The fraction of VMs to sample
+         */
+        val fraction by option("--sampling-fraction", help = "fraction of the workload to sample")
+            .double()
+            .restrictTo(0.0001, 1.0)
+            .required()
+
+        /**
+         * The seed for sampling the trace.
+         */
+        val seed by option("--sampling-seed", help = "seed for sampling the workload")
+            .long()
+            .default(0)
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt
new file mode 100644
index 00000000..bfa2d051
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace
+
+import java.util.UUID
+
+/**
+ * A single entry of a workload trace.
+ *
+ * @param T The type of the workload carried by the entry.
+ * @property uid The unique identifier of the entry.
+ * @property name The name of the entry.
+ * @property start The start time of the workload.
+ * @property workload The workload of the entry.
+ * @property meta The meta-data associated with the workload.
+ */
+public data class TraceEntry<out T>(
+    val uid: UUID,
+    val name: String,
+    val start: Long,
+    val workload: T,
+    val meta: Map<String, Any>,
+)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt
new file mode 100644
index 00000000..b0795d61
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace
+
+/**
+ * An interface for reading workloads into memory.
+ *
+ * This interface must guarantee that the entries are delivered in order of submission time.
+ *
+ * A reader is an [Iterator] over [TraceEntry] instances and is also [AutoCloseable], so it can be
+ * closed once it is no longer needed.
+ *
+ * @param T The shape of the workloads supported by this reader.
+ */
+public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTable.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTable.kt
new file mode 100644
index 00000000..32843595
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTable.kt
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * The resource state [Table] for the Azure v1 VM traces.
+ *
+ * The table consists of the `csv` files found directly in the directory passed to the constructor. Each
+ * file forms one partition, named after the file without its extension.
+ */
+internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table {
+    /**
+     * The partitions that belong to the table, sorted by partition name.
+     *
+     * The stream returned by [Files.walk] is closed through `use` as soon as the partitions are collected,
+     * so the underlying directory handle is released.
+     */
+    private val partitions = Files.walk(path, 1).use { files ->
+        files
+            .filter { !Files.isDirectory(it) && it.extension == "csv" }
+            .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+            .toSortedMap()
+    }
+
+    override val name: String = TABLE_RESOURCE_STATES
+
+    override val isSynthetic: Boolean = false
+
+    override val columns: List<TableColumn<*>> = listOf(
+        RESOURCE_STATE_ID,
+        RESOURCE_STATE_TIMESTAMP,
+        RESOURCE_STATE_CPU_USAGE_PCT
+    )
+
+    /**
+     * Create a [TableReader] that reads all partitions one after another, in partition-name order.
+     */
+    override fun newReader(): TableReader {
+        val it = partitions.iterator()
+
+        return object : TableReader {
+            /**
+             * The reader of the partition that is currently being read, or `null` if none is left.
+             */
+            var delegate: TableReader? = nextDelegate()
+
+            override fun nextRow(): Boolean {
+                var delegate = delegate
+
+                // Skip over exhausted partitions until one yields a row or no partition is left.
+                while (delegate != null) {
+                    if (delegate.nextRow()) {
+                        break
+                    }
+
+                    delegate.close()
+                    delegate = nextDelegate()
+                }
+
+                this.delegate = delegate
+                return delegate != null
+            }
+
+            override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
+
+            override fun <T> get(column: TableColumn<T>): T {
+                val delegate = checkNotNull(delegate) { "Invalid reader state" }
+                return delegate.get(column)
+            }
+
+            override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+                val delegate = checkNotNull(delegate) { "Invalid reader state" }
+                return delegate.getBoolean(column)
+            }
+
+            override fun getInt(column: TableColumn<Int>): Int {
+                val delegate = checkNotNull(delegate) { "Invalid reader state" }
+                return delegate.getInt(column)
+            }
+
+            override fun getLong(column: TableColumn<Long>): Long {
+                val delegate = checkNotNull(delegate) { "Invalid reader state" }
+                return delegate.getLong(column)
+            }
+
+            override fun getDouble(column: TableColumn<Double>): Double {
+                val delegate = checkNotNull(delegate) { "Invalid reader state" }
+                return delegate.getDouble(column)
+            }
+
+            override fun close() {
+                delegate?.close()
+            }
+
+            /**
+             * Open a reader for the next partition, or return `null` if all partitions have been visited.
+             */
+            private fun nextDelegate(): TableReader? {
+                return if (it.hasNext()) {
+                    val (_, path) = it.next()
+                    AzureResourceStateTableReader(factory.createParser(path.toFile()))
+                } else {
+                    null
+                }
+            }
+
+            override fun toString(): String = "AzureCompositeTableReader"
+        }
+    }
+
+    /**
+     * Create a [TableReader] for the partition with the specified name.
+     *
+     * @throws IllegalArgumentException if [partition] is not a partition of this table.
+     */
+    override fun newReader(partition: String): TableReader {
+        val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
+        return AzureResourceStateTableReader(factory.createParser(path.toFile()))
+    }
+
+    override fun toString(): String = "AzureResourceStateTable"
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTableReader.kt
new file mode 100644
index 00000000..ded9d4d6
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceStateTableReader.kt
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.azure
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.*
+import java.time.Instant
+
+/**
+ * A [TableReader] for the Azure v1 VM resource state table.
+ *
+ * Rows are read from [parser] using a fixed [CsvSchema]. The reader exposes the id, the timestamp and the
+ * average CPU usage of the current row.
+ */
+internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader {
+    init {
+        parser.schema = schema
+    }
+
+    /**
+     * Advance the reader to the next row.
+     *
+     * @return `true` if a row was read, `false` if the parser did not produce another row.
+     */
+    override fun nextRow(): Boolean {
+        reset()
+
+        if (!nextStart()) {
+            return false
+        }
+
+        while (true) {
+            val token = parser.nextValue()
+
+            if (token == null || token == JsonToken.END_OBJECT) {
+                break
+            }
+
+            // The field names reported by the parser are the column names declared in the schema, so the
+            // same constants are used here and in the schema definition.
+            when (parser.currentName) {
+                COLUMN_TIMESTAMP -> timestamp = Instant.ofEpochSecond(parser.longValue)
+                COLUMN_VM_ID -> id = parser.text
+                COLUMN_AVG_CPU -> cpuUsagePct = parser.doubleValue
+            }
+        }
+
+        return true
+    }
+
+    override fun hasColumn(column: TableColumn<*>): Boolean {
+        return when (column) {
+            RESOURCE_STATE_ID -> true
+            RESOURCE_STATE_TIMESTAMP -> true
+            RESOURCE_STATE_CPU_USAGE_PCT -> true
+            else -> false
+        }
+    }
+
+    /**
+     * Obtain the value of [column] for the current row.
+     *
+     * @throws IllegalArgumentException if the column is not supported by this reader.
+     */
+    override fun <T> get(column: TableColumn<T>): T {
+        val res: Any? = when (column) {
+            RESOURCE_STATE_ID -> id
+            RESOURCE_STATE_TIMESTAMP -> timestamp
+            RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+
+        @Suppress("UNCHECKED_CAST")
+        return res as T
+    }
+
+    override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getInt(column: TableColumn<Int>): Int {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getLong(column: TableColumn<Long>): Long {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getDouble(column: TableColumn<Double>): Double {
+        return when (column) {
+            RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun close() {
+        parser.close()
+    }
+
+    /**
+     * Advance the parser until the next object start.
+     *
+     * @return `true` if an object start was found, `false` if the parser ran out of tokens.
+     */
+    private fun nextStart(): Boolean {
+        var token = parser.nextValue()
+
+        while (token != null && token != JsonToken.START_OBJECT) {
+            token = parser.nextValue()
+        }
+
+        return token != null
+    }
+
+    /**
+     * State fields of the reader.
+     */
+    private var id: String? = null
+    private var timestamp: Instant? = null
+    private var cpuUsagePct = Double.NaN
+
+    /**
+     * Reset the state.
+     */
+    private fun reset() {
+        id = null
+        timestamp = null
+        cpuUsagePct = Double.NaN
+    }
+
+    companion object {
+        /**
+         * The names of the columns that are read from the trace.
+         */
+        private const val COLUMN_TIMESTAMP = "timestamp"
+        private const val COLUMN_VM_ID = "vm id"
+        private const val COLUMN_AVG_CPU = "CPU avg cpu"
+
+        /**
+         * The [CsvSchema] that is used to parse the trace.
+         */
+        private val schema = CsvSchema.builder()
+            .addColumn(COLUMN_TIMESTAMP, CsvSchema.ColumnType.NUMBER)
+            .addColumn(COLUMN_VM_ID, CsvSchema.ColumnType.STRING)
+            .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER)
+            .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER)
+            .addColumn(COLUMN_AVG_CPU, CsvSchema.ColumnType.NUMBER)
+            .setAllowComments(true)
+            .build()
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTable.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTable.kt
new file mode 100644
index 00000000..2bed7725
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTable.kt
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * The [Table] exposing the resources (virtual machines) of the Azure v1 VM traces.
+ *
+ * The table has no partitions and is read from `vmtable/vmtable.csv` below the trace path.
+ */
+internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table {
+    override val name: String = TABLE_RESOURCES
+
+    override val isSynthetic: Boolean = false
+
+    override val columns: List<TableColumn<*>> = listOf(
+        RESOURCE_ID,
+        RESOURCE_START_TIME,
+        RESOURCE_STOP_TIME,
+        RESOURCE_NCPUS,
+        RESOURCE_MEM_CAPACITY
+    )
+
+    /**
+     * Create a reader over the VM table file of the trace.
+     */
+    override fun newReader(): TableReader {
+        val vmTable = path.resolve("vmtable/vmtable.csv").toFile()
+        val parser = factory.createParser(vmTable)
+        return AzureResourceTableReader(parser)
+    }
+
+    /**
+     * This table has no partitions, so every request for one is rejected.
+     */
+    override fun newReader(partition: String): TableReader =
+        throw IllegalArgumentException("No partition $partition")
+
+    override fun toString(): String = "AzureResourceTable"
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTableReader.kt
new file mode 100644
index 00000000..108ce4ed
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureResourceTableReader.kt
@@ -0,0 +1,168 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.azure
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.*
+import java.time.Instant
+
+/**
+ * A [TableReader] for the Azure v1 VM resources table.
+ *
+ * Rows are read from [parser] using a fixed [CsvSchema]. The reader exposes the id, the start and stop
+ * time, the number of cores and the memory capacity of the current row.
+ */
+internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader {
+    init {
+        parser.schema = schema
+    }
+
+    /**
+     * Advance the reader to the next row.
+     *
+     * @return `true` if a row was read, `false` if the parser did not produce another row.
+     */
+    override fun nextRow(): Boolean {
+        reset()
+
+        if (!nextStart()) {
+            return false
+        }
+
+        while (true) {
+            val token = parser.nextValue()
+
+            if (token == null || token == JsonToken.END_OBJECT) {
+                break
+            }
+
+            // The field names reported by the parser are the column names declared in the schema, so the
+            // same constants are used here and in the schema definition.
+            when (parser.currentName) {
+                COLUMN_VM_ID -> id = parser.text
+                COLUMN_VM_CREATED -> startTime = Instant.ofEpochSecond(parser.longValue)
+                COLUMN_VM_DELETED -> stopTime = Instant.ofEpochSecond(parser.longValue)
+                COLUMN_VM_CORES -> cpuCores = parser.intValue
+                COLUMN_VM_MEMORY -> memCapacity = parser.doubleValue * 1e6 // GB to KB
+            }
+        }
+
+        return true
+    }
+
+    override fun hasColumn(column: TableColumn<*>): Boolean {
+        return when (column) {
+            RESOURCE_ID -> true
+            RESOURCE_START_TIME -> true
+            RESOURCE_STOP_TIME -> true
+            RESOURCE_NCPUS -> true
+            RESOURCE_MEM_CAPACITY -> true
+            else -> false
+        }
+    }
+
+    /**
+     * Obtain the value of [column] for the current row.
+     *
+     * @throws IllegalArgumentException if the column is not supported by this reader.
+     */
+    override fun <T> get(column: TableColumn<T>): T {
+        val res: Any? = when (column) {
+            RESOURCE_ID -> id
+            RESOURCE_START_TIME -> startTime
+            RESOURCE_STOP_TIME -> stopTime
+            // Read the fields directly: the typed accessors only accept the RESOURCE_* columns, so
+            // delegating to them with a RESOURCE_STATE_* column is rejected as an invalid column.
+            RESOURCE_NCPUS -> cpuCores
+            RESOURCE_MEM_CAPACITY -> memCapacity
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+
+        @Suppress("UNCHECKED_CAST")
+        return res as T
+    }
+
+    override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getInt(column: TableColumn<Int>): Int {
+        return when (column) {
+            RESOURCE_NCPUS -> cpuCores
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getLong(column: TableColumn<Long>): Long {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getDouble(column: TableColumn<Double>): Double {
+        return when (column) {
+            RESOURCE_MEM_CAPACITY -> memCapacity
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun close() {
+        parser.close()
+    }
+
+    /**
+     * Advance the parser until the next object start.
+     *
+     * @return `true` if an object start was found, `false` if the parser ran out of tokens.
+     */
+    private fun nextStart(): Boolean {
+        var token = parser.nextValue()
+
+        while (token != null && token != JsonToken.START_OBJECT) {
+            token = parser.nextValue()
+        }
+
+        return token != null
+    }
+
+    /**
+     * State fields of the reader.
+     */
+    private var id: String? = null
+    private var startTime: Instant? = null
+    private var stopTime: Instant? = null
+    private var cpuCores = -1
+    private var memCapacity = Double.NaN
+
+    /**
+     * Reset the state.
+     */
+    fun reset() {
+        id = null
+        startTime = null
+        stopTime = null
+        cpuCores = -1
+        memCapacity = Double.NaN
+    }
+
+    companion object {
+        /**
+         * The names of the columns that are read from the trace.
+         */
+        private const val COLUMN_VM_ID = "vm id"
+        private const val COLUMN_VM_CREATED = "timestamp vm created"
+        private const val COLUMN_VM_DELETED = "timestamp vm deleted"
+        private const val COLUMN_VM_CORES = "vm virtual core count"
+        private const val COLUMN_VM_MEMORY = "vm memory"
+
+        /**
+         * The [CsvSchema] that is used to parse the trace.
+         */
+        private val schema = CsvSchema.builder()
+            .addColumn(COLUMN_VM_ID, CsvSchema.ColumnType.NUMBER)
+            .addColumn("subscription id", CsvSchema.ColumnType.STRING)
+            .addColumn("deployment id", CsvSchema.ColumnType.NUMBER)
+            .addColumn(COLUMN_VM_CREATED, CsvSchema.ColumnType.NUMBER)
+            .addColumn(COLUMN_VM_DELETED, CsvSchema.ColumnType.NUMBER)
+            .addColumn("max cpu", CsvSchema.ColumnType.NUMBER)
+            .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER)
+            .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER)
+            .addColumn("vm category", CsvSchema.ColumnType.NUMBER)
+            .addColumn(COLUMN_VM_CORES, CsvSchema.ColumnType.NUMBER)
+            .addColumn(COLUMN_VM_MEMORY, CsvSchema.ColumnType.NUMBER)
+            .setAllowComments(true)
+            .build()
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTrace.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTrace.kt
new file mode 100644
index 00000000..93c2210b
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTrace.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * A [Trace] backed by the Azure v1 VM traces located at the given path.
+ *
+ * The trace exposes two tables: the resources table and the resource states table.
+ */
+public class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace {
+    override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+    override fun containsTable(name: String): Boolean = tables.contains(name)
+
+    /**
+     * Look up the table with the specified [name], returning `null` when the trace has no such table.
+     */
+    override fun getTable(name: String): Table? = when (name) {
+        TABLE_RESOURCES -> AzureResourceTable(factory, path)
+        TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path)
+        else -> null
+    }
+
+    override fun toString(): String = "AzureTrace[$path]"
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTraceFormat.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTraceFormat.kt
new file mode 100644
index 00000000..d400e1da
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/azure/AzureTraceFormat.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * The [TraceFormat] implementation for traces in the Azure v1 format.
+ */
+public class AzureTraceFormat : TraceFormat {
+    /**
+     * The name under which this trace format is known.
+     */
+    override val name: String = "azure-v1"
+
+    /**
+     * The [CsvFactory] that creates the parsers for the trace files. Comments and trimming of spaces are
+     * enabled on it.
+     */
+    private val factory: CsvFactory = CsvFactory()
+        .enable(CsvParser.Feature.ALLOW_COMMENTS)
+        .enable(CsvParser.Feature.TRIM_SPACES)
+
+    /**
+     * Open the trace located at [url].
+     *
+     * @throws IllegalArgumentException if the location referred to by [url] does not exist.
+     */
+    override fun open(url: URL): AzureTrace {
+        val location = Paths.get(url.toURI())
+        require(location.exists()) { "URL $url does not exist" }
+
+        return AzureTrace(factory, location)
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTable.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTable.kt
new file mode 100644
index 00000000..958ed49d
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTable.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.nio.file.Path
+
+/**
+ * The [Table] exposing the resource states of a trace in the Bitbrains Parquet format.
+ *
+ * The table has no partitions and is read from `trace.parquet` below the trace path.
+ */
+internal class BPResourceStateTable(private val path: Path) : Table {
+    override val name: String = TABLE_RESOURCE_STATES
+    override val isSynthetic: Boolean = false
+
+    override val columns: List<TableColumn<*>> = listOf(
+        RESOURCE_STATE_ID,
+        RESOURCE_STATE_TIMESTAMP,
+        RESOURCE_STATE_DURATION,
+        RESOURCE_STATE_NCPUS,
+        RESOURCE_STATE_CPU_USAGE,
+    )
+
+    /**
+     * Create a reader over the Parquet file that holds the resource states.
+     */
+    override fun newReader(): TableReader =
+        BPResourceStateTableReader(LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")))
+
+    /**
+     * This table has no partitions, so every request for one is rejected.
+     */
+    override fun newReader(partition: String): TableReader =
+        throw IllegalArgumentException("Unknown partition $partition")
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTableReader.kt
new file mode 100644
index 00000000..655da2b6
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceStateTableReader.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A [TableReader] that reads resource states from records in the Bitbrains Parquet format.
+ */
+internal class BPResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+    /**
+     * The record the reader is currently positioned at, or `null` if there is none.
+     */
+    private var record: GenericRecord? = null
+
+    /**
+     * Read the next record and report whether one was available.
+     */
+    override fun nextRow(): Boolean {
+        val next = reader.read()
+        record = next
+        return next != null
+    }
+
+    override fun hasColumn(column: TableColumn<*>): Boolean = when (column) {
+        RESOURCE_STATE_ID,
+        RESOURCE_STATE_TIMESTAMP,
+        RESOURCE_STATE_DURATION,
+        RESOURCE_STATE_NCPUS,
+        RESOURCE_STATE_CPU_USAGE -> true
+        else -> false
+    }
+
+    /**
+     * Obtain the value of [column] from the current record.
+     *
+     * @throws IllegalStateException if the reader is not positioned at a record.
+     * @throws IllegalArgumentException if the column is not supported by this reader.
+     */
+    override fun <T> get(column: TableColumn<T>): T {
+        val current = checkNotNull(record) { "Reader in invalid state" }
+
+        val value: Any = when (column) {
+            RESOURCE_STATE_ID -> current["id"].toString()
+            RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(current["time"] as Long)
+            RESOURCE_STATE_DURATION -> Duration.ofMillis(current["duration"] as Long)
+            RESOURCE_STATE_NCPUS -> current["cores"]
+            RESOURCE_STATE_CPU_USAGE -> (current["cpuUsage"] as Number).toDouble()
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+
+        @Suppress("UNCHECKED_CAST")
+        return value as T
+    }
+
+    override fun getBoolean(column: TableColumn<Boolean>): Boolean =
+        throw IllegalArgumentException("Invalid column")
+
+    override fun getInt(column: TableColumn<Int>): Int {
+        val current = checkNotNull(record) { "Reader in invalid state" }
+
+        if (column != RESOURCE_STATE_NCPUS) {
+            throw IllegalArgumentException("Invalid column")
+        }
+        return current["cores"] as Int
+    }
+
+    override fun getLong(column: TableColumn<Long>): Long =
+        throw IllegalArgumentException("Invalid column")
+
+    override fun getDouble(column: TableColumn<Double>): Double {
+        val current = checkNotNull(record) { "Reader in invalid state" }
+
+        if (column != RESOURCE_STATE_CPU_USAGE) {
+            throw IllegalArgumentException("Invalid column")
+        }
+        return (current["cpuUsage"] as Number).toDouble()
+    }
+
+    override fun close() {
+        reader.close()
+    }
+
+    override fun toString(): String = "BPResourceStateTableReader"
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTable.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTable.kt
new file mode 100644
index 00000000..75782486
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTable.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.nio.file.Path
+
+/**
+ * [Table] implementation exposing the resources of a trace in the Bitbrains Parquet format.
+ *
+ * @param path The location against which `meta.parquet` is resolved.
+ */
+internal class BPResourceTable(private val path: Path) : Table {
+    override val name: String = TABLE_RESOURCES
+
+    override val isSynthetic: Boolean = false
+
+    override val columns: List<TableColumn<*>> = listOf(
+        RESOURCE_ID,
+        RESOURCE_START_TIME,
+        RESOURCE_STOP_TIME,
+        RESOURCE_NCPUS,
+        RESOURCE_MEM_CAPACITY
+    )
+
+    /**
+     * Open a reader over the `meta.parquet` file resolved against [path].
+     */
+    override fun newReader(): TableReader =
+        BPResourceTableReader(LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")))
+
+    /**
+     * No partitions are known to this table, so every [partition] is rejected.
+     */
+    override fun newReader(partition: String): TableReader =
+        throw IllegalArgumentException("Unknown partition $partition")
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTableReader.kt
new file mode 100644
index 00000000..323ee9d0
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPResourceTableReader.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Instant
+
+/**
+ * A [TableReader] that exposes the records produced by a [LocalParquetReader] as rows of the resources table
+ * of the Bitbrains Parquet format.
+ */
+internal class BPResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+    /**
+     * The record the reader is currently positioned at, or `null` if there is none.
+     */
+    private var record: GenericRecord? = null
+
+    /**
+     * Fetch the next record from [reader].
+     *
+     * @return `true` if a record was obtained, `false` otherwise.
+     */
+    override fun nextRow(): Boolean {
+        val next = reader.read()
+        record = next
+        return next != null
+    }
+
+    override fun hasColumn(column: TableColumn<*>): Boolean = when (column) {
+        RESOURCE_ID,
+        RESOURCE_START_TIME,
+        RESOURCE_STOP_TIME,
+        RESOURCE_NCPUS,
+        RESOURCE_MEM_CAPACITY -> true
+        else -> false
+    }
+
+    /**
+     * Read the value of [column] from the current record as an object.
+     *
+     * @throws IllegalStateException if there is no current record.
+     * @throws IllegalArgumentException if [column] is not handled by this reader.
+     */
+    override fun <T> get(column: TableColumn<T>): T {
+        val current = checkNotNull(record) { "Reader in invalid state" }
+
+        val value: Any = when (column) {
+            RESOURCE_ID -> current["id"].toString()
+            RESOURCE_START_TIME -> Instant.ofEpochMilli(current["submissionTime"] as Long)
+            RESOURCE_STOP_TIME -> Instant.ofEpochMilli(current["endTime"] as Long)
+            RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS)
+            RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+
+        // The caller selects T through the column it passes in, so the cast cannot be checked here.
+        @Suppress("UNCHECKED_CAST")
+        return value as T
+    }
+
+    override fun getBoolean(column: TableColumn<Boolean>): Boolean =
+        throw IllegalArgumentException("Invalid column")
+
+    /**
+     * Read an integer [column] from the current record; only [RESOURCE_NCPUS] is handled.
+     */
+    override fun getInt(column: TableColumn<Int>): Int {
+        val current = checkNotNull(record) { "Reader in invalid state" }
+
+        if (column != RESOURCE_NCPUS) {
+            throw IllegalArgumentException("Invalid column")
+        }
+        return current["maxCores"] as Int
+    }
+
+    override fun getLong(column: TableColumn<Long>): Long =
+        throw IllegalArgumentException("Invalid column")
+
+    /**
+     * Read a double [column] from the current record; only [RESOURCE_MEM_CAPACITY] is handled.
+     */
+    override fun getDouble(column: TableColumn<Double>): Double {
+        val current = checkNotNull(record) { "Reader in invalid state" }
+
+        if (column != RESOURCE_MEM_CAPACITY) {
+            throw IllegalArgumentException("Invalid column")
+        }
+        return (current["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB
+    }
+
+    override fun close(): Unit = reader.close()
+
+    override fun toString(): String {
+        return "BPResourceTableReader"
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTrace.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTrace.kt
new file mode 100644
index 00000000..b4e64fab
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTrace.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.bp
+
+import org.opendc.trace.TABLE_RESOURCES
+import org.opendc.trace.TABLE_RESOURCE_STATES
+import org.opendc.trace.Table
+import org.opendc.trace.Trace
+import java.nio.file.Path
+
+/**
+ * [Trace] implementation for the Bitbrains Parquet format, offering the resources and resource states tables.
+ */
+public class BPTrace internal constructor(private val path: Path) : Trace {
+    override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+    override fun containsTable(name: String): Boolean = when (name) {
+        TABLE_RESOURCES, TABLE_RESOURCE_STATES -> true
+        else -> false
+    }
+
+    /**
+     * Obtain the table called [name], or `null` if this trace has no such table.
+     */
+    override fun getTable(name: String): Table? = when (name) {
+        TABLE_RESOURCES -> BPResourceTable(path)
+        TABLE_RESOURCE_STATES -> BPResourceStateTable(path)
+        else -> null
+    }
+
+    override fun toString(): String {
+        return "BPTrace[$path]"
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTraceFormat.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTraceFormat.kt
new file mode 100644
index 00000000..9662476d
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/BPTraceFormat.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.bp
+
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A format implementation for the Bitbrains Parquet trace format.
+ */
+public class BPTraceFormat : TraceFormat {
+    /**
+     * The identifier of this trace format.
+     */
+    override val name: String = "bitbrains-parquet"
+
+    /**
+     * Open the Bitbrains Parquet trace located at [url].
+     *
+     * @throws IllegalArgumentException if the path derived from [url] does not exist.
+     */
+    override fun open(url: URL): BPTrace {
+        val location = Paths.get(url.toURI())
+        if (!location.exists()) {
+            throw IllegalArgumentException("URL $url does not exist")
+        }
+        return BPTrace(location)
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/Schemas.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/Schemas.kt
new file mode 100644
index 00000000..4f6dbce3
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/bp/Schemas.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.bp
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+
+/**
+ * Schema for the resources table in the trace.
+ *
+ * Declares an Avro record named `meta` in the `org.opendc.trace.capelin` namespace with the required fields
+ * `id` (string), `submissionTime` (long), `endTime` (long), `maxCores` (int) and `requiredMemory` (long).
+ */
+public val BP_RESOURCES_SCHEMA: Schema = SchemaBuilder
+ .record("meta")
+ .namespace("org.opendc.trace.capelin")
+ .fields()
+ .requiredString("id")
+ .requiredLong("submissionTime")
+ .requiredLong("endTime")
+ .requiredInt("maxCores")
+ .requiredLong("requiredMemory")
+ .endRecord()
+
+/**
+ * Schema for the resource states table in the trace.
+ *
+ * Declares an Avro record in the `org.opendc.trace.capelin` namespace with the required fields `id` (string),
+ * `time` (long), `duration` (long), `cores` (int), `cpuUsage` (double) and `flops` (long).
+ *
+ * NOTE(review): the record is named `meta`, the same name the resources schema uses; confirm this is intended.
+ */
+public val BP_RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder
+ .record("meta")
+ .namespace("org.opendc.trace.capelin")
+ .fields()
+ .requiredString("id")
+ .requiredLong("time")
+ .requiredLong("duration")
+ .requiredInt("cores")
+ .requiredDouble("cpuUsage")
+ .requiredLong("flops")
+ .endRecord()
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTable.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTable.kt
new file mode 100644
index 00000000..3ff69d36
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTable.kt
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.sv
+
+import org.opendc.trace.*
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.bufferedReader
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * The resource state [Table] in the extended Bitbrains format.
+ *
+ * Every regular file with the `txt` extension found directly inside the given path forms one partition of the
+ * table, keyed by its file name without the extension.
+ */
+internal class SvResourceStateTable(path: Path) : Table {
+    /**
+     * The partitions that belong to the table, sorted by partition name.
+     */
+    private val partitions = Files.walk(path, 1).use { entries ->
+        // The stream returned by Files.walk holds on to file system resources until it is closed, so it is
+        // consumed inside `use` instead of being left open.
+        entries
+            .filter { !Files.isDirectory(it) && it.extension == "txt" }
+            .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+            .toSortedMap()
+    }
+
+    override val name: String = TABLE_RESOURCE_STATES
+
+    override val isSynthetic: Boolean = false
+
+    override val columns: List<TableColumn<*>> = listOf(
+        RESOURCE_STATE_ID,
+        RESOURCE_STATE_CLUSTER_ID,
+        RESOURCE_STATE_TIMESTAMP,
+        RESOURCE_STATE_NCPUS,
+        RESOURCE_STATE_CPU_CAPACITY,
+        RESOURCE_STATE_CPU_USAGE,
+        RESOURCE_STATE_CPU_USAGE_PCT,
+        RESOURCE_STATE_CPU_DEMAND,
+        RESOURCE_STATE_CPU_READY_PCT,
+        RESOURCE_STATE_MEM_CAPACITY,
+        RESOURCE_STATE_DISK_READ,
+        RESOURCE_STATE_DISK_WRITE,
+    )
+
+    /**
+     * Create a reader that iterates over the rows of all partitions, one partition after the other in
+     * partition-name order.
+     */
+    override fun newReader(): TableReader {
+        val remaining = partitions.iterator()
+
+        return object : TableReader {
+            /**
+             * The reader of the partition currently being consumed, or `null` once all partitions are exhausted.
+             */
+            var delegate: TableReader? = nextDelegate()
+
+            override fun nextRow(): Boolean {
+                var delegate = delegate
+
+                // Move on to the following partition whenever the current one has no more rows.
+                while (delegate != null) {
+                    if (delegate.nextRow()) {
+                        break
+                    }
+
+                    delegate.close()
+                    delegate = nextDelegate()
+                }
+
+                this.delegate = delegate
+                return delegate != null
+            }
+
+            override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
+
+            override fun <T> get(column: TableColumn<T>): T {
+                val delegate = checkNotNull(delegate) { "Invalid reader state" }
+                return delegate.get(column)
+            }
+
+            override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+                val delegate = checkNotNull(delegate) { "Invalid reader state" }
+                return delegate.getBoolean(column)
+            }
+
+            override fun getInt(column: TableColumn<Int>): Int {
+                val delegate = checkNotNull(delegate) { "Invalid reader state" }
+                return delegate.getInt(column)
+            }
+
+            override fun getLong(column: TableColumn<Long>): Long {
+                val delegate = checkNotNull(delegate) { "Invalid reader state" }
+                return delegate.getLong(column)
+            }
+
+            override fun getDouble(column: TableColumn<Double>): Double {
+                val delegate = checkNotNull(delegate) { "Invalid reader state" }
+                return delegate.getDouble(column)
+            }
+
+            override fun close() {
+                delegate?.close()
+            }
+
+            /**
+             * Open a reader for the next partition, or return `null` if no partitions are left.
+             */
+            private fun nextDelegate(): TableReader? {
+                if (!remaining.hasNext()) {
+                    return null
+                }
+
+                val (_, file) = remaining.next()
+                return SvResourceStateTableReader(file.bufferedReader())
+            }
+
+            override fun toString(): String = "SvCompositeTableReader"
+        }
+    }
+
+    /**
+     * Create a reader for the single partition named [partition].
+     *
+     * @throws IllegalArgumentException if no such partition exists.
+     */
+    override fun newReader(partition: String): TableReader {
+        val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
+        val reader = path.bufferedReader()
+        return SvResourceStateTableReader(reader)
+    }
+
+    override fun toString(): String = "SvResourceStateTable"
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTableReader.kt
new file mode 100644
index 00000000..487be950
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvResourceStateTableReader.kt
@@ -0,0 +1,212 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.sv
+
+import org.opendc.trace.*
+import java.io.BufferedReader
+import java.time.Instant
+
+/**
+ * A [TableReader] for the Bitbrains resource state table.
+ *
+ * Each row is one line of [reader]. Blank lines and lines starting with `#` are skipped. Fields are separated by
+ * space characters and are assigned to columns based on their position in the line.
+ */
+internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader {
+    /**
+     * Advance to the next data line and parse its fields into the state of the reader.
+     *
+     * @return `true` if a row was read, `false` if the end of the input was reached.
+     */
+    override fun nextRow(): Boolean {
+        reset()
+
+        val line = readDataLine()?.trim() ?: return false
+        val length = line.length
+        var col = 0
+        var start: Int
+        var end = 0
+
+        while (end < length) {
+            // Trim all whitespace before the field
+            start = end
+            while (start < length && line[start].isWhitespace()) {
+                start++
+            }
+
+            end = line.indexOf(' ', start)
+
+            if (end < 0) {
+                end = length
+            }
+
+            val field = line.substring(start, end)
+            when (col++) {
+                COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10))
+                COL_CPU_USAGE -> cpuUsage = field.toDouble()
+                COL_CPU_DEMAND -> cpuDemand = field.toDouble()
+                COL_DISK_READ -> diskRead = field.toDouble()
+                COL_DISK_WRITE -> diskWrite = field.toDouble()
+                COL_CLUSTER_ID -> cluster = field.trim()
+                COL_NCPUS -> cpuCores = field.toInt(10)
+                COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble()
+                COL_POWERED_ON -> poweredOn = field.toInt(10) == 1
+                COL_CPU_CAPACITY -> cpuCapacity = field.toDouble()
+                COL_ID -> id = field.trim()
+                COL_MEM_CAPACITY -> memCapacity = field.toDouble()
+            }
+        }
+
+        return true
+    }
+
+    /**
+     * Read lines from [reader] until one is found that is neither blank nor a comment.
+     *
+     * @return The first such line, or `null` if the end of the input was reached.
+     */
+    private fun readDataLine(): String? {
+        while (true) {
+            val line = reader.readLine() ?: return null
+
+            // Test for blankness before looking at the first character: an empty line has no first character.
+            if (line.isNotBlank() && line[0] != '#') {
+                return line
+            }
+        }
+    }
+
+    override fun hasColumn(column: TableColumn<*>): Boolean {
+        return when (column) {
+            RESOURCE_STATE_ID -> true
+            RESOURCE_STATE_CLUSTER_ID -> true
+            RESOURCE_STATE_TIMESTAMP -> true
+            RESOURCE_STATE_NCPUS -> true
+            RESOURCE_STATE_CPU_CAPACITY -> true
+            RESOURCE_STATE_CPU_USAGE -> true
+            RESOURCE_STATE_CPU_USAGE_PCT -> true
+            RESOURCE_STATE_CPU_DEMAND -> true
+            RESOURCE_STATE_CPU_READY_PCT -> true
+            RESOURCE_STATE_MEM_CAPACITY -> true
+            RESOURCE_STATE_DISK_READ -> true
+            RESOURCE_STATE_DISK_WRITE -> true
+            else -> false
+        }
+    }
+
+    /**
+     * Obtain the value of [column] for the current row as an object.
+     *
+     * Every column for which [hasColumn] returns `true` is handled here.
+     *
+     * @throws IllegalArgumentException if [column] is not handled by this reader.
+     */
+    override fun <T> get(column: TableColumn<T>): T {
+        val res: Any? = when (column) {
+            RESOURCE_STATE_ID -> id
+            RESOURCE_STATE_CLUSTER_ID -> cluster
+            RESOURCE_STATE_TIMESTAMP -> timestamp
+            RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS)
+            RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY)
+            RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
+            RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT)
+            RESOURCE_STATE_CPU_DEMAND -> getDouble(RESOURCE_STATE_CPU_DEMAND)
+            RESOURCE_STATE_CPU_READY_PCT -> getDouble(RESOURCE_STATE_CPU_READY_PCT)
+            RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY)
+            RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ)
+            RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE)
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+
+        @Suppress("UNCHECKED_CAST")
+        return res as T
+    }
+
+    override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+        return when (column) {
+            RESOURCE_STATE_POWERED_ON -> poweredOn
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getInt(column: TableColumn<Int>): Int {
+        return when (column) {
+            RESOURCE_STATE_NCPUS -> cpuCores
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getLong(column: TableColumn<Long>): Long {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getDouble(column: TableColumn<Double>): Double {
+        return when (column) {
+            RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
+            RESOURCE_STATE_CPU_USAGE -> cpuUsage
+            RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity
+            RESOURCE_STATE_CPU_DEMAND -> cpuDemand
+            RESOURCE_STATE_CPU_READY_PCT -> cpuReadyPct
+            RESOURCE_STATE_MEM_CAPACITY -> memCapacity
+            RESOURCE_STATE_DISK_READ -> diskRead
+            RESOURCE_STATE_DISK_WRITE -> diskWrite
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun close() {
+        reader.close()
+    }
+
+    /**
+     * State fields of the reader.
+     */
+    private var id: String? = null
+    private var cluster: String? = null
+    private var timestamp: Instant? = null
+    private var cpuCores = -1
+    private var cpuCapacity = Double.NaN
+    private var cpuUsage = Double.NaN
+    private var cpuDemand = Double.NaN
+    private var cpuReadyPct = Double.NaN
+    private var memCapacity = Double.NaN
+    private var diskRead = Double.NaN
+    private var diskWrite = Double.NaN
+    private var poweredOn: Boolean = false
+
+    /**
+     * Reset the state of the reader.
+     */
+    private fun reset() {
+        id = null
+        timestamp = null
+        cluster = null
+        cpuCores = -1
+        cpuCapacity = Double.NaN
+        cpuUsage = Double.NaN
+        cpuDemand = Double.NaN
+        cpuReadyPct = Double.NaN
+        memCapacity = Double.NaN
+        diskRead = Double.NaN
+        diskWrite = Double.NaN
+        poweredOn = false
+    }
+
+    /**
+     * Default column indices for the extended Bitbrains format.
+     */
+    private companion object {
+        const val COL_TIMESTAMP = 0
+        const val COL_CPU_USAGE = 1
+        const val COL_CPU_DEMAND = 2
+        const val COL_DISK_READ = 4
+        const val COL_DISK_WRITE = 6
+        const val COL_CLUSTER_ID = 10
+        const val COL_NCPUS = 12
+        const val COL_CPU_READY_PCT = 13
+        const val COL_POWERED_ON = 14
+        const val COL_CPU_CAPACITY = 18
+        const val COL_ID = 19
+        const val COL_MEM_CAPACITY = 20
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTrace.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTrace.kt
new file mode 100644
index 00000000..932c73ab
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTrace.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.sv
+
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * A [Trace] in the extended Bitbrains format, offering only the resource states table.
+ */
+public class SvTrace internal constructor(private val path: Path) : Trace {
+    override val tables: List<String> = listOf(TABLE_RESOURCE_STATES)
+
+    override fun containsTable(name: String): Boolean {
+        return name == TABLE_RESOURCE_STATES
+    }
+
+    /**
+     * Obtain the table called [name], or `null` if [containsTable] rejects the name.
+     */
+    override fun getTable(name: String): Table? =
+        if (containsTable(name)) SvResourceStateTable(path) else null
+
+    override fun toString(): String {
+        return "SvTrace[$path]"
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTraceFormat.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTraceFormat.kt
new file mode 100644
index 00000000..ba673b5f
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/sv/SvTraceFormat.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.trace.sv
+
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * [TraceFormat] implementation for the extended Bitbrains trace format.
+ */
+public class SvTraceFormat : TraceFormat {
+    /**
+     * The identifier of this trace format.
+     */
+    override val name: String = "sv"
+
+    /**
+     * Open the trace located at [url].
+     *
+     * @throws IllegalArgumentException if the path derived from [url] does not exist.
+     */
+    override fun open(url: URL): SvTrace {
+        val location = Paths.get(url.toURI())
+        if (!location.exists()) {
+            throw IllegalArgumentException("URL $url does not exist")
+        }
+        return SvTrace(location)
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt
new file mode 100644
index 00000000..67f9626c
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.util
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.module.kotlin.readValue
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
+import java.io.File
+import java.io.InputStream
+
+/**
+ * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
+ */
+public class PerformanceInterferenceReader {
+    /**
+     * The [ObjectMapper] to use, with [GroupMixin] registered as mix-in for [VmInterferenceGroup].
+     */
+    private val mapper = jacksonObjectMapper().apply {
+        addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java)
+    }
+
+    /**
+     * Read the performance interference model from [file].
+     */
+    public fun read(file: File): List<VmInterferenceGroup> = mapper.readValue(file)
+
+    /**
+     * Read the performance interference model from [input].
+     */
+    public fun read(input: InputStream): List<VmInterferenceGroup> = mapper.readValue(input)
+
+    /**
+     * Mix-in that maps the JSON properties `minServerLoad`, `performanceScore` and `vms` onto the
+     * constructor parameters declared below.
+     */
+    private data class GroupMixin(
+        @JsonProperty("minServerLoad")
+        val targetLoad: Double,
+        @JsonProperty("performanceScore")
+        val score: Double,
+        @JsonProperty("vms")
+        val members: Set<String>,
+    )
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt
new file mode 100644
index 00000000..c79f0584
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.util
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+
+/**
+ * Test suite for the [PerformanceInterferenceReader] class.
+ */
+class PerformanceInterferenceReaderTest {
+    /**
+     * Parse the bundled `perf-interference.json` resource and verify both groups it defines.
+     */
+    @Test
+    fun testSmoke() {
+        val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) {
+            "Test resource /perf-interference.json was not found on the classpath"
+        }
+        val result = PerformanceInterferenceReader().read(input)
+
+        // The element lookups stay inside the lambdas so that a wrong group count is reported together with
+        // the other failures instead of aborting before assertAll runs.
+        assertAll(
+            { assertEquals(2, result.size) },
+            // First group of the fixture.
+            { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) },
+            { assertEquals(0.0, result[0].targetLoad, 0.001) },
+            { assertEquals(0.8830158730158756, result[0].score, 0.001) },
+            // Second group of the fixture, so that entries after the first are validated as well.
+            { assertEquals(setOf("vm_a", "vm_b", "vm_c", "vm_d"), result[1].members) },
+            { assertEquals(0.0, result[1].targetLoad, 0.001) },
+            { assertEquals(0.7133055555552751, result[1].score, 0.001) }
+        )
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json
new file mode 100644
index 00000000..1be5852b
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json
@@ -0,0 +1,22 @@
+[
+ {
+ "vms": [
+ "vm_a",
+ "vm_c",
+ "vm_x",
+ "vm_y"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.8830158730158756
+ },
+ {
+ "vms": [
+ "vm_a",
+ "vm_b",
+ "vm_c",
+ "vm_d"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.7133055555552751
+ }
+]