summaryrefslogtreecommitdiff
path: root/opendc-experiments
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts11
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt21
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt1
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt1
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt38
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt44
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt145
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt67
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt101
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt95
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt65
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt60
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt139
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt261
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt260
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt44
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt32
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt127
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt149
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt54
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt169
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt46
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt56
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt53
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt103
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt53
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt103
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt49
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt47
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt55
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt138
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt212
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt45
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt47
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt222
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt38
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt97
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt)2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt17
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt45
43 files changed, 41 insertions, 3286 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 7dadd14d..010d18b0 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -31,6 +31,8 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcHarness.opendcHarnessApi)
+ api(projects.opendcCompute.opendcComputeWorkload)
+
implementation(projects.opendcTrace.opendcTraceParquet)
implementation(projects.opendcTrace.opendcTraceBitbrains)
implementation(projects.opendcSimulator.opendcSimulatorCore)
@@ -38,16 +40,13 @@ dependencies {
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
implementation(projects.opendcTelemetry.opendcTelemetryCompute)
- implementation(libs.opentelemetry.semconv)
- implementation(libs.kotlin.logging)
implementation(libs.config)
- implementation(libs.progressbar)
- implementation(libs.clikt)
+ implementation(libs.kotlin.logging)
+ implementation(libs.jackson.databind)
implementation(libs.jackson.module.kotlin)
- implementation(libs.jackson.dataformat.csv)
implementation(kotlin("reflect"))
+ implementation(libs.opentelemetry.semconv)
- implementation(libs.parquet)
testImplementation(libs.log4j.slf4j)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 6261ebbf..06db5569 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -24,16 +24,17 @@ package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkloadRunner
+import org.opendc.compute.workload.export.parquet.ParquetExportMonitor
+import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.trace.RawParquetTraceReader
+import org.opendc.compute.workload.util.PerformanceInterferenceReader
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
-import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.ParquetTraceReader
-import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.trace.RawParquetTraceReader
-import org.opendc.experiments.capelin.util.ComputeServiceSimulator
import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
@@ -43,7 +44,6 @@ import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
-import java.io.FileInputStream
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentHashMap
@@ -100,7 +100,12 @@ abstract class Portfolio(name: String) : Experiment(name) {
*/
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
- val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
+ val environment = ClusterEnvironmentReader(
+ File(
+ config.getString("env-path"),
+ "${topology.name}.txt"
+ )
+ )
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
@@ -117,7 +122,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
PerformanceInterferenceReader()
- .read(FileInputStream(config.getString("interference-model")))
+ .read(File(config.getString("interference-model")))
.let { VmInterferenceModel(it, Random(seeder.nextLong())) }
else
null
@@ -128,7 +133,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt())
else
null
- val simulator = ComputeServiceSimulator(
+ val simulator = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
index babd8ada..8d9b24f4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin.env
+import org.opendc.compute.workload.env.MachineDef
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt
index a968b043..8d61c530 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin.env
+import org.opendc.compute.workload.env.MachineDef
import java.io.Closeable
/**
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt
deleted file mode 100644
index b0c0318f..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.env
-
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.power.PowerModel
-import java.util.*
-
-/**
- * A definition of a machine in a cluster.
- */
-public data class MachineDef(
- val uid: UUID,
- val name: String,
- val meta: Map<String, Any>,
- val model: MachineModel,
- val powerModel: PowerModel
-)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt
deleted file mode 100644
index a4676f31..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("AvroUtils")
-package org.opendc.experiments.capelin.export.parquet
-
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
-
-/**
- * Schema for UUID type.
- */
-internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
-
-/**
- * Schema for timestamp type.
- */
-internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
-
-/**
- * Helper function to make a [Schema] field optional.
- */
-internal fun Schema.optional(): Schema {
- return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
deleted file mode 100644
index e3d15c3b..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.export.parquet
-
-import mu.KotlinLogging
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetFileWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.trace.util.parquet.LocalOutputFile
-import java.io.File
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.BlockingQueue
-import kotlin.concurrent.thread
-
-/**
- * A writer that writes data in Parquet format.
- */
-abstract class ParquetDataWriter<in T>(
- path: File,
- private val schema: Schema,
- bufferSize: Int = 4096
-) : AutoCloseable {
- /**
- * The logging instance to use.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The queue of commands to process.
- */
- private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
-
- /**
- * An exception to be propagated to the actual writer.
- */
- private var exception: Throwable? = null
-
- /**
- * The thread that is responsible for writing the Parquet records.
- */
- private val writerThread = thread(start = false, name = this.toString()) {
- val writer = let {
- val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- buildWriter(builder)
- }
-
- val queue = queue
- val buf = mutableListOf<T>()
- var shouldStop = false
-
- try {
- while (!shouldStop) {
- try {
- process(writer, queue.take())
- } catch (e: InterruptedException) {
- shouldStop = true
- }
-
- if (queue.drainTo(buf) > 0) {
- for (data in buf) {
- process(writer, data)
- }
- buf.clear()
- }
- }
- } catch (e: Throwable) {
- logger.error(e) { "Failure in Parquet data writer" }
- exception = e
- } finally {
- writer.close()
- }
- }
-
- /**
- * Build the [ParquetWriter] used to write the Parquet files.
- */
- protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
- return builder.build()
- }
-
- /**
- * Convert the specified [data] into a Parquet record.
- */
- protected abstract fun convert(builder: GenericRecordBuilder, data: T)
-
- /**
- * Write the specified metrics to the database.
- */
- fun write(data: T) {
- val exception = exception
- if (exception != null) {
- throw IllegalStateException("Writer thread failed", exception)
- }
-
- queue.put(data)
- }
-
- /**
- * Signal the writer to stop.
- */
- override fun close() {
- writerThread.interrupt()
- writerThread.join()
- }
-
- init {
- writerThread.start()
- }
-
- /**
- * Process the specified [data] to be written to the Parquet file.
- */
- private fun process(writer: ParquetWriter<GenericData.Record>, data: T) {
- val builder = GenericRecordBuilder(schema)
- convert(builder, data)
- writer.write(builder.build())
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
deleted file mode 100644
index b057e932..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.export.parquet
-
-import org.opendc.telemetry.compute.ComputeMonitor
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServerData
-import org.opendc.telemetry.compute.table.ServiceData
-import java.io.File
-
-/**
- * A [ComputeMonitor] that logs the events to a Parquet file.
- */
-class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
- private val serverWriter = ParquetServerDataWriter(
- File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
-
- private val hostWriter = ParquetHostDataWriter(
- File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
-
- private val serviceWriter = ParquetServiceDataWriter(
- File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
-
- override fun record(data: ServerData) {
- serverWriter.write(data)
- }
-
- override fun record(data: HostData) {
- hostWriter.write(data)
- }
-
- override fun record(data: ServiceData) {
- serviceWriter.write(data)
- }
-
- override fun close() {
- hostWriter.close()
- serviceWriter.close()
- serverWriter.close()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
deleted file mode 100644
index 58388cb1..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.export.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.telemetry.compute.table.HostData
-import java.io.File
-
-/**
- * A Parquet event writer for [HostData]s.
- */
-public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) {
-
- override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
- return builder
- .withDictionaryEncoding("host_id", true)
- .build()
- }
-
- override fun convert(builder: GenericRecordBuilder, data: HostData) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
-
- builder["host_id"] = data.host.id
-
- builder["uptime"] = data.uptime
- builder["downtime"] = data.downtime
- val bootTime = data.bootTime
- if (bootTime != null) {
- builder["boot_time"] = bootTime.toEpochMilli()
- }
-
- builder["cpu_count"] = data.host.cpuCount
- builder["cpu_limit"] = data.cpuLimit
- builder["cpu_time_active"] = data.cpuActiveTime
- builder["cpu_time_idle"] = data.cpuIdleTime
- builder["cpu_time_steal"] = data.cpuStealTime
- builder["cpu_time_lost"] = data.cpuLostTime
-
- builder["mem_limit"] = data.host.memCapacity
-
- builder["power_total"] = data.powerTotal
-
- builder["guests_terminated"] = data.guestsTerminated
- builder["guests_running"] = data.guestsRunning
- builder["guests_error"] = data.guestsError
- builder["guests_invalid"] = data.guestsInvalid
- }
-
- override fun toString(): String = "host-writer"
-
- companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("host")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .name("host_id").type(UUID_SCHEMA).noDefault()
- .requiredLong("uptime")
- .requiredLong("downtime")
- .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .requiredInt("cpu_count")
- .requiredDouble("cpu_limit")
- .requiredLong("cpu_time_active")
- .requiredLong("cpu_time_idle")
- .requiredLong("cpu_time_steal")
- .requiredLong("cpu_time_lost")
- .requiredLong("mem_limit")
- .requiredDouble("power_total")
- .requiredInt("guests_terminated")
- .requiredInt("guests_running")
- .requiredInt("guests_error")
- .requiredInt("guests_invalid")
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
deleted file mode 100644
index 43b5f469..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.export.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.telemetry.compute.table.ServerData
-import java.io.File
-import java.util.*
-
-/**
- * A Parquet event writer for [ServerData]s.
- */
-public class ParquetServerDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) {
-
- override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
- return builder
- .withDictionaryEncoding("server_id", true)
- .withDictionaryEncoding("host_id", true)
- .build()
- }
-
- override fun convert(builder: GenericRecordBuilder, data: ServerData) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
-
- builder["server_id"] = data.server.id
- builder["host_id"] = data.host?.id
-
- builder["uptime"] = data.uptime
- builder["downtime"] = data.downtime
- val bootTime = data.bootTime
- if (bootTime != null) {
- builder["boot_time"] = bootTime.toEpochMilli()
- }
- builder["scheduling_latency"] = data.schedulingLatency
-
- builder["cpu_count"] = data.server.cpuCount
- builder["cpu_limit"] = data.cpuLimit
- builder["cpu_time_active"] = data.cpuActiveTime
- builder["cpu_time_idle"] = data.cpuIdleTime
- builder["cpu_time_steal"] = data.cpuStealTime
- builder["cpu_time_lost"] = data.cpuLostTime
-
- builder["mem_limit"] = data.server.memCapacity
- }
-
- override fun toString(): String = "server-writer"
-
- companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("server")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .name("server_id").type(UUID_SCHEMA).noDefault()
- .name("host_id").type(UUID_SCHEMA.optional()).noDefault()
- .requiredLong("uptime")
- .requiredLong("downtime")
- .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .requiredLong("scheduling_latency")
- .requiredInt("cpu_count")
- .requiredDouble("cpu_limit")
- .requiredLong("cpu_time_active")
- .requiredLong("cpu_time_idle")
- .requiredLong("cpu_time_steal")
- .requiredLong("cpu_time_lost")
- .requiredLong("mem_limit")
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
deleted file mode 100644
index 2928f445..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.export.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericRecordBuilder
-import org.opendc.telemetry.compute.table.ServiceData
-import java.io.File
-
-/**
- * A Parquet event writer for [ServiceData]s.
- */
-public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) {
-
- override fun convert(builder: GenericRecordBuilder, data: ServiceData) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
- builder["hosts_up"] = data.hostsUp
- builder["hosts_down"] = data.hostsDown
- builder["servers_pending"] = data.serversPending
- builder["servers_active"] = data.serversActive
- builder["attempts_success"] = data.attemptsSuccess
- builder["attempts_failure"] = data.attemptsFailure
- builder["attempts_error"] = data.attemptsError
- }
-
- override fun toString(): String = "service-writer"
-
- companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("service")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .requiredInt("hosts_up")
- .requiredInt("hosts_down")
- .requiredInt("servers_pending")
- .requiredInt("servers_active")
- .requiredInt("attempts_success")
- .requiredInt("attempts_failure")
- .requiredInt("attempts_error")
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
index 0bf4ada6..498636ba 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,6 +22,9 @@
package org.opendc.experiments.capelin.trace
+import org.opendc.compute.workload.trace.RawParquetTraceReader
+import org.opendc.compute.workload.trace.TraceEntry
+import org.opendc.compute.workload.trace.TraceReader
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.Workload
import org.opendc.simulator.compute.workload.SimWorkload
@@ -51,7 +54,10 @@ public class ParquetTraceReader(
this.zip(listOf(workload))
}
}
- .flatMap { sampleWorkload(it.first, workload, it.second, seed).sortedBy(TraceEntry<SimWorkload>::start) }
+ .flatMap {
+ sampleWorkload(it.first, workload, it.second, seed)
+ .sortedBy(TraceEntry<SimWorkload>::start)
+ }
.iterator()
override fun hasNext(): Boolean = iterator.hasNext()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
deleted file mode 100644
index 9549af42..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import com.fasterxml.jackson.annotation.JsonProperty
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import com.fasterxml.jackson.module.kotlin.readValue
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
-import java.io.InputStream
-
-/**
- * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
- */
-class PerformanceInterferenceReader {
- /**
- * The [ObjectMapper] to use.
- */
- private val mapper = jacksonObjectMapper()
-
- init {
- mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java)
- }
-
- /**
- * Read the performance interface model from the input.
- */
- fun read(input: InputStream): List<VmInterferenceGroup> {
- return input.use { mapper.readValue(input) }
- }
-
- private data class GroupMixin(
- @JsonProperty("minServerLoad")
- val targetLoad: Double,
- @JsonProperty("performanceScore")
- val score: Double,
- @JsonProperty("vms")
- val members: Set<String>,
- )
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
deleted file mode 100644
index ca937328..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import org.opendc.experiments.capelin.trace.bp.BPTraceFormat
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.trace.*
-import java.io.File
-import java.util.UUID
-
-/**
- * A [TraceReader] for the internal VM workload trace format.
- *
- * @param path The directory of the traces.
- */
-class RawParquetTraceReader(private val path: File) {
- /**
- * The [Trace] that represents this trace.
- */
- private val trace = BPTraceFormat().open(path.toURI().toURL())
-
- /**
- * Read the fragments into memory.
- */
- private fun parseFragments(): Map<String, List<SimTraceWorkload.Fragment>> {
- val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
-
- val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
-
- return try {
- while (reader.nextRow()) {
- val id = reader.get(RESOURCE_STATE_ID)
- val time = reader.get(RESOURCE_STATE_TIMESTAMP)
- val duration = reader.get(RESOURCE_STATE_DURATION)
- val cores = reader.getInt(RESOURCE_STATE_NCPUS)
- val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
-
- val fragment = SimTraceWorkload.Fragment(
- time.toEpochMilli(),
- duration.toMillis(),
- cpuUsage,
- cores
- )
-
- fragments.getOrPut(id) { mutableListOf() }.add(fragment)
- }
-
- fragments
- } finally {
- reader.close()
- }
- }
-
- /**
- * Read the metadata into a workload.
- */
- private fun parseMeta(fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
- val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
-
- var counter = 0
- val entries = mutableListOf<TraceEntry<SimWorkload>>()
-
- return try {
- while (reader.nextRow()) {
-
- val id = reader.get(RESOURCE_ID)
- if (!fragments.containsKey(id)) {
- continue
- }
-
- val submissionTime = reader.get(RESOURCE_START_TIME)
- val endTime = reader.get(RESOURCE_STOP_TIME)
- val maxCores = reader.getInt(RESOURCE_NCPUS)
- val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB
- val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
-
- val vmFragments = fragments.getValue(id).asSequence()
- val totalLoad = vmFragments.sumOf { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
- val workload = SimTraceWorkload(vmFragments)
- entries.add(
- TraceEntry(
- uid, id, submissionTime.toEpochMilli(), workload,
- mapOf(
- "submit-time" to submissionTime.toEpochMilli(),
- "end-time" to endTime.toEpochMilli(),
- "total-load" to totalLoad,
- "cores" to maxCores,
- "required-memory" to requiredMemory.toLong(),
- "workload" to workload
- )
- )
- )
- }
-
- entries
- } catch (e: Exception) {
- e.printStackTrace()
- throw e
- } finally {
- reader.close()
- }
- }
-
- /**
- * The entries in the trace.
- */
- private val entries: List<TraceEntry<SimWorkload>>
-
- init {
- val fragments = parseFragments()
- entries = parseMeta(fragments)
- }
-
- /**
- * Read the entries in the trace.
- */
- fun read(): List<TraceEntry<SimWorkload>> = entries
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
deleted file mode 100644
index ed82217d..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.filter2.compat.FilterCompat
-import org.apache.parquet.filter2.predicate.FilterApi
-import org.apache.parquet.filter2.predicate.Statistics
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate
-import org.apache.parquet.io.api.Binary
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.trace.util.parquet.LocalInputFile
-import java.io.File
-import java.io.Serializable
-import java.util.SortedSet
-import java.util.TreeSet
-import java.util.UUID
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
-
-/**
- * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
- *
- * @param traceFile The directory of the traces.
- * @param selectedVms The list of VMs to read from the trace.
- */
-class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> {
- private val logger = KotlinLogging.logger {}
-
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>>
-
- /**
- * The intermediate buffer to store the read records in.
- */
- private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024)
-
- /**
- * An optional filter for filtering the selected VMs
- */
- private val filter =
- if (selectedVms.isEmpty())
- null
- else
- FilterCompat.get(
- FilterApi.userDefined(
- FilterApi.binaryColumn("id"),
- SelectedVmFilter(
- TreeSet(selectedVms)
- )
- )
- )
-
- /**
- * A poisonous fragment.
- */
- private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0))
-
- /**
- * The thread to read the records in.
- */
- private val readerThread = thread(start = true, name = "sc20-reader") {
- val reader = AvroParquetReader
- .builder<GenericData.Record>(LocalInputFile(File(traceFile, "trace.parquet")))
- .disableCompatibility()
- .withFilter(filter)
- .build()
-
- try {
- while (true) {
- val record = reader.read()
-
- if (record == null) {
- queue.put(poison)
- break
- }
-
- val id = record["id"].toString()
- val time = record["time"] as Long
- val duration = record["duration"] as Long
- val cores = record["cores"] as Int
- val cpuUsage = record["cpuUsage"] as Double
-
- val fragment = SimTraceWorkload.Fragment(
- time,
- duration,
- cpuUsage,
- cores
- )
-
- queue.put(id to fragment)
- }
- } catch (e: InterruptedException) {
- // Do not rethrow this
- } finally {
- reader.close()
- }
- }
-
- /**
- * Fill the buffers with the VMs
- */
- private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) {
- if (!hasNext) {
- return
- }
-
- val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>()
- queue.drainTo(fragments)
-
- for ((id, fragment) in fragments) {
- if (id == poison.first) {
- hasNext = false
- return
- }
- buffers[id]?.forEach { it.add(fragment) }
- }
- }
-
- /**
- * A flag to indicate whether the reader has more entries.
- */
- private var hasNext: Boolean = true
-
- /**
- * Initialize the reader.
- */
- init {
- val takenIds = mutableSetOf<UUID>()
- val entries = mutableMapOf<String, GenericData.Record>()
- val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
-
- val metaReader = AvroParquetReader
- .builder<GenericData.Record>(LocalInputFile(File(traceFile, "meta.parquet")))
- .disableCompatibility()
- .withFilter(filter)
- .build()
-
- while (true) {
- val record = metaReader.read() ?: break
- val id = record["id"].toString()
- entries[id] = record
- }
-
- metaReader.close()
-
- val selection = selectedVms.ifEmpty { entries.keys }
-
- // Create the entry iterator
- iterator = selection.asSequence()
- .mapNotNull { entries[it] }
- .mapIndexed { index, record ->
- val id = record["id"].toString()
- val submissionTime = record["submissionTime"] as Long
- val endTime = record["endTime"] as Long
- val maxCores = record["maxCores"] as Int
- val requiredMemory = record["requiredMemory"] as Long
- val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
-
- assert(uid !in takenIds)
- takenIds += uid
-
- logger.info { "Processing VM $id" }
-
- val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
- val fragments = sequence {
- var time = submissionTime
- repeat@ while (true) {
- if (externalBuffer.isEmpty()) {
- if (hasNext) {
- pull(buffers)
- continue
- } else {
- break
- }
- }
-
- internalBuffer.addAll(externalBuffer)
- externalBuffer.clear()
-
- for (fragment in internalBuffer) {
- yield(fragment)
-
- time += fragment.duration
- if (time >= endTime) {
- break@repeat
- }
- }
-
- internalBuffer.clear()
- }
-
- buffers.remove(id)
- }
- val workload = SimTraceWorkload(fragments)
- val meta = mapOf(
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
-
- TraceEntry(uid, id, submissionTime, workload, meta)
- }
- .sortedBy { it.start }
- .toList()
- .iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {
- readerThread.interrupt()
- }
-
- private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
- override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
-
- override fun canDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
- }
-
- override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
- }
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
deleted file mode 100644
index 1f3878eb..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import com.github.ajalt.clikt.core.CliktCommand
-import com.github.ajalt.clikt.parameters.arguments.argument
-import com.github.ajalt.clikt.parameters.groups.OptionGroup
-import com.github.ajalt.clikt.parameters.groups.cooccurring
-import com.github.ajalt.clikt.parameters.options.*
-import com.github.ajalt.clikt.parameters.types.*
-import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.experiments.capelin.trace.azure.AzureTraceFormat
-import org.opendc.experiments.capelin.trace.bp.BP_RESOURCES_SCHEMA
-import org.opendc.experiments.capelin.trace.bp.BP_RESOURCE_STATES_SCHEMA
-import org.opendc.experiments.capelin.trace.sv.SvTraceFormat
-import org.opendc.trace.*
-import org.opendc.trace.bitbrains.BitbrainsTraceFormat
-import org.opendc.trace.util.parquet.LocalOutputFile
-import java.io.File
-import java.util.*
-import kotlin.math.max
-import kotlin.math.min
-import kotlin.math.roundToLong
-
-/**
- * A script to convert a trace in text format into a Parquet trace.
- */
-fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
-
-/**
- * Represents the command for converting traces
- */
-class TraceConverterCli : CliktCommand(name = "trace-converter") {
- /**
- * The logger instance for the converter.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The directory where the trace should be stored.
- */
- private val output by option("-O", "--output", help = "path to store the trace")
- .file(canBeFile = false, mustExist = false)
- .defaultLazy { File("output") }
-
- /**
- * The directory where the input trace is located.
- */
- private val input by argument("input", help = "path to the input trace")
- .file(canBeFile = false)
-
- /**
- * The input format of the trace.
- */
- private val format by option("-f", "--format", help = "input format of trace")
- .choice(
- "solvinity" to SvTraceFormat(),
- "bitbrains" to BitbrainsTraceFormat(),
- "azure" to AzureTraceFormat()
- )
- .required()
-
- /**
- * The sampling options.
- */
- private val samplingOptions by SamplingOptions().cooccurring()
-
- override fun run() {
- val metaParquet = File(output, "meta.parquet")
- val traceParquet = File(output, "trace.parquet")
-
- if (metaParquet.exists()) {
- metaParquet.delete()
- }
- if (traceParquet.exists()) {
- traceParquet.delete()
- }
-
- val trace = format.open(input.toURI().toURL())
-
- logger.info { "Building resources table" }
-
- val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet))
- .withSchema(BP_RESOURCES_SCHEMA)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .enablePageWriteChecksum()
- .build()
-
- val selectedVms = metaWriter.use { convertResources(trace, it) }
-
- logger.info { "Wrote ${selectedVms.size} rows" }
- logger.info { "Building resource states table" }
-
- val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
- .withSchema(BP_RESOURCE_STATES_SCHEMA)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .enableDictionaryEncoding()
- .enablePageWriteChecksum()
- .withBloomFilterEnabled("id", true)
- .withBloomFilterNDV("id", selectedVms.size.toLong())
- .build()
-
- val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) }
- logger.info { "Wrote $statesCount rows" }
- }
-
- /**
- * Convert the resources table for the trace.
- */
- private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> {
- val random = samplingOptions?.let { Random(it.seed) }
- val samplingFraction = samplingOptions?.fraction ?: 1.0
- val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
-
- var hasNextRow = reader.nextRow()
- val selectedVms = mutableSetOf<String>()
-
- while (hasNextRow) {
- var id: String
- var numCpus = Int.MIN_VALUE
- var memCapacity = Double.MIN_VALUE
- var memUsage = Double.MIN_VALUE
- var startTime = Long.MAX_VALUE
- var stopTime = Long.MIN_VALUE
-
- do {
- id = reader.get(RESOURCE_STATE_ID)
-
- val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
- startTime = min(startTime, timestamp)
- stopTime = max(stopTime, timestamp)
-
- numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS))
-
- memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY))
- if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) {
- memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE))
- }
-
- hasNextRow = reader.nextRow()
- } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
-
- // Sample only a fraction of the VMs
- if (random != null && random.nextDouble() > samplingFraction) {
- continue
- }
-
- val builder = GenericRecordBuilder(BP_RESOURCES_SCHEMA)
-
- builder["id"] = id
- builder["submissionTime"] = startTime
- builder["endTime"] = stopTime
- builder["maxCores"] = numCpus
- builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong()
-
- logger.info { "Selecting VM $id" }
-
- writer.write(builder.build())
- selectedVms.add(id)
- }
-
- return selectedVms
- }
-
- /**
- * Convert the resource states table for the trace.
- */
- private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int {
- val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
-
- var hasNextRow = reader.nextRow()
- var count = 0
-
- while (hasNextRow) {
- var lastTimestamp = Long.MIN_VALUE
-
- do {
- val id = reader.get(RESOURCE_STATE_ID)
-
- if (id !in selectedVms) {
- hasNextRow = reader.nextRow()
- continue
- }
-
- val builder = GenericRecordBuilder(BP_RESOURCE_STATES_SCHEMA)
- builder["id"] = id
-
- val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
- if (lastTimestamp < 0) {
- lastTimestamp = timestamp - 5 * 60 * 1000L
- }
-
- val duration = timestamp - lastTimestamp
- val cores = reader.getInt(RESOURCE_STATE_NCPUS)
- val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
- val flops = (cpuUsage * duration / 1000.0).roundToLong()
-
- builder["time"] = timestamp
- builder["duration"] = duration
- builder["cores"] = cores
- builder["cpuUsage"] = cpuUsage
- builder["flops"] = flops
-
- writer.write(builder.build())
-
- lastTimestamp = timestamp
- hasNextRow = reader.nextRow()
- } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
-
- count++
- }
-
- return count
- }
-
- /**
- * Options for sampling the workload trace.
- */
- private class SamplingOptions : OptionGroup() {
- /**
- * The fraction of VMs to sample
- */
- val fraction by option("--sampling-fraction", help = "fraction of the workload to sample")
- .double()
- .restrictTo(0.0001, 1.0)
- .required()
-
- /**
- * The seed for sampling the trace.
- */
- val seed by option("--sampling-seed", help = "seed for sampling the workload")
- .long()
- .default(0)
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt
deleted file mode 100644
index 303a6a8c..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import java.util.UUID
-
-/**
- * An entry in a workload trace.
- *
- * @param uid The unique identifier of the entry.
- * @param name The name of the entry.
- * @param start The start time of the workload.
- * @param workload The workload of the entry.
- * @param meta The meta-data associated with the workload.
- */
-public data class TraceEntry<out T>(
- val uid: UUID,
- val name: String,
- val start: Long,
- val workload: T,
- val meta: Map<String, Any>
-)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt
deleted file mode 100644
index 08304edc..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-/**
- * An interface for reading workloads into memory.
- *
- * This interface must guarantee that the entries are delivered in order of submission time.
- *
- * @param T The shape of the workloads supported by this reader.
- */
-public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
index cb32ce88..b42951df 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -23,6 +23,7 @@
package org.opendc.experiments.capelin.trace
import mu.KotlinLogging
+import org.opendc.compute.workload.trace.TraceEntry
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.SamplingStrategy
import org.opendc.experiments.capelin.model.Workload
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt
deleted file mode 100644
index f98f4b2c..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.azure
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.nio.file.Files
-import java.nio.file.Path
-import java.util.stream.Collectors
-import kotlin.io.path.extension
-import kotlin.io.path.nameWithoutExtension
-
-/**
- * The resource state [Table] for the Azure v1 VM traces.
- */
-internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table {
- /**
- * The partitions that belong to the table.
- */
- private val partitions = Files.walk(path, 1)
- .filter { !Files.isDirectory(it) && it.extension == "csv" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
-
- override val name: String = TABLE_RESOURCE_STATES
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_STATE_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_CPU_USAGE_PCT
- )
-
- override fun newReader(): TableReader {
- val it = partitions.iterator()
-
- return object : TableReader {
- var delegate: TableReader? = nextDelegate()
-
- override fun nextRow(): Boolean {
- var delegate = delegate
-
- while (delegate != null) {
- if (delegate.nextRow()) {
- break
- }
-
- delegate.close()
- delegate = nextDelegate()
- }
-
- this.delegate = delegate
- return delegate != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
-
- override fun <T> get(column: TableColumn<T>): T {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.get(column)
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getBoolean(column)
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getInt(column)
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getLong(column)
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getDouble(column)
- }
-
- override fun close() {
- delegate?.close()
- }
-
- private fun nextDelegate(): TableReader? {
- return if (it.hasNext()) {
- val (_, path) = it.next()
- return AzureResourceStateTableReader(factory.createParser(path.toFile()))
- } else {
- null
- }
- }
-
- override fun toString(): String = "AzureCompositeTableReader"
- }
- }
-
- override fun newReader(partition: String): TableReader {
- val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
- return AzureResourceStateTableReader(factory.createParser(path.toFile()))
- }
-
- override fun toString(): String = "AzureResourceStateTable"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt
deleted file mode 100644
index f80c0e82..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.azure
-
-import com.fasterxml.jackson.core.JsonToken
-import com.fasterxml.jackson.dataformat.csv.CsvParser
-import com.fasterxml.jackson.dataformat.csv.CsvSchema
-import org.opendc.trace.*
-import java.time.Instant
-
-/**
- * A [TableReader] for the Azure v1 VM resource state table.
- */
-internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader {
- init {
- parser.schema = schema
- }
-
- override fun nextRow(): Boolean {
- reset()
-
- if (!nextStart()) {
- return false
- }
-
- while (true) {
- val token = parser.nextValue()
-
- if (token == null || token == JsonToken.END_OBJECT) {
- break
- }
-
- when (parser.currentName) {
- "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue)
- "vm id" -> id = parser.text
- "avg cpu" -> cpuUsagePct = parser.doubleValue
- }
- }
-
- return true
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- else -> false
- }
- }
-
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_STATE_ID -> id
- RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
- else -> throw IllegalArgumentException("Invalid column")
- }
-
- @Suppress("UNCHECKED_CAST")
- return res as T
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun close() {
- parser.close()
- }
-
- /**
- * Advance the parser until the next object start.
- */
- private fun nextStart(): Boolean {
- var token = parser.nextValue()
-
- while (token != null && token != JsonToken.START_OBJECT) {
- token = parser.nextValue()
- }
-
- return token != null
- }
-
- /**
- * State fields of the reader.
- */
- private var id: String? = null
- private var timestamp: Instant? = null
- private var cpuUsagePct = Double.NaN
-
- /**
- * Reset the state.
- */
- private fun reset() {
- id = null
- timestamp = null
- cpuUsagePct = Double.NaN
- }
-
- companion object {
- /**
- * The [CsvSchema] that is used to parse the trace.
- */
- private val schema = CsvSchema.builder()
- .addColumn("timestamp", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm id", CsvSchema.ColumnType.STRING)
- .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER)
- .setAllowComments(true)
- .build()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt
deleted file mode 100644
index c9d4f7eb..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.azure
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.nio.file.Path
-
-/**
- * The resource [Table] for the Azure v1 VM traces.
- */
-internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table {
- override val name: String = TABLE_RESOURCES
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_ID,
- RESOURCE_START_TIME,
- RESOURCE_STOP_TIME,
- RESOURCE_NCPUS,
- RESOURCE_MEM_CAPACITY
- )
-
- override fun newReader(): TableReader {
- return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile()))
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("No partition $partition")
- }
-
- override fun toString(): String = "AzureResourceTable"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt
deleted file mode 100644
index b712b854..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.azure
-
-import com.fasterxml.jackson.core.JsonToken
-import com.fasterxml.jackson.dataformat.csv.CsvParser
-import com.fasterxml.jackson.dataformat.csv.CsvSchema
-import org.apache.parquet.example.Paper.schema
-import org.opendc.trace.*
-import java.time.Instant
-
-/**
- * A [TableReader] for the Azure v1 VM resources table.
- */
-internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader {
- init {
- parser.schema = schema
- }
-
- override fun nextRow(): Boolean {
- reset()
-
- if (!nextStart()) {
- return false
- }
-
- while (true) {
- val token = parser.nextValue()
-
- if (token == null || token == JsonToken.END_OBJECT) {
- break
- }
-
- when (parser.currentName) {
- "vm id" -> id = parser.text
- "vm created" -> startTime = Instant.ofEpochSecond(parser.longValue)
- "vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue)
- "vm virtual core count" -> cpuCores = parser.intValue
- "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB
- }
- }
-
- return true
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_START_TIME -> true
- RESOURCE_STOP_TIME -> true
- RESOURCE_NCPUS -> true
- RESOURCE_MEM_CAPACITY -> true
- else -> false
- }
- }
-
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_ID -> id
- RESOURCE_START_TIME -> startTime
- RESOURCE_STOP_TIME -> stopTime
- RESOURCE_NCPUS -> getInt(RESOURCE_STATE_NCPUS)
- RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY)
- else -> throw IllegalArgumentException("Invalid column")
- }
-
- @Suppress("UNCHECKED_CAST")
- return res as T
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- RESOURCE_NCPUS -> cpuCores
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_MEM_CAPACITY -> memCapacity
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun close() {
- parser.close()
- }
-
- /**
- * Advance the parser until the next object start.
- */
- private fun nextStart(): Boolean {
- var token = parser.nextValue()
-
- while (token != null && token != JsonToken.START_OBJECT) {
- token = parser.nextValue()
- }
-
- return token != null
- }
-
- /**
- * State fields of the reader.
- */
- private var id: String? = null
- private var startTime: Instant? = null
- private var stopTime: Instant? = null
- private var cpuCores = -1
- private var memCapacity = Double.NaN
-
- /**
- * Reset the state.
- */
- fun reset() {
- id = null
- startTime = null
- stopTime = null
- cpuCores = -1
- memCapacity = Double.NaN
- }
-
- companion object {
- /**
- * The [CsvSchema] that is used to parse the trace.
- */
- private val schema = CsvSchema.builder()
- .addColumn("vm id", CsvSchema.ColumnType.NUMBER)
- .addColumn("subscription id", CsvSchema.ColumnType.STRING)
- .addColumn("deployment id", CsvSchema.ColumnType.NUMBER)
- .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER)
- .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER)
- .addColumn("max cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm category", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm memory", CsvSchema.ColumnType.NUMBER)
- .setAllowComments(true)
- .build()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt
deleted file mode 100644
index 24c60bab..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.azure
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.nio.file.Path
-
-/**
- * [Trace] implementation for the Azure v1 VM traces.
- */
-class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
-
- override fun containsTable(name: String): Boolean = name in tables
-
- override fun getTable(name: String): Table? {
- return when (name) {
- TABLE_RESOURCES -> AzureResourceTable(factory, path)
- TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path)
- else -> null
- }
- }
-
- override fun toString(): String = "AzureTrace[$path]"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt
deleted file mode 100644
index 744e43a0..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.azure
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import com.fasterxml.jackson.dataformat.csv.CsvParser
-import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
-
-/**
- * A format implementation for the Azure v1 format.
- */
-class AzureTraceFormat : TraceFormat {
- /**
- * The name of this trace format.
- */
- override val name: String = "azure-v1"
-
- /**
- * The [CsvFactory] used to create the parser.
- */
- private val factory = CsvFactory()
- .enable(CsvParser.Feature.ALLOW_COMMENTS)
- .enable(CsvParser.Feature.TRIM_SPACES)
-
- /**
- * Open the trace file.
- */
- override fun open(url: URL): AzureTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return AzureTrace(factory, path)
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
deleted file mode 100644
index f051bf88..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.trace.*
-import org.opendc.trace.util.parquet.LocalParquetReader
-import java.nio.file.Path
-
-/**
- * The resource state [Table] in the Bitbrains Parquet format.
- */
-internal class BPResourceStateTable(private val path: Path) : Table {
- override val name: String = TABLE_RESOURCE_STATES
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_STATE_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_DURATION,
- RESOURCE_STATE_NCPUS,
- RESOURCE_STATE_CPU_USAGE,
- )
-
- override fun newReader(): TableReader {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
- return BPResourceStateTableReader(reader)
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("Unknown partition $partition")
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt
deleted file mode 100644
index 0e7ee555..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.trace.*
-import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Duration
-import java.time.Instant
-
-/**
- * A [TableReader] implementation for the Bitbrains Parquet format.
- */
-internal class BPResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
- /**
- * The current record.
- */
- private var record: GenericRecord? = null
-
- override fun nextRow(): Boolean {
- record = reader.read()
- return record != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_DURATION -> true
- RESOURCE_STATE_NCPUS -> true
- RESOURCE_STATE_CPU_USAGE -> true
- else -> false
- }
- }
-
- override fun <T> get(column: TableColumn<T>): T {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- @Suppress("UNCHECKED_CAST")
- val res: Any = when (column) {
- RESOURCE_STATE_ID -> record["id"].toString()
- RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long)
- RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long)
- RESOURCE_STATE_NCPUS -> record["cores"]
- RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column")
- }
-
- @Suppress("UNCHECKED_CAST")
- return res as T
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (column) {
- RESOURCE_STATE_NCPUS -> record["cores"] as Int
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun close() {
- reader.close()
- }
-
- override fun toString(): String = "BPResourceStateTableReader"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
deleted file mode 100644
index 5b0f013f..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.trace.*
-import org.opendc.trace.util.parquet.LocalParquetReader
-import java.nio.file.Path
-
-/**
- * The resource [Table] in the Bitbrains Parquet format.
- */
-internal class BPResourceTable(private val path: Path) : Table {
- override val name: String = TABLE_RESOURCES
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_ID,
- RESOURCE_START_TIME,
- RESOURCE_STOP_TIME,
- RESOURCE_NCPUS,
- RESOURCE_MEM_CAPACITY
- )
-
- override fun newReader(): TableReader {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet"))
- return BPResourceTableReader(reader)
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("Unknown partition $partition")
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
deleted file mode 100644
index 4416aae8..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.trace.*
-import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Instant
-
-/**
- * A [TableReader] implementation for the Bitbrains Parquet format.
- */
-internal class BPResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
- /**
- * The current record.
- */
- private var record: GenericRecord? = null
-
- override fun nextRow(): Boolean {
- record = reader.read()
- return record != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_START_TIME -> true
- RESOURCE_STOP_TIME -> true
- RESOURCE_NCPUS -> true
- RESOURCE_MEM_CAPACITY -> true
- else -> false
- }
- }
-
- override fun <T> get(column: TableColumn<T>): T {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- @Suppress("UNCHECKED_CAST")
- val res: Any = when (column) {
- RESOURCE_ID -> record["id"].toString()
- RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long)
- RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long)
- RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS)
- RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
- else -> throw IllegalArgumentException("Invalid column")
- }
-
- @Suppress("UNCHECKED_CAST")
- return res as T
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (column) {
- RESOURCE_NCPUS -> record["maxCores"] as Int
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (column) {
- RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun close() {
- reader.close()
- }
-
- override fun toString(): String = "BPResourceTableReader"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt
deleted file mode 100644
index 486587b1..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.opendc.trace.TABLE_RESOURCES
-import org.opendc.trace.TABLE_RESOURCE_STATES
-import org.opendc.trace.Table
-import org.opendc.trace.Trace
-import java.nio.file.Path
-
-/**
- * A [Trace] in the Bitbrains Parquet format.
- */
-public class BPTrace internal constructor(private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
-
- override fun containsTable(name: String): Boolean =
- name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES
-
- override fun getTable(name: String): Table? {
- return when (name) {
- TABLE_RESOURCES -> BPResourceTable(path)
- TABLE_RESOURCE_STATES -> BPResourceStateTable(path)
- else -> null
- }
- }
-
- override fun toString(): String = "BPTrace[$path]"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt
deleted file mode 100644
index 49d5b4c5..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
-
-/**
- * A format implementation for the GWF trace format.
- */
-public class BPTraceFormat : TraceFormat {
- /**
- * The name of this trace format.
- */
- override val name: String = "bitbrains-parquet"
-
- /**
- * Open a Bitbrains Parquet trace.
- */
- override fun open(url: URL): BPTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return BPTrace(path)
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt
deleted file mode 100644
index 7dd8161d..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-
-/**
- * Schema for the resources table in the trace.
- */
-val BP_RESOURCES_SCHEMA: Schema = SchemaBuilder
- .record("meta")
- .namespace("org.opendc.trace.capelin")
- .fields()
- .requiredString("id")
- .requiredLong("submissionTime")
- .requiredLong("endTime")
- .requiredInt("maxCores")
- .requiredLong("requiredMemory")
- .endRecord()
-
-/**
- * Schema for the resource states table in the trace.
- */
-val BP_RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder
- .record("meta")
- .namespace("org.opendc.trace.capelin")
- .fields()
- .requiredString("id")
- .requiredLong("time")
- .requiredLong("duration")
- .requiredInt("cores")
- .requiredDouble("cpuUsage")
- .requiredLong("flops")
- .endRecord()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
deleted file mode 100644
index 67140fe9..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.sv
-
-import org.opendc.trace.*
-import java.nio.file.Files
-import java.nio.file.Path
-import java.util.stream.Collectors
-import kotlin.io.path.bufferedReader
-import kotlin.io.path.extension
-import kotlin.io.path.nameWithoutExtension
-
-/**
- * The resource state [Table] in the extended Bitbrains format.
- */
-internal class SvResourceStateTable(path: Path) : Table {
- /**
- * The partitions that belong to the table.
- */
- private val partitions = Files.walk(path, 1)
- .filter { !Files.isDirectory(it) && it.extension == "txt" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
-
- override val name: String = TABLE_RESOURCE_STATES
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_STATE_ID,
- RESOURCE_STATE_CLUSTER_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_NCPUS,
- RESOURCE_STATE_CPU_CAPACITY,
- RESOURCE_STATE_CPU_USAGE,
- RESOURCE_STATE_CPU_USAGE_PCT,
- RESOURCE_STATE_CPU_DEMAND,
- RESOURCE_STATE_CPU_READY_PCT,
- RESOURCE_STATE_MEM_CAPACITY,
- RESOURCE_STATE_DISK_READ,
- RESOURCE_STATE_DISK_WRITE,
- )
-
- override fun newReader(): TableReader {
- val it = partitions.iterator()
-
- return object : TableReader {
- var delegate: TableReader? = nextDelegate()
-
- override fun nextRow(): Boolean {
- var delegate = delegate
-
- while (delegate != null) {
- if (delegate.nextRow()) {
- break
- }
-
- delegate.close()
- delegate = nextDelegate()
- }
-
- this.delegate = delegate
- return delegate != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
-
- override fun <T> get(column: TableColumn<T>): T {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.get(column)
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getBoolean(column)
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getInt(column)
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getLong(column)
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getDouble(column)
- }
-
- override fun close() {
- delegate?.close()
- }
-
- private fun nextDelegate(): TableReader? {
- return if (it.hasNext()) {
- val (_, path) = it.next()
- val reader = path.bufferedReader()
- return SvResourceStateTableReader(reader)
- } else {
- null
- }
- }
-
- override fun toString(): String = "SvCompositeTableReader"
- }
- }
-
- override fun newReader(partition: String): TableReader {
- val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
- val reader = path.bufferedReader()
- return SvResourceStateTableReader(reader)
- }
-
- override fun toString(): String = "SvResourceStateTable"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
deleted file mode 100644
index 6ea403fe..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.sv
-
-import org.opendc.trace.*
-import java.io.BufferedReader
-import java.time.Instant
-
-/**
- * A [TableReader] for the Bitbrains resource state table.
- */
-internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader {
- override fun nextRow(): Boolean {
- reset()
-
- var line: String
- var num = 0
-
- while (true) {
- line = reader.readLine() ?: return false
- num++
-
- if (line[0] == '#' || line.isBlank()) {
- // Ignore empty lines or comments
- continue
- }
-
- break
- }
-
- line = line.trim()
-
- val length = line.length
- var col = 0
- var start: Int
- var end = 0
-
- while (end < length) {
- // Trim all whitespace before the field
- start = end
- while (start < length && line[start].isWhitespace()) {
- start++
- }
-
- end = line.indexOf(' ', start)
-
- if (end < 0) {
- end = length
- }
-
- val field = line.subSequence(start, end) as String
- when (col++) {
- COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10))
- COL_CPU_USAGE -> cpuUsage = field.toDouble()
- COL_CPU_DEMAND -> cpuDemand = field.toDouble()
- COL_DISK_READ -> diskRead = field.toDouble()
- COL_DISK_WRITE -> diskWrite = field.toDouble()
- COL_CLUSTER_ID -> cluster = field.trim()
- COL_NCPUS -> cpuCores = field.toInt(10)
- COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble()
- COL_POWERED_ON -> poweredOn = field.toInt(10) == 1
- COL_CPU_CAPACITY -> cpuCapacity = field.toDouble()
- COL_ID -> id = field.trim()
- COL_MEM_CAPACITY -> memCapacity = field.toDouble()
- }
- }
-
- return true
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_CLUSTER_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_NCPUS -> true
- RESOURCE_STATE_CPU_CAPACITY -> true
- RESOURCE_STATE_CPU_USAGE -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- RESOURCE_STATE_CPU_DEMAND -> true
- RESOURCE_STATE_CPU_READY_PCT -> true
- RESOURCE_STATE_MEM_CAPACITY -> true
- RESOURCE_STATE_DISK_READ -> true
- RESOURCE_STATE_DISK_WRITE -> true
- else -> false
- }
- }
-
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_STATE_ID -> id
- RESOURCE_STATE_CLUSTER_ID -> cluster
- RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS)
- RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY)
- RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
- RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT)
- RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY)
- RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ)
- RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE)
- else -> throw IllegalArgumentException("Invalid column")
- }
-
- @Suppress("UNCHECKED_CAST")
- return res as T
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- return when (column) {
- RESOURCE_STATE_POWERED_ON -> poweredOn
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- RESOURCE_STATE_NCPUS -> cpuCores
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity
- RESOURCE_STATE_CPU_DEMAND -> cpuDemand
- RESOURCE_STATE_MEM_CAPACITY -> memCapacity
- RESOURCE_STATE_DISK_READ -> diskRead
- RESOURCE_STATE_DISK_WRITE -> diskWrite
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun close() {
- reader.close()
- }
-
- /**
- * State fields of the reader.
- */
- private var id: String? = null
- private var cluster: String? = null
- private var timestamp: Instant? = null
- private var cpuCores = -1
- private var cpuCapacity = Double.NaN
- private var cpuUsage = Double.NaN
- private var cpuDemand = Double.NaN
- private var cpuReadyPct = Double.NaN
- private var memCapacity = Double.NaN
- private var diskRead = Double.NaN
- private var diskWrite = Double.NaN
- private var poweredOn: Boolean = false
-
- /**
- * Reset the state of the reader.
- */
- private fun reset() {
- id = null
- timestamp = null
- cluster = null
- cpuCores = -1
- cpuCapacity = Double.NaN
- cpuUsage = Double.NaN
- cpuDemand = Double.NaN
- cpuReadyPct = Double.NaN
- memCapacity = Double.NaN
- diskRead = Double.NaN
- diskWrite = Double.NaN
- poweredOn = false
- }
-
- /**
- * Default column indices for the extended Bitbrains format.
- */
- private val COL_TIMESTAMP = 0
- private val COL_CPU_USAGE = 1
- private val COL_CPU_DEMAND = 2
- private val COL_DISK_READ = 4
- private val COL_DISK_WRITE = 6
- private val COL_CLUSTER_ID = 10
- private val COL_NCPUS = 12
- private val COL_CPU_READY_PCT = 13
- private val COL_POWERED_ON = 14
- private val COL_CPU_CAPACITY = 18
- private val COL_ID = 19
- private val COL_MEM_CAPACITY = 20
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt
deleted file mode 100644
index dbd63de5..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.sv
-
-import org.opendc.trace.*
-import java.nio.file.Path
-
-/**
- * [Trace] implementation for the extended Bitbrains format.
- */
-public class SvTrace internal constructor(private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_RESOURCE_STATES)
-
- override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name
-
- override fun getTable(name: String): Table? {
- if (!containsTable(name)) {
- return null
- }
-
- return SvResourceStateTable(path)
- }
-
- override fun toString(): String = "SvTrace[$path]"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt
deleted file mode 100644
index 0cce8559..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.sv
-
-import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
-
-/**
- * A format implementation for the extended Bitbrains trace format.
- */
-public class SvTraceFormat : TraceFormat {
- /**
- * The name of this trace format.
- */
- override val name: String = "sv"
-
- /**
- * Open the trace file.
- */
- override fun open(url: URL): SvTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return SvTrace(path)
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt
deleted file mode 100644
index 065a8c93..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.util
-
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import io.opentelemetry.sdk.resources.Resource
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.simulator.SimHost
-import org.opendc.experiments.capelin.env.MachineDef
-import org.opendc.experiments.capelin.trace.TraceReader
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.kernel.SimHypervisorProvider
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.telemetry.compute.*
-import org.opendc.telemetry.sdk.toOtelClock
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
-import kotlin.math.max
-
-/**
- * Helper class to manage a [ComputeService] simulation.
- */
-class ComputeServiceSimulator(
- private val context: CoroutineContext,
- private val clock: Clock,
- scheduler: ComputeScheduler,
- machines: List<MachineDef>,
- private val failureModel: FailureModel? = null,
- interferenceModel: VmInterferenceModel? = null,
- hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider()
-) : AutoCloseable {
- /**
- * The [ComputeService] that has been configured by the manager.
- */
- val service: ComputeService
-
- /**
- * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts.
- */
- val producers: List<MetricProducer>
- get() = _metricProducers
- private val _metricProducers = mutableListOf<MetricProducer>()
-
- /**
- * The [SimResourceInterpreter] to simulate the hosts.
- */
- private val interpreter = SimResourceInterpreter(context, clock)
-
- /**
- * The hosts that belong to this class.
- */
- private val hosts = mutableSetOf<SimHost>()
-
- init {
- val (service, serviceMeterProvider) = createService(scheduler)
- this._metricProducers.add(serviceMeterProvider)
- this.service = service
-
- for (def in machines) {
- val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel)
- this._metricProducers.add(hostMeterProvider)
- hosts.add(host)
- this.service.addHost(host)
- }
- }
-
- /**
- * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader].
- */
- suspend fun run(reader: TraceReader<SimWorkload>) {
- val injector = failureModel?.createInjector(context, clock, service)
- val client = service.newClient()
-
- // Create new image for the virtual machine
- val image = client.newImage("vm-image")
-
- try {
- coroutineScope {
- // Start the fault injector
- injector?.start()
-
- var offset = Long.MIN_VALUE
-
- while (reader.hasNext()) {
- val entry = reader.next()
-
- if (offset < 0) {
- offset = entry.start - clock.millis()
- }
-
- // Make sure the trace entries are ordered by submission time
- assert(entry.start - offset >= 0) { "Invalid trace order" }
- delay(max(0, (entry.start - offset) - clock.millis()))
-
- launch {
- val workloadOffset = -offset + 300001
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
-
- val server = client.newServer(
- entry.name,
- image,
- client.newFlavor(
- entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
- ),
- meta = entry.meta + mapOf("workload" to workload)
- )
-
- // Wait for the server reach its end time
- val endTime = entry.meta["end-time"] as Long
- delay(endTime + workloadOffset - clock.millis() + 1)
-
- // Delete the server after reaching the end-time of the virtual machine
- server.delete()
- }
- }
- }
-
- yield()
- } finally {
- injector?.close()
- reader.close()
- client.close()
- }
- }
-
- override fun close() {
- service.close()
-
- for (host in hosts) {
- host.close()
- }
-
- hosts.clear()
- }
-
- /**
- * Construct a [ComputeService] instance.
- */
- private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> {
- val resource = Resource.builder()
- .put(ResourceAttributes.SERVICE_NAME, "opendc-compute")
- .build()
-
- val meterProvider = SdkMeterProvider.builder()
- .setClock(clock.toOtelClock())
- .setResource(resource)
- .build()
-
- val service = ComputeService(context, clock, meterProvider, scheduler)
- return service to meterProvider
- }
-
- /**
- * Construct a [SimHost] instance for the specified [MachineDef].
- */
- private fun createHost(
- def: MachineDef,
- hypervisorProvider: SimHypervisorProvider,
- interferenceModel: VmInterferenceModel? = null
- ): Pair<SimHost, SdkMeterProvider> {
- val resource = Resource.builder()
- .put(HOST_ID, def.uid.toString())
- .put(HOST_NAME, def.name)
- .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
- .put(HOST_NCPUS, def.model.cpus.size)
- .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size })
- .build()
-
- val meterProvider = SdkMeterProvider.builder()
- .setClock(clock.toOtelClock())
- .setResource(resource)
- .build()
-
- val host = SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- context,
- interpreter,
- meterProvider,
- hypervisorProvider,
- powerDriver = SimplePowerDriver(def.powerModel),
- interferenceDomain = interferenceModel?.newDomain()
- )
-
- return host to meterProvider
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt
deleted file mode 100644
index 83393896..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.util
-
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.simulator.failure.HostFaultInjector
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
-
-/**
- * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts.
- */
-interface FailureModel {
- /**
- * Construct a [HostFaultInjector] for the specified [service].
- */
- fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt
deleted file mode 100644
index 89b4a31c..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("FailureModels")
-package org.opendc.experiments.capelin
-
-import org.apache.commons.math3.distribution.LogNormalDistribution
-import org.apache.commons.math3.random.Well19937c
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.simulator.failure.HostFaultInjector
-import org.opendc.compute.simulator.failure.StartStopHostFault
-import org.opendc.compute.simulator.failure.StochasticVictimSelector
-import org.opendc.experiments.capelin.util.FailureModel
-import java.time.Clock
-import java.time.Duration
-import kotlin.coroutines.CoroutineContext
-import kotlin.math.ln
-import kotlin.random.Random
-
-/**
- * Obtain a [FailureModel] based on the GRID'5000 failure trace.
- *
- * This fault injector uses parameters from the GRID'5000 failure trace as described in
- * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
- */
-fun grid5000(failureInterval: Duration, seed: Int): FailureModel {
- return object : FailureModel {
- override fun createInjector(
- context: CoroutineContext,
- clock: Clock,
- service: ComputeService
- ): HostFaultInjector {
- val rng = Well19937c(seed)
- val hosts = service.hosts.map { it as SimHost }.toSet()
-
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return HostFaultInjector(
- context,
- clock,
- hosts,
- iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03),
- selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
- fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
- )
- }
-
- override fun toString(): String = "Grid5000FailureModel"
- }
-}
-
-/**
- * Obtain the [HostFaultInjector] to use for the experiments.
- *
- * This fault injector uses parameters from the GRID'5000 failure trace as described in
- * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
- */
-fun createFaultInjector(
- context: CoroutineContext,
- clock: Clock,
- hosts: Set<SimHost>,
- seed: Int,
- failureInterval: Double
-): HostFaultInjector {
- val rng = Well19937c(seed)
-
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return HostFaultInjector(
- context,
- clock,
- hosts,
- iat = LogNormalDistribution(rng, ln(failureInterval), 1.03),
- selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
- fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
- )
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt
index b55bd577..67de2777 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace
+package org.opendc.experiments.capelin.util
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
index d1c01b8e..d46b50c3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
@@ -36,7 +36,7 @@
<Logger name="org.opendc.experiments.capelin" level="info" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
- <Logger name="org.opendc.experiments.capelin.trace" level="debug" additivity="false">
+ <Logger name="org.opendc.experiments.vm.trace" level="debug" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.apache.hadoop" level="warn" additivity="false">
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 727530e3..b0f86346 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -31,14 +31,15 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.workload.ComputeWorkloadRunner
+import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.trace.RawParquetTraceReader
+import org.opendc.compute.workload.trace.TraceReader
+import org.opendc.compute.workload.util.PerformanceInterferenceReader
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.ParquetTraceReader
-import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.trace.RawParquetTraceReader
-import org.opendc.experiments.capelin.trace.TraceReader
-import org.opendc.experiments.capelin.util.ComputeServiceSimulator
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
@@ -85,7 +86,7 @@ class CapelinIntegrationTest {
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- val simulator = ComputeServiceSimulator(
+ val simulator = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler,
@@ -134,7 +135,7 @@ class CapelinIntegrationTest {
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val simulator = ComputeServiceSimulator(
+ val simulator = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler,
@@ -184,7 +185,7 @@ class CapelinIntegrationTest {
.read(perfInterferenceInput)
.let { VmInterferenceModel(it, Random(seed.toLong())) }
- val simulator = ComputeServiceSimulator(
+ val simulator = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler,
@@ -229,7 +230,7 @@ class CapelinIntegrationTest {
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val simulator = ComputeServiceSimulator(
+ val simulator = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt
deleted file mode 100644
index fbc39b87..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
-
-/**
- * Test suite for the [PerformanceInterferenceReader] class.
- */
-class PerformanceInterferenceReaderTest {
- @Test
- fun testSmoke() {
- val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json"))
- val result = PerformanceInterferenceReader().read(input)
-
- assertAll(
- { assertEquals(2, result.size) },
- { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) },
- { assertEquals(0.0, result[0].targetLoad, 0.001) },
- { assertEquals(0.8830158730158756, result[0].score, 0.001) }
- )
- }
-}