summaryrefslogtreecommitdiff
path: root/simulator/opendc-experiments/opendc-experiments-sc20
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-sc20')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts56
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt158
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt78
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt244
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt90
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt222
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt161
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt48
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt33
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt30
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt42
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt75
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt204
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt89
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt68
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt81
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt51
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt32
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt45
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt48
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt42
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt58
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt82
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt62
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt35
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt45
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt39
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt35
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt43
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt128
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt83
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt67
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt80
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt98
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt174
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt305
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt621
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt214
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml52
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt252
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/single.txt3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/topology.txt5
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/meta.parquetbin0 -> 2148 bytes
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/trace.parquetbin0 -> 1672463 bytes
44 files changed, 4378 insertions, 0 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts
new file mode 100644
index 00000000..0a7b663c
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2019 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Experiments for the SC20 paper"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-convention`
+ application
+}
+
+application {
+ mainClassName = "com.atlarge.opendc.experiments.sc20.MainKt"
+ applicationDefaultJvmArgs = listOf("-Xms2500M")
+}
+
+dependencies {
+ api(project(":opendc-core"))
+ implementation(project(":opendc-format"))
+ implementation(project(":opendc-simulator"))
+
+ implementation("com.github.ajalt:clikt:2.6.0")
+ implementation("me.tongfei:progressbar:0.8.1")
+ implementation("io.github.microutils:kotlin-logging:1.7.9")
+
+ implementation("org.apache.parquet:parquet-avro:1.11.0")
+ implementation("org.apache.hadoop:hadoop-client:3.2.1") {
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ exclude(group = "log4j")
+ }
+
+ runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1")
+
+ testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
+ testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
+ testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}")
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
new file mode 100644
index 00000000..cd85351e
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
@@ -0,0 +1,158 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.opendc.experiments.sc20.experiment.*
+import com.atlarge.opendc.experiments.sc20.reporter.ConsoleExperimentReporter
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.execution.ThreadPoolExperimentScheduler
+import com.atlarge.opendc.experiments.sc20.runner.internal.DefaultExperimentRunner
+import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
+import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.options.convert
+import com.github.ajalt.clikt.parameters.options.default
+import com.github.ajalt.clikt.parameters.options.defaultLazy
+import com.github.ajalt.clikt.parameters.options.multiple
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.required
+import com.github.ajalt.clikt.parameters.types.choice
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.int
+import mu.KotlinLogging
+import java.io.File
+import java.io.InputStream
+
+/**
+ * The logger for this experiment.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Represents the command for running the experiment.
+ */
+class ExperimentCli : CliktCommand(name = "sc20-experiment", help = "Run experiments from the Capelin paper") {
+ /**
+ * The path to the directory where the topology descriptions are located.
+ */
+ private val environmentPath by option("--environment-path", help = "path to the environment directory")
+ .file(canBeFile = false)
+ .required()
+
+ /**
+ * The path to the directory where the traces are located.
+ */
+ private val tracePath by option("--trace-path", help = "path to the traces directory")
+ .file(canBeFile = false)
+ .required()
+
+ /**
+ * The path to the performance interference model.
+ */
+ private val performanceInterferenceStream by option("--performance-interference-model", help = "path to the performance interference file")
+ .file(canBeDir = false)
+ .convert { it.inputStream() as InputStream }
+
+ /**
+ * The path to the original VM placements file.
+ */
+ private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file")
+ .file(canBeDir = false)
+ .convert {
+ Sc20VmPlacementReader(it.inputStream().buffered()).construct()
+ }
+ .default(emptyMap())
+
+ /**
+ * The selected portfolios to run.
+ */
+ private val portfolios by option("--portfolio", help = "portfolio of scenarios to explore")
+ .choice(
+ "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) }
+ as (Experiment, Int) -> Portfolio,
+ "more-velocity" to { experiment, i -> MoreVelocityPortfolio(experiment, i) },
+ "composite-workload" to { experiment, i -> CompositeWorkloadPortfolio(experiment, i) },
+ "operational-phenomena" to { experiment, i -> OperationalPhenomenaPortfolio(experiment, i) },
+ "replay" to { experiment, i -> ReplayPortfolio(experiment, i) },
+ "test" to { experiment, i -> TestPortfolio(experiment, i) },
+ "more-hpc" to { experiment, i -> MoreHpcPortfolio(experiment, i) },
+ ignoreCase = true
+ )
+ .multiple(required = true)
+
+ /**
+ * The maximum number of worker threads to use.
+ */
+ private val parallelism by option("--parallelism", help = "maximum number of concurrent simulation runs")
+ .int()
+ .default(Runtime.getRuntime().availableProcessors())
+
+ /**
+ * The buffer size for writing results.
+ */
+ private val bufferSize by option("--buffer-size")
+ .int()
+ .default(4096)
+
+ /**
+ * The path to the output directory.
+ */
+ private val output by option("-O", "--output", help = "path to the output directory")
+ .file(canBeFile = false)
+ .defaultLazy { File("data") }
+
+ override fun run() {
+ logger.info { "Constructing performance interference model" }
+
+ val performanceInterferenceModel =
+ performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it) }
+
+ logger.info { "Creating experiment descriptor" }
+ val descriptor = object : Experiment(environmentPath, tracePath, output, performanceInterferenceModel, vmPlacements, bufferSize) {
+ private val descriptor = this
+ override val children: Sequence<ExperimentDescriptor> = sequence {
+ for ((i, producer) in portfolios.withIndex()) {
+ yield(producer(descriptor, i))
+ }
+ }
+ }
+
+ logger.info { "Starting experiment runner [parallelism=$parallelism]" }
+ val scheduler = ThreadPoolExperimentScheduler(parallelism)
+ val runner = DefaultExperimentRunner(scheduler)
+ val reporter = ConsoleExperimentReporter()
+ try {
+ runner.execute(descriptor, reporter)
+ } finally {
+ scheduler.close()
+ reporter.close()
+ }
+ }
+}
+
+/**
+ * Main entry point of the experiment.
+ */
+fun main(args: Array<String>) = ExperimentCli().main(args)
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt
new file mode 100644
index 00000000..f3ac2554
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt
@@ -0,0 +1,78 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
+import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent
+import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetRunEventWriter
+import com.atlarge.opendc.format.trace.PerformanceInterferenceModelReader
+import java.io.File
+
+/**
+ * The global configuration of the experiment.
+ *
+ * @param environments The path to the topologies directory.
+ * @param traces The path to the traces directory.
+ * @param output The output directory.
+ * @param performanceInterferenceModel The optional performance interference model that has been specified.
+ * @param vmPlacements Original VM placement in the trace.
+ * @param bufferSize The buffer size of the event reporters.
+ */
+public abstract class Experiment(
+ val environments: File,
+ val traces: File,
+ val output: File,
+ val performanceInterferenceModel: PerformanceInterferenceModelReader?,
+ val vmPlacements: Map<String, String>,
+ val bufferSize: Int
+) : ContainerExperimentDescriptor() {
+ override val parent: ExperimentDescriptor? = null
+
+ override suspend fun invoke(context: ExperimentExecutionContext) {
+ val writer = ParquetRunEventWriter(File(output, "experiments.parquet"), bufferSize)
+ try {
+ val listener = object : ExperimentExecutionListener by context.listener {
+ override fun descriptorRegistered(descriptor: ExperimentDescriptor) {
+ if (descriptor is Run) {
+ writer.write(RunEvent(descriptor, System.currentTimeMillis()))
+ }
+
+ context.listener.descriptorRegistered(descriptor)
+ }
+ }
+
+ val newContext = object : ExperimentExecutionContext by context {
+ override val listener: ExperimentExecutionListener = listener
+ }
+
+ super.invoke(newContext)
+ } finally {
+ writer.close()
+ }
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
new file mode 100644
index 00000000..b68ee97e
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -0,0 +1,244 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.ServerEvent
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.compute.metal.NODE_CLUSTER
+import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
+import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.virt.HypervisorEvent
+import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
+import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
+import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
+import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
+import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
+import com.atlarge.opendc.core.failure.FailureDomain
+import com.atlarge.opendc.core.failure.FaultInjector
+import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
+import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import com.atlarge.opendc.format.trace.TraceReader
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
+import kotlinx.coroutines.flow.takeWhile
+import mu.KotlinLogging
+import java.io.File
+import java.time.Clock
+import kotlin.math.ln
+import kotlin.math.max
+import kotlin.random.Random
+
+/**
+ * The logger for this experiment.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Construct the failure domain for the experiments.
+ */
+suspend fun createFailureDomain(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
+ seed: Int,
+ failureInterval: Double,
+ bareMetalProvisioner: ProvisioningService,
+ chan: Channel<Unit>
+): CoroutineScope {
+ val job = coroutineScope.launch {
+ chan.receive()
+ val random = Random(seed)
+ val injectors = mutableMapOf<String, FaultInjector>()
+ for (node in bareMetalProvisioner.nodes()) {
+ val cluster = node.metadata[NODE_CLUSTER] as String
+ val injector =
+ injectors.getOrPut(cluster) {
+ createFaultInjector(
+ this,
+ clock,
+ random,
+ failureInterval
+ )
+ }
+ injector.enqueue(node.metadata["driver"] as FailureDomain)
+ }
+ }
+ return CoroutineScope(coroutineScope.coroutineContext + job)
+}
+
+/**
+ * Obtain the [FaultInjector] to use for the experiments.
+ */
+fun createFaultInjector(coroutineScope: CoroutineScope, clock: Clock, random: Random, failureInterval: Double): FaultInjector {
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return CorrelatedFaultInjector(
+ coroutineScope,
+ clock,
+ iatScale = ln(failureInterval), iatShape = 1.03, // Hours
+ sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
+ dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
+ random = random
+ )
+}
+
+/**
+ * Create the trace reader from which the VM workloads are read.
+ */
+fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20StreamingParquetTraceReader {
+ return Sc20StreamingParquetTraceReader(
+ path,
+ performanceInterferenceModel,
+ vms,
+ Random(seed)
+ )
+}
+
+/**
+ * Construct the environment for a VM provisioner and return the provisioner instance.
+ */
+suspend fun createProvisioner(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
+ environmentReader: EnvironmentReader,
+ allocationPolicy: AllocationPolicy
+): Pair<ProvisioningService, SimpleVirtProvisioningService> {
+ val environment = environmentReader.use { it.construct(coroutineScope, clock) }
+ val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
+
+ // Wait for the bare metal nodes to be spawned
+ delay(10)
+
+ val scheduler = SimpleVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy)
+
+ // Wait for the hypervisors to be spawned
+ delay(10)
+
+ return bareMetalProvisioner to scheduler
+}
+
+/**
+ * Attach the specified monitor to the VM provisioner.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+suspend fun attachMonitor(coroutineScope: CoroutineScope, clock: Clock, scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) {
+ val hypervisors = scheduler.drivers()
+
+ // Monitor hypervisor events
+ for (hypervisor in hypervisors) {
+ // TODO Do not expose VirtDriver directly but use Hypervisor class.
+ monitor.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server)
+ hypervisor.server.events
+ .onEach { event ->
+ val time = clock.millis()
+ when (event) {
+ is ServerEvent.StateChanged -> {
+ monitor.reportHostStateChange(time, hypervisor, event.server)
+ }
+ }
+ }
+ .launchIn(coroutineScope)
+ hypervisor.events
+ .onEach { event ->
+ when (event) {
+ is HypervisorEvent.SliceFinished -> monitor.reportHostSlice(
+ clock.millis(),
+ event.requestedBurst,
+ event.grantedBurst,
+ event.overcommissionedBurst,
+ event.interferedBurst,
+ event.cpuUsage,
+ event.cpuDemand,
+ event.numberOfDeployedImages,
+ event.hostServer
+ )
+ }
+ }
+ .launchIn(coroutineScope)
+
+ val driver = hypervisor.server.services[BareMetalDriver.Key]
+ driver.powerDraw
+ .onEach { monitor.reportPowerConsumption(hypervisor.server, it) }
+ .launchIn(coroutineScope)
+ }
+
+ scheduler.events
+ .onEach { event ->
+ when (event) {
+ is VirtProvisioningEvent.MetricsAvailable ->
+ monitor.reportProvisionerMetrics(clock.millis(), event)
+ }
+ }
+ .launchIn(coroutineScope)
+}
+
+/**
+ * Process the trace.
+ */
+suspend fun processTrace(coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) {
+ try {
+ var submitted = 0
+
+ while (reader.hasNext()) {
+ val (time, workload) = reader.next()
+
+ submitted++
+ delay(max(0, time - clock.millis()))
+ coroutineScope.launch {
+ chan.send(Unit)
+ val server = scheduler.deploy(
+ workload.image.name,
+ workload.image,
+ Flavor(workload.image.maxCores, workload.image.requiredMemory)
+ )
+ // Monitor server events
+ server.events
+ .onEach {
+ if (it is ServerEvent.StateChanged) {
+ monitor.reportVmStateChange(clock.millis(), it.server)
+ }
+ }
+ .collect()
+ }
+ }
+
+ scheduler.events
+ .takeWhile {
+ when (it) {
+ is VirtProvisioningEvent.MetricsAvailable ->
+ it.inactiveVmCount + it.failedVmCount != submitted
+ }
+ }
+ .collect()
+ delay(1)
+ } finally {
+ reader.close()
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt
new file mode 100644
index 00000000..6a40f5fb
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt
@@ -0,0 +1,90 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena
+import com.atlarge.opendc.experiments.sc20.experiment.model.Topology
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor
+
+/**
+ * A portfolio represents a collection of scenarios are tested.
+ */
+public abstract class Portfolio(
+ override val parent: Experiment,
+ val id: Int,
+ val name: String
+) : ContainerExperimentDescriptor() {
+ /**
+ * The topologies to consider.
+ */
+ protected abstract val topologies: List<Topology>
+
+ /**
+ * The workloads to consider.
+ */
+ protected abstract val workloads: List<Workload>
+
+ /**
+ * The operational phenomenas to consider.
+ */
+ protected abstract val operationalPhenomenas: List<OperationalPhenomena>
+
+ /**
+ * The allocation policies to consider.
+ */
+ protected abstract val allocationPolicies: List<String>
+
+ /**
+ * The number of repetitions to perform.
+ */
+ open val repetitions: Int = 32
+
+ /**
+ * Resolve the children of this container.
+ */
+ override val children: Sequence<Scenario> = sequence {
+ var id = 0
+ for (topology in topologies) {
+ for (workload in workloads) {
+ for (operationalPhenomena in operationalPhenomenas) {
+ for (allocationPolicy in allocationPolicies) {
+ yield(
+ Scenario(
+ this@Portfolio,
+ id++,
+ repetitions,
+ topology,
+ workload,
+ allocationPolicy,
+ operationalPhenomena
+ )
+ )
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
new file mode 100644
index 00000000..09a6ce40
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
@@ -0,0 +1,222 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.opendc.experiments.sc20.experiment.model.*
+
+public class HorVerPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "horizontal_vs_vertical") {
+ override val topologies = listOf(
+ Topology("base"),
+ Topology("rep-vol-hor-hom"),
+ Topology("rep-vol-hor-het"),
+ Topology("rep-vol-ver-hom"),
+ Topology("rep-vol-ver-het"),
+ Topology("exp-vol-hor-hom"),
+ Topology("exp-vol-hor-het"),
+ Topology("exp-vol-ver-hom"),
+ Topology("exp-vol-ver-het")
+ )
+
+ override val workloads = listOf(
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicies = listOf(
+ "active-servers"
+ )
+}
+
+public class MoreVelocityPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_velocity") {
+ override val topologies = listOf(
+ Topology("base"),
+ Topology("rep-vel-ver-hom"),
+ Topology("rep-vel-ver-het"),
+ Topology("exp-vel-ver-hom"),
+ Topology("exp-vel-ver-het")
+ )
+
+ override val workloads = listOf(
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicies = listOf(
+ "active-servers"
+ )
+}
+
+public class CompositeWorkloadPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "composite-workload") {
+ private val totalSampleLoad = 1.3301733005049648E12
+
+ override val topologies = listOf(
+ Topology("base"),
+ Topology("exp-vol-hor-hom"),
+ Topology("exp-vol-ver-hom"),
+ Topology("exp-vel-ver-hom")
+ )
+
+ override val workloads = listOf(
+ CompositeWorkload(
+ "all-azure",
+ listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "solvinity-25-azure-75",
+ listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "solvinity-50-azure-50",
+ listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "solvinity-75-azure-25",
+ listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "all-solvinity",
+ listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)),
+ totalSampleLoad
+ )
+ )
+
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false)
+ )
+
+ override val allocationPolicies = listOf(
+ "active-servers"
+ )
+}
+
+public class OperationalPhenomenaPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "operational_phenomena") {
+ override val topologies = listOf(
+ Topology("base")
+ )
+
+ override val workloads = listOf(
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true),
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = true),
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false),
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = false)
+ )
+
+ override val allocationPolicies = listOf(
+ "mem",
+ "mem-inv",
+ "core-mem",
+ "core-mem-inv",
+ "active-servers",
+ "active-servers-inv",
+ "random"
+ )
+}
+
+public class ReplayPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "replay") {
+ override val topologies = listOf(
+ Topology("base")
+ )
+
+ override val workloads = listOf(
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = false)
+ )
+
+ override val allocationPolicies = listOf(
+ "replay",
+ "active-servers"
+ )
+}
+
+public class TestPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "test") {
+ override val repetitions: Int = 1
+
+ override val topologies: List<Topology> = listOf(
+ Topology("base")
+ )
+
+ override val workloads: List<Workload> = listOf(
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomenas: List<OperationalPhenomena> = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicies: List<String> = listOf("active-servers")
+}
+
+public class MoreHpcPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_hpc") {
+ override val topologies = listOf(
+ Topology("base"),
+ Topology("exp-vol-hor-hom"),
+ Topology("exp-vol-ver-hom"),
+ Topology("exp-vel-ver-hom")
+ )
+
+ override val workloads = listOf(
+ Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC),
+ Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC),
+ Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC),
+ Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC),
+ Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD),
+ Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD),
+ Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD)
+ )
+
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicies = listOf(
+ "active-servers"
+ )
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
new file mode 100644
index 00000000..76a10e56
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -0,0 +1,161 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy
+import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
+import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
+import com.atlarge.opendc.experiments.sc20.runner.TrialExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
+import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineScope
+import mu.KotlinLogging
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import java.io.File
+import kotlin.random.Random
+
+/**
+ * The logger for the experiment scenario.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the
+ * same set of parameters.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() {
+ override suspend fun invoke(context: ExperimentExecutionContext) {
+ val experiment = parent.parent.parent
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
+ val seeder = Random(seed)
+ val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt"))
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = when (parent.allocationPolicy) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+ "replay" -> ReplayAllocationPolicy(experiment.vmPlacements)
+ else -> throw IllegalArgumentException("Unknown policy ${parent.allocationPolicy}")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ val rawTraceReaders =
+ context.cache.computeIfAbsent("raw-trace-readers") { mutableMapOf<String, Sc20RawParquetTraceReader>() } as MutableMap<String, Sc20RawParquetTraceReader>
+ val rawReaders = synchronized(rawTraceReaders) {
+ val workloadNames = if (parent.workload is CompositeWorkload) {
+ parent.workload.workloads.map { it.name }
+ } else {
+ listOf(parent.workload.name)
+ }
+
+ workloadNames.map { workloadName ->
+ rawTraceReaders.computeIfAbsent(workloadName) {
+ logger.info { "Loading trace $workloadName" }
+ Sc20RawParquetTraceReader(File(experiment.traces, workloadName))
+ }
+ }
+ }
+
+ val performanceInterferenceModel = experiment.performanceInterferenceModel
+ ?.takeIf { parent.operationalPhenomena.hasInterference }
+ ?.construct(seeder) ?: emptyMap()
+ val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, parent.workload, seed)
+
+ val monitor = ParquetExperimentMonitor(
+ parent.parent.parent.output,
+ "portfolio_id=${parent.parent.id}/scenario_id=${parent.id}/run_id=$id",
+ parent.parent.parent.bufferSize
+ )
+
+ testScope.launch {
+ val (bareMetalProvisioner, scheduler) = createProvisioner(
+ this,
+ clock,
+ environment,
+ allocationPolicy
+ )
+
+ val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) {
+ logger.debug("ENABLING failures")
+ createFailureDomain(
+ this,
+ clock,
+ seeder.nextInt(),
+ parent.operationalPhenomena.failureFrequency,
+ bareMetalProvisioner,
+ chan
+ )
+ } else {
+ null
+ }
+
+ attachMonitor(this, clock, scheduler, monitor)
+ processTrace(
+ this,
+ clock,
+ trace,
+ scheduler,
+ chan,
+ monitor,
+ experiment.vmPlacements
+ )
+
+ logger.debug("SUBMIT=${scheduler.submittedVms}")
+ logger.debug("FAIL=${scheduler.unscheduledVms}")
+ logger.debug("QUEUED=${scheduler.queuedVms}")
+ logger.debug("RUNNING=${scheduler.runningVms}")
+ logger.debug("FINISHED=${scheduler.finishedVms}")
+
+ failureDomain?.cancel()
+ scheduler.terminate()
+ }
+
+ try {
+ testScope.advanceUntilIdle()
+ } finally {
+ monitor.close()
+ }
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt
new file mode 100644
index 00000000..98bc7fc2
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt
@@ -0,0 +1,48 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena
+import com.atlarge.opendc.experiments.sc20.experiment.model.Topology
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+
+/**
+ * A scenario represents a single point in the design space (a unique combination of parameters).
+ */
+public class Scenario(
+ override val parent: Portfolio,
+ val id: Int,
+ val repetitions: Int,
+ val topology: Topology,
+ val workload: Workload,
+ val allocationPolicy: String,
+ val operationalPhenomena: OperationalPhenomena
+) : ContainerExperimentDescriptor() {
+ override val children: Sequence<ExperimentDescriptor> = sequence {
+ repeat(repetitions) { i -> yield(Run(this@Scenario, i, i)) }
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt
new file mode 100644
index 00000000..af99df84
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt
@@ -0,0 +1,33 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment.model
+
+/**
+ * Operation phenomena during experiments.
+ *
+ * @param failureFrequency The average time between failures in hours.
+ * @param hasInterference A flag to enable performance interference between VMs.
+ */
+public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean)
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt
new file mode 100644
index 00000000..ea09688b
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt
@@ -0,0 +1,30 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment.model
+
+/**
+ * The topology topology on which we test the workload.
+ */
+public data class Topology(val name: String)
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt
new file mode 100644
index 00000000..d75ca6f9
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt
@@ -0,0 +1,42 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment.model
+
+enum class SamplingStrategy {
+ REGULAR,
+ HPC,
+ HPC_LOAD
+}
+
+/**
+ * A workload that is considered for a scenario.
+ */
+public open class Workload(open val name: String, val fraction: Double, val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR)
+
+/**
+ * A workload that is composed of multiple workloads.
+ */
+public class CompositeWorkload(override val name: String, val workloads: List<Workload>, val totalLoad: Double) :
+ Workload(name, -1.0)
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt
new file mode 100644
index 00000000..1f674f00
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt
@@ -0,0 +1,75 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment.monitor
+
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
+import java.io.Closeable
+
+/**
+ * A monitor watches the events of an experiment.
+ */
+interface ExperimentMonitor : Closeable {
+ /**
+ * This method is invoked when the state of a VM changes.
+ */
+ fun reportVmStateChange(time: Long, server: Server) {}
+
+ /**
+ * This method is invoked when the state of a host changes.
+ */
+ fun reportHostStateChange(
+ time: Long,
+ driver: VirtDriver,
+ server: Server
+ ) {}
+
+ /**
+ * Report the power consumption of a host.
+ */
+ fun reportPowerConsumption(host: Server, draw: Double) {}
+
+ /**
+ * This method is invoked for a host for each slice that is finishes.
+ */
+ fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ numberOfDeployedImages: Int,
+ hostServer: Server,
+ duration: Long = 5 * 60 * 1000L
+ ) {}
+
+ /**
+ * This method is invoked for a provisioner event.
+ */
+ fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {}
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
new file mode 100644
index 00000000..a06317cb
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
@@ -0,0 +1,204 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment.monitor
+
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
+import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
+import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
+import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter
+import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter
+import mu.KotlinLogging
+import java.io.File
+
+/**
+ * The logger instance to use.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * An [ExperimentMonitor] that logs the events to a Parquet file.
+ */
+class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor {
+ private val hostWriter = ParquetHostEventWriter(
+ File(base, "host-metrics/$partition/data.parquet"),
+ bufferSize
+ )
+ private val provisionerWriter = ParquetProvisionerEventWriter(
+ File(base, "provisioner-metrics/$partition/data.parquet"),
+ bufferSize
+ )
+ private val currentHostEvent = mutableMapOf<Server, HostEvent>()
+ private var startTime = -1L
+
+ override fun reportVmStateChange(time: Long, server: Server) {
+ if (startTime < 0) {
+ startTime = time
+
+ // Update timestamp of initial event
+ currentHostEvent.replaceAll { k, v -> v.copy(timestamp = startTime) }
+ }
+ }
+
+ override fun reportHostStateChange(
+ time: Long,
+ driver: VirtDriver,
+ server: Server
+ ) {
+ logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" }
+
+ val previousEvent = currentHostEvent[server]
+
+ val roundedTime = previousEvent?.let {
+ val duration = time - it.timestamp
+ val k = 5 * 60 * 1000L // 5 min in ms
+ val rem = duration % k
+
+ if (rem == 0L) {
+ time
+ } else {
+ it.timestamp + duration + k - rem
+ }
+ } ?: time
+
+ reportHostSlice(
+ roundedTime,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0.0,
+ 0.0,
+ 0,
+ server
+ )
+ }
+
+ private val lastPowerConsumption = mutableMapOf<Server, Double>()
+
+ override fun reportPowerConsumption(host: Server, draw: Double) {
+ lastPowerConsumption[host] = draw
+ }
+
+ override fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ numberOfDeployedImages: Int,
+ hostServer: Server,
+ duration: Long
+ ) {
+ val previousEvent = currentHostEvent[hostServer]
+ when {
+ previousEvent == null -> {
+ val event = HostEvent(
+ time,
+ 5 * 60 * 1000L,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0,
+ hostServer.flavor.cpuCount
+ )
+
+ currentHostEvent[hostServer] = event
+ }
+ previousEvent.timestamp == time -> {
+ val event = HostEvent(
+ time,
+ previousEvent.duration,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0,
+ hostServer.flavor.cpuCount
+ )
+
+ currentHostEvent[hostServer] = event
+ }
+ else -> {
+ hostWriter.write(previousEvent)
+
+ val event = HostEvent(
+ time,
+ time - previousEvent.timestamp,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0,
+ hostServer.flavor.cpuCount
+ )
+
+ currentHostEvent[hostServer] = event
+ }
+ }
+ }
+
+ override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
+ provisionerWriter.write(
+ ProvisionerEvent(
+ time,
+ event.totalHostCount,
+ event.availableHostCount,
+ event.totalVmCount,
+ event.activeVmCount,
+ event.inactiveVmCount,
+ event.waitingVmCount,
+ event.failedVmCount
+ )
+ )
+ }
+
+ override fun close() {
+ // Flush remaining events
+ for ((_, event) in currentHostEvent) {
+ hostWriter.write(event)
+ }
+ currentHostEvent.clear()
+
+ hostWriter.close()
+ provisionerWriter.close()
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
new file mode 100644
index 00000000..4472def9
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
@@ -0,0 +1,89 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
+
+import com.atlarge.opendc.experiments.sc20.experiment.Run
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
+import me.tongfei.progressbar.ProgressBar
+import me.tongfei.progressbar.ProgressBarBuilder
+import mu.KotlinLogging
+
+/**
+ * A reporter that reports the experiment progress to the console.
+ */
+public class ConsoleExperimentReporter : ExperimentExecutionListener, AutoCloseable {
+ /**
+ * The active [Run]s.
+ */
+ private val runs: MutableSet<Run> = mutableSetOf()
+
+ /**
+ * The total number of runs.
+ */
+ private var total = 0
+
+ /**
+ * The logger for this reporter.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The progress bar to keep track of the progress.
+ */
+ private val pb: ProgressBar = ProgressBarBuilder()
+ .setTaskName("")
+ .setInitialMax(1)
+ .build()
+
+ override fun descriptorRegistered(descriptor: ExperimentDescriptor) {
+ if (descriptor is Run) {
+ runs += descriptor
+ pb.maxHint((++total).toLong())
+ }
+ }
+
+ override fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) {
+ if (descriptor is Run) {
+ runs -= descriptor
+
+ pb.stepTo(total - runs.size.toLong())
+ if (runs.isEmpty()) {
+ pb.close()
+ }
+ }
+
+ if (result is ExperimentExecutionResult.Failed) {
+ logger.warn(result.throwable) { "Descriptor $descriptor failed" }
+ }
+ }
+
+ override fun executionStarted(descriptor: ExperimentDescriptor) {}
+
+ override fun close() {
+ pb.close()
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt
new file mode 100644
index 00000000..dac32586
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt
@@ -0,0 +1,68 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner
+
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.supervisorScope
+
+/**
+ * An abstract [ExperimentDescriptor] specifically for containers.
+ */
+public abstract class ContainerExperimentDescriptor : ExperimentDescriptor() {
+ /**
+ * The child descriptors of this container.
+ */
+ public abstract val children: Sequence<ExperimentDescriptor>
+
+ override val type: Type = Type.CONTAINER
+
+ override suspend fun invoke(context: ExperimentExecutionContext) {
+ val materializedChildren = children.toList()
+ for (child in materializedChildren) {
+ context.listener.descriptorRegistered(child)
+ }
+
+ supervisorScope {
+ for (child in materializedChildren) {
+ if (child.isTrial) {
+ launch {
+ val worker = context.scheduler.allocate()
+ context.listener.executionStarted(child)
+ try {
+ worker(child, context)
+ context.listener.executionFinished(child, ExperimentExecutionResult.Success)
+ } catch (e: Throwable) {
+ context.listener.executionFinished(child, ExperimentExecutionResult.Failed(e))
+ }
+ }
+ } else {
+ launch { child(context) }
+ }
+ }
+ }
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt
new file mode 100644
index 00000000..64b6b767
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt
@@ -0,0 +1,81 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner
+
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import java.io.Serializable
+
+/**
+ * An immutable description of an experiment in the **odcsim* simulation framework, which may be a single atomic trial
+ * or a composition of multiple trials.
+ *
+ * This class represents a dynamic tree-like structure where the children of the nodes are not known at instantiation
+ * since they might be generated dynamically.
+ */
+public abstract class ExperimentDescriptor : Serializable {
+ /**
+ * The parent of this descriptor, or `null` if it has no parent.
+ */
+ public abstract val parent: ExperimentDescriptor?
+
+ /**
+ * The type of descriptor.
+ */
+ abstract val type: Type
+
+ /**
+ * A flag to indicate that this descriptor is a root descriptor.
+ */
+ public open val isRoot: Boolean
+ get() = parent == null
+
+ /**
+ * A flag to indicate that this descriptor describes an experiment trial.
+ */
+ val isTrial: Boolean
+ get() = type == Type.TRIAL
+
+ /**
+ * Execute this [ExperimentDescriptor].
+ *
+ * @param context The context to execute the descriptor in.
+ */
+ public abstract suspend operator fun invoke(context: ExperimentExecutionContext)
+
+ /**
+ * The types of experiment descriptors.
+ */
+ enum class Type {
+ /**
+ * A composition of multiple experiment descriptions whose invocation happens on a single thread.
+ */
+ CONTAINER,
+
+ /**
+ * An invocation of a single scenario of an experiment whose invocation may happen on different threads.
+ */
+ TRIAL
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt
new file mode 100644
index 00000000..77f970fe
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt
@@ -0,0 +1,51 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner
+
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
+
+/**
+ * An [ExperimentRunner] facilitates discovery and execution of experiments.
+ */
+public interface ExperimentRunner {
+ /**
+ * The unique identifier of this runner.
+ */
+ val id: String
+
+ /**
+ * The version of this runner.
+ */
+ val version: String?
+ get() = null
+
+ /**
+ * Execute the specified experiment represented as [ExperimentDescriptor].
+ *
+ * @param root The experiment to execute.
+ * @param listener The listener to report events to.
+ */
+ public fun execute(root: ExperimentDescriptor, listener: ExperimentExecutionListener)
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt
new file mode 100644
index 00000000..cf05416a
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt
@@ -0,0 +1,32 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner
+
+/**
+ * An abstract [ExperimentDescriptor] specifically for trials.
+ */
+public abstract class TrialExperimentDescriptor : ExperimentDescriptor() {
+ override val type: Type = Type.TRIAL
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt
new file mode 100644
index 00000000..9a04c491
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt
@@ -0,0 +1,45 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+/**
+ * The execution context of an experiment.
+ */
+public interface ExperimentExecutionContext {
+ /**
+ * The execution listener to use.
+ */
+ public val listener: ExperimentExecutionListener
+
+ /**
+ * The experiment scheduler to use.
+ */
+ public val scheduler: ExperimentScheduler
+
+ /**
+ * A cache for objects within a single runner.
+ */
+ public val cache: MutableMap<Any?, Any?>
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt
new file mode 100644
index 00000000..f6df0524
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt
@@ -0,0 +1,48 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+
+/**
+ * Listener to be notified of experiment execution events by experiment runners.
+ */
+interface ExperimentExecutionListener {
+ /**
+ * A method that is invoked when a new [ExperimentDescriptor] is registered.
+ */
+ fun descriptorRegistered(descriptor: ExperimentDescriptor)
+
+ /**
+ * A method that is invoked when when the execution of a leaf or subtree of the experiment tree has finished,
+ * regardless of the outcome.
+ */
+ fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult)
+
+ /**
+ * A method that is invoked when the execution of a leaf or subtree of the experiment tree is about to be started.
+ */
+ fun executionStarted(descriptor: ExperimentDescriptor)
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt
new file mode 100644
index 00000000..057e1f92
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt
@@ -0,0 +1,42 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+import java.io.Serializable
+
+/**
+ * The result of executing an experiment.
+ */
+public sealed class ExperimentExecutionResult : Serializable {
+ /**
+ * The experiment executed successfully
+ */
+ public object Success : ExperimentExecutionResult()
+
+ /**
+ * The experiment failed during execution.
+ */
+ public data class Failed(val throwable: Throwable) : ExperimentExecutionResult()
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt
new file mode 100644
index 00000000..96678abf
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt
@@ -0,0 +1,58 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import java.io.Closeable
+
+/**
+ * A interface for scheduling the execution of experiment trials over compute resources (threads/containers/vms)
+ */
+interface ExperimentScheduler : Closeable {
+ /**
+ * Allocate a [Worker] for executing an experiment trial. This method may suspend in case no resources are directly
+ * available at the moment.
+ *
+ * @return The available worker.
+ */
+ suspend fun allocate(): ExperimentScheduler.Worker
+
+ /**
+ * An isolated worker of an [ExperimentScheduler] that is responsible for executing a single experiment trial.
+ */
+ interface Worker {
+ /**
+ * Dispatch the specified [ExperimentDescriptor] to execute some time in the future and return the results of
+ * the trial.
+ *
+ * @param descriptor The descriptor to execute.
+ * @param context The context to execute the descriptor in.
+ */
+ suspend operator fun invoke(
+ descriptor: ExperimentDescriptor,
+ context: ExperimentExecutionContext
+ )
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
new file mode 100644
index 00000000..ddd64560
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
@@ -0,0 +1,82 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import kotlinx.coroutines.asCoroutineDispatcher
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.supervisorScope
+import kotlinx.coroutines.sync.Semaphore
+import kotlinx.coroutines.withContext
+import java.util.concurrent.Executors
+
+/**
+ * An [ExperimentScheduler] that runs experiments using a local thread pool.
+ *
+ * @param parallelism The maximum amount of parallel workers (default is the number of available processors).
+ */
+class ThreadPoolExperimentScheduler(parallelism: Int = Runtime.getRuntime().availableProcessors() + 1) : ExperimentScheduler {
+ private val dispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher()
+ private val tickets = Semaphore(parallelism)
+
+ override suspend fun allocate(): ExperimentScheduler.Worker {
+ tickets.acquire()
+ return object : ExperimentScheduler.Worker {
+ override suspend fun invoke(
+ descriptor: ExperimentDescriptor,
+ context: ExperimentExecutionContext
+ ) = supervisorScope {
+ val listener =
+ object : ExperimentExecutionListener {
+ override fun descriptorRegistered(descriptor: ExperimentDescriptor) {
+ launch { context.listener.descriptorRegistered(descriptor) }
+ }
+
+ override fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) {
+ launch { context.listener.executionFinished(descriptor, result) }
+ }
+
+ override fun executionStarted(descriptor: ExperimentDescriptor) {
+ launch { context.listener.executionStarted(descriptor) }
+ }
+ }
+
+ val newContext = object : ExperimentExecutionContext by context {
+ override val listener: ExperimentExecutionListener = listener
+ }
+
+ try {
+ withContext(dispatcher) {
+ descriptor(newContext)
+ }
+ } finally {
+ tickets.release()
+ }
+ }
+ }
+ }
+
+ override fun close() = dispatcher.close()
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
new file mode 100644
index 00000000..3b80276f
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
@@ -0,0 +1,62 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.internal
+
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentRunner
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentScheduler
+import kotlinx.coroutines.runBlocking
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * The default implementation of the [ExperimentRunner] interface.
+ *
+ * @param scheduler The scheduler to use.
+ */
+public class DefaultExperimentRunner(val scheduler: ExperimentScheduler) : ExperimentRunner {
+ override val id: String = "default"
+
+ override val version: String? = "1.0"
+
+ override fun execute(root: ExperimentDescriptor, listener: ExperimentExecutionListener) = runBlocking {
+ val context = object : ExperimentExecutionContext {
+ override val listener: ExperimentExecutionListener = listener
+ override val scheduler: ExperimentScheduler = this@DefaultExperimentRunner.scheduler
+ override val cache: MutableMap<Any?, Any?> = ConcurrentHashMap()
+ }
+
+ listener.descriptorRegistered(root)
+ context.listener.executionStarted(root)
+ try {
+ root(context)
+ context.listener.executionFinished(root, ExperimentExecutionResult.Success)
+ } catch (e: Throwable) {
+ context.listener.executionFinished(root, ExperimentExecutionResult.Failed(e))
+ }
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt
new file mode 100644
index 00000000..c1e14e2a
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt
@@ -0,0 +1,35 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry
+
+/**
+ * An event that occurs within the system.
+ */
+public abstract class Event(val name: String) {
+ /**
+ * The time of occurrence of this event.
+ */
+ public abstract val timestamp: Long
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt
new file mode 100644
index 00000000..b9030172
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt
@@ -0,0 +1,45 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry
+
+import com.atlarge.opendc.compute.core.Server
+
+/**
+ * A periodic report of the host machine metrics.
+ */
+data class HostEvent(
+ override val timestamp: Long,
+ val duration: Long,
+ val host: Server,
+ val vmCount: Int,
+ val requestedBurst: Long,
+ val grantedBurst: Long,
+ val overcommissionedBurst: Long,
+ val interferedBurst: Long,
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val powerDraw: Double,
+ val cores: Int
+) : Event("host-metrics")
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt
new file mode 100644
index 00000000..df619632
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt
@@ -0,0 +1,39 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry
+
+/**
+ * A periodic report of the provisioner's metrics.
+ */
+data class ProvisionerEvent(
+ override val timestamp: Long,
+ val totalHostCount: Int,
+ val availableHostCount: Int,
+ val totalVmCount: Int,
+ val activeVmCount: Int,
+ val inactiveVmCount: Int,
+ val waitingVmCount: Int,
+ val failedVmCount: Int
+) : Event("provisioner-metrics")
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt
new file mode 100644
index 00000000..497d2c3f
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt
@@ -0,0 +1,35 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry
+
+import com.atlarge.opendc.experiments.sc20.experiment.Run
+
+/**
+ * A periodic report of the host machine metrics.
+ */
+data class RunEvent(
+ val run: Run,
+ override val timestamp: Long
+) : Event("run")
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt
new file mode 100644
index 00000000..7289fb21
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt
@@ -0,0 +1,43 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry
+
+import com.atlarge.opendc.compute.core.Server
+
+/**
+ * A periodic report of a virtual machine's metrics.
+ */
+data class VmEvent(
+ override val timestamp: Long,
+ val duration: Long,
+ val vm: Server,
+ val host: Server,
+ val requestedBurst: Long,
+ val grantedBurst: Long,
+ val overcommissionedBurst: Long,
+ val interferedBurst: Long,
+ val cpuUsage: Double,
+ val cpuDemand: Double
+) : Event("vm-metrics")
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
new file mode 100644
index 00000000..0a310027
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
@@ -0,0 +1,128 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry.parquet
+
+import com.atlarge.opendc.experiments.sc20.telemetry.Event
+import mu.KotlinLogging
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.Closeable
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
+
+/**
+ * The logging instance to use.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * A writer that writes events in Parquet format.
+ */
+public open class ParquetEventWriter<in T : Event>(
+ private val path: File,
+ private val schema: Schema,
+ private val converter: (T, GenericData.Record) -> Unit,
+ private val bufferSize: Int = 4096
+) : Runnable, Closeable {
+ /**
+ * The writer to write the Parquet file.
+ */
+ private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ /**
+ * The queue of commands to process.
+ */
+ private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
+
+ /**
+ * The thread that is responsible for writing the Parquet records.
+ */
+ private val writerThread = thread(start = false, name = "parquet-writer") { run() }
+
+ /**
+ * Write the specified metrics to the database.
+ */
+ public fun write(event: T) {
+ queue.put(Action.Write(event))
+ }
+
+ /**
+ * Signal the writer to stop.
+ */
+ public override fun close() {
+ queue.put(Action.Stop)
+ writerThread.join()
+ }
+
+ init {
+ writerThread.start()
+ }
+
+ /**
+ * Start the writer thread.
+ */
+ override fun run() {
+ try {
+ loop@ while (true) {
+ val action = queue.take()
+ when (action) {
+ is Action.Stop -> break@loop
+ is Action.Write<*> -> {
+ val record = GenericData.Record(schema)
+ @Suppress("UNCHECKED_CAST")
+ converter(action.event as T, record)
+ writer.write(record)
+ }
+ }
+ }
+ } catch (e: Throwable) {
+ logger.error("Writer failed", e)
+ } finally {
+ writer.close()
+ }
+ }
+
+ sealed class Action {
+ /**
+ * A poison pill that will stop the writer thread.
+ */
+ object Stop : Action()
+
+ /**
+ * Write the specified metrics to the database.
+ */
+ data class Write<out T : Event>(val event: T) : Action()
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
new file mode 100644
index 00000000..3bc09435
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
@@ -0,0 +1,83 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry.parquet
+
+import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import java.io.File
+
+/**
+ * A Parquet event writer for [HostEvent]s.
+ */
+public class ParquetHostEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "host-writer"
+
+ companion object {
+ val convert: (HostEvent, GenericData.Record) -> Unit = { event, record ->
+ // record.put("portfolio_id", event.run.parent.parent.id)
+ // record.put("scenario_id", event.run.parent.id)
+ // record.put("run_id", event.run.id)
+ record.put("host_id", event.host.name)
+ record.put("state", event.host.state.name)
+ record.put("timestamp", event.timestamp)
+ record.put("duration", event.duration)
+ record.put("vm_count", event.vmCount)
+ record.put("requested_burst", event.requestedBurst)
+ record.put("granted_burst", event.grantedBurst)
+ record.put("overcommissioned_burst", event.overcommissionedBurst)
+ record.put("interfered_burst", event.interferedBurst)
+ record.put("cpu_usage", event.cpuUsage)
+ record.put("cpu_demand", event.cpuDemand)
+ record.put("power_draw", event.powerDraw * (1.0 / 12))
+ record.put("cores", event.cores)
+ }
+
+ val schema: Schema = SchemaBuilder
+ .record("host_metrics")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ // .name("portfolio_id").type().intType().noDefault()
+ // .name("scenario_id").type().intType().noDefault()
+ // .name("run_id").type().intType().noDefault()
+ .name("timestamp").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("host_id").type().stringType().noDefault()
+ .name("state").type().stringType().noDefault()
+ .name("vm_count").type().intType().noDefault()
+ .name("requested_burst").type().longType().noDefault()
+ .name("granted_burst").type().longType().noDefault()
+ .name("overcommissioned_burst").type().longType().noDefault()
+ .name("interfered_burst").type().longType().noDefault()
+ .name("cpu_usage").type().doubleType().noDefault()
+ .name("cpu_demand").type().doubleType().noDefault()
+ .name("power_draw").type().doubleType().noDefault()
+ .name("cores").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
new file mode 100644
index 00000000..1f3b0472
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
@@ -0,0 +1,67 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry.parquet
+
+import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import java.io.File
+
+/**
+ * A Parquet event writer for [ProvisionerEvent]s.
+ */
+public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "provisioner-writer"
+
+ companion object {
+ val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record ->
+ record.put("timestamp", event.timestamp)
+ record.put("host_total_count", event.totalHostCount)
+ record.put("host_available_count", event.availableHostCount)
+ record.put("vm_total_count", event.totalVmCount)
+ record.put("vm_active_count", event.activeVmCount)
+ record.put("vm_inactive_count", event.inactiveVmCount)
+ record.put("vm_waiting_count", event.waitingVmCount)
+ record.put("vm_failed_count", event.failedVmCount)
+ }
+
+ val schema: Schema = SchemaBuilder
+ .record("provisioner_metrics")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ .name("timestamp").type().longType().noDefault()
+ .name("host_total_count").type().intType().noDefault()
+ .name("host_available_count").type().intType().noDefault()
+ .name("vm_total_count").type().intType().noDefault()
+ .name("vm_active_count").type().intType().noDefault()
+ .name("vm_inactive_count").type().intType().noDefault()
+ .name("vm_waiting_count").type().intType().noDefault()
+ .name("vm_failed_count").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
new file mode 100644
index 00000000..98afe3b8
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
@@ -0,0 +1,80 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry.parquet
+
+import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import java.io.File
+
+/**
+ * A Parquet event writer for [RunEvent]s.
+ */
+public class ParquetRunEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "run-writer"
+
+ companion object {
+ val convert: (RunEvent, GenericData.Record) -> Unit = { event, record ->
+ val run = event.run
+ val scenario = run.parent
+ val portfolio = scenario.parent
+ record.put("portfolio_id", portfolio.id)
+ record.put("portfolio_name", portfolio.name)
+ record.put("scenario_id", scenario.id)
+ record.put("run_id", run.id)
+ record.put("repetitions", scenario.repetitions)
+ record.put("topology", scenario.topology.name)
+ record.put("workload_name", scenario.workload.name)
+ record.put("workload_fraction", scenario.workload.fraction)
+ record.put("workload_sampler", scenario.workload.samplingStrategy)
+ record.put("allocation_policy", scenario.allocationPolicy)
+ record.put("failure_frequency", scenario.operationalPhenomena.failureFrequency)
+ record.put("interference", scenario.operationalPhenomena.hasInterference)
+ record.put("seed", run.seed)
+ }
+
+ val schema: Schema = SchemaBuilder
+ .record("runs")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ .name("portfolio_id").type().intType().noDefault()
+ .name("portfolio_name").type().stringType().noDefault()
+ .name("scenario_id").type().intType().noDefault()
+ .name("run_id").type().intType().noDefault()
+ .name("repetitions").type().intType().noDefault()
+ .name("topology").type().stringType().noDefault()
+ .name("workload_name").type().stringType().noDefault()
+ .name("workload_fraction").type().doubleType().noDefault()
+ .name("workload_sampler").type().stringType().noDefault()
+ .name("allocation_policy").type().stringType().noDefault()
+ .name("failure_frequency").type().doubleType().noDefault()
+ .name("interference").type().booleanType().noDefault()
+ .name("seed").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
new file mode 100644
index 00000000..06bececf
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
@@ -0,0 +1,98 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.trace
+
+import com.atlarge.opendc.compute.core.image.VmImage
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.format.trace.TraceEntry
+import com.atlarge.opendc.format.trace.TraceReader
+import java.util.TreeSet
+
+/**
+ * A [TraceReader] for the internal VM workload trace format.
+ *
+ * @param reader The internal trace reader to use.
+ * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ * @param run The run to which this reader belongs.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+class Sc20ParquetTraceReader(
+ rawReaders: List<Sc20RawParquetTraceReader>,
+ performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
+ workload: Workload,
+ seed: Int
+) : TraceReader<VmWorkload> {
+ /**
+ * The iterator over the actual trace.
+ */
+ private val iterator: Iterator<TraceEntry<VmWorkload>> =
+ rawReaders
+ .map { it.read() }
+ .run {
+ if (workload is CompositeWorkload) {
+ this.zip(workload.workloads)
+ } else {
+ this.zip(listOf(workload))
+ }
+ }
+ .map { sampleWorkload(it.first, workload, it.second, seed) }
+ .flatten()
+ .run {
+ // Apply performance interference model
+ if (performanceInterferenceModel.isEmpty())
+ this
+ else {
+ map { entry ->
+ val image = entry.workload.image
+ val id = image.name
+ val relevantPerformanceInterferenceModelItems =
+ performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
+
+ val newImage =
+ VmImage(
+ image.uid,
+ image.name,
+ mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
+ image.flopsHistory,
+ image.maxCores,
+ image.requiredMemory
+ )
+ val newWorkload = entry.workload.copy(image = newImage)
+ Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload)
+ }
+ }
+ }
+ .iterator()
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<VmWorkload> = iterator.next()
+
+ override fun close() {}
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
new file mode 100644
index 00000000..f1c1dc25
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
@@ -0,0 +1,174 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.trace
+
+import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
+import com.atlarge.opendc.compute.core.image.VmImage
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.core.User
+import com.atlarge.opendc.format.trace.TraceEntry
+import com.atlarge.opendc.format.trace.TraceReader
+import mu.KotlinLogging
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetReader
+import java.io.File
+import java.util.UUID
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * A [TraceReader] for the internal VM workload trace format.
+ *
+ * @param path The directory of the traces.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+class Sc20RawParquetTraceReader(private val path: File) {
+ /**
+ * Read the fragments into memory.
+ */
+ private fun parseFragments(path: File): Map<String, List<FlopsHistoryFragment>> {
+ val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
+ .disableCompatibility()
+ .build()
+
+ val fragments = mutableMapOf<String, MutableList<FlopsHistoryFragment>>()
+
+ return try {
+ while (true) {
+ val record = reader.read() ?: break
+
+ val id = record["id"].toString()
+ val tick = record["time"] as Long
+ val duration = record["duration"] as Long
+ val cores = record["cores"] as Int
+ val cpuUsage = record["cpuUsage"] as Double
+ val flops = record["flops"] as Long
+
+ val fragment = FlopsHistoryFragment(
+ tick,
+ flops,
+ duration,
+ cpuUsage,
+ cores
+ )
+
+ fragments.getOrPut(id) { mutableListOf() }.add(fragment)
+ }
+
+ fragments
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Read the metadata into a workload.
+ */
+ private fun parseMeta(path: File, fragments: Map<String, List<FlopsHistoryFragment>>): List<TraceEntryImpl> {
+ val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
+ .disableCompatibility()
+ .build()
+
+ var counter = 0
+ val entries = mutableListOf<TraceEntryImpl>()
+
+ return try {
+ while (true) {
+ val record = metaReader.read() ?: break
+
+ val id = record["id"].toString()
+ if (!fragments.containsKey(id)) {
+ continue
+ }
+
+ val submissionTime = record["submissionTime"] as Long
+ val endTime = record["endTime"] as Long
+ val maxCores = record["maxCores"] as Int
+ val requiredMemory = record["requiredMemory"] as Long
+ val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
+
+ val vmFragments = fragments.getValue(id).asSequence()
+ val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
+ val vmWorkload = VmWorkload(
+ uid,
+ id,
+ UnnamedUser,
+ VmImage(
+ uid,
+ id,
+ mapOf(
+ "submit-time" to submissionTime,
+ "end-time" to endTime,
+ "total-load" to totalLoad
+ ),
+ vmFragments,
+ maxCores,
+ requiredMemory
+ )
+ )
+ entries.add(TraceEntryImpl(submissionTime, vmWorkload))
+ }
+
+ entries
+ } catch (e: Exception) {
+ e.printStackTrace()
+ throw e
+ } finally {
+ metaReader.close()
+ }
+ }
+
+ /**
+ * The entries in the trace.
+ */
+ private val entries: List<TraceEntryImpl>
+
+ init {
+ val fragments = parseFragments(path)
+ entries = parseMeta(path, fragments)
+ }
+
+ /**
+ * Read the entries in the trace.
+ */
+ public fun read(): List<TraceEntry<VmWorkload>> = entries
+
+ /**
+ * An unnamed user.
+ */
+ private object UnnamedUser : User {
+ override val name: String = "<unnamed>"
+ override val uid: UUID = UUID.randomUUID()
+ }
+
+ /**
+ * An entry in the trace.
+ */
+ internal data class TraceEntryImpl(
+ override var submissionTime: Long,
+ override val workload: VmWorkload
+ ) : TraceEntry<VmWorkload>
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
new file mode 100644
index 00000000..9fa33c3f
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
@@ -0,0 +1,305 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.trace
+
+import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
+import com.atlarge.opendc.compute.core.image.VmImage
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.core.User
+import com.atlarge.opendc.format.trace.TraceEntry
+import com.atlarge.opendc.format.trace.TraceReader
+import mu.KotlinLogging
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetReader
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.filter2.predicate.Statistics
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate
+import org.apache.parquet.io.api.Binary
+import java.io.File
+import java.io.Serializable
+import java.util.SortedSet
+import java.util.TreeSet
+import java.util.UUID
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
+import kotlin.random.Random
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
+ *
+ * @param traceFile The directory of the traces.
+ * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+class Sc20StreamingParquetTraceReader(
+ traceFile: File,
+ performanceInterferenceModel: PerformanceInterferenceModel,
+ selectedVms: List<String>,
+ random: Random
+) : TraceReader<VmWorkload> {
+ /**
+ * The internal iterator to use for this reader.
+ */
+ private val iterator: Iterator<TraceEntry<VmWorkload>>
+
+ /**
+ * The intermediate buffer to store the read records in.
+ */
+ private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(1024)
+
+ /**
+ * An optional filter for filtering the selected VMs
+ */
+ private val filter =
+ if (selectedVms.isEmpty())
+ null
+ else
+ FilterCompat.get(
+ FilterApi.userDefined(
+ FilterApi.binaryColumn("id"),
+ SelectedVmFilter(
+ TreeSet(selectedVms)
+ )
+ )
+ )
+
+ /**
+ * A poisonous fragment.
+ */
+ private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0))
+
+ /**
+ * The thread to read the records in.
+ */
+ private val readerThread = thread(start = true, name = "sc20-reader") {
+ val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
+ .disableCompatibility()
+ .run { if (filter != null) withFilter(filter) else this }
+ .build()
+
+ try {
+ while (true) {
+ val record = reader.read()
+
+ if (record == null) {
+ queue.put(poison)
+ break
+ }
+
+ val id = record["id"].toString()
+ val tick = record["time"] as Long
+ val duration = record["duration"] as Long
+ val cores = record["cores"] as Int
+ val cpuUsage = record["cpuUsage"] as Double
+ val flops = record["flops"] as Long
+
+ val fragment = FlopsHistoryFragment(
+ tick,
+ flops,
+ duration,
+ cpuUsage,
+ cores
+ )
+
+ queue.put(id to fragment)
+ }
+ } catch (e: InterruptedException) {
+ // Do not rethrow this
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Fill the buffers with the VMs
+ */
+ private fun pull(buffers: Map<String, List<MutableList<FlopsHistoryFragment>>>) {
+ if (!hasNext) {
+ return
+ }
+
+ val fragments = mutableListOf<Pair<String, FlopsHistoryFragment>>()
+ queue.drainTo(fragments)
+
+ for ((id, fragment) in fragments) {
+ if (id == poison.first) {
+ hasNext = false
+ return
+ }
+ buffers[id]?.forEach { it.add(fragment) }
+ }
+ }
+
+ /**
+ * A flag to indicate whether the reader has more entries.
+ */
+ private var hasNext: Boolean = true
+
+ /**
+ * Initialize the reader.
+ */
+ init {
+ val takenIds = mutableSetOf<UUID>()
+ val entries = mutableMapOf<String, GenericData.Record>()
+ val buffers = mutableMapOf<String, MutableList<MutableList<FlopsHistoryFragment>>>()
+
+ val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
+ .disableCompatibility()
+ .run { if (filter != null) withFilter(filter) else this }
+ .build()
+
+ while (true) {
+ val record = metaReader.read() ?: break
+ val id = record["id"].toString()
+ entries[id] = record
+ }
+
+ metaReader.close()
+
+ val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms
+
+ // Create the entry iterator
+ iterator = selection.asSequence()
+ .mapNotNull { entries[it] }
+ .mapIndexed { index, record ->
+ val id = record["id"].toString()
+ val submissionTime = record["submissionTime"] as Long
+ val endTime = record["endTime"] as Long
+ val maxCores = record["maxCores"] as Int
+ val requiredMemory = record["requiredMemory"] as Long
+ val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
+
+ assert(uid !in takenIds)
+ takenIds += uid
+
+ logger.info("Processing VM $id")
+
+ val internalBuffer = mutableListOf<FlopsHistoryFragment>()
+ val externalBuffer = mutableListOf<FlopsHistoryFragment>()
+ buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
+ val fragments = sequence<FlopsHistoryFragment> {
+ repeat@while (true) {
+ if (externalBuffer.isEmpty()) {
+ if (hasNext) {
+ pull(buffers)
+ continue
+ } else {
+ break
+ }
+ }
+
+ internalBuffer.addAll(externalBuffer)
+ externalBuffer.clear()
+
+ for (fragment in internalBuffer) {
+ yield(fragment)
+
+ if (fragment.tick >= endTime) {
+ break@repeat
+ }
+ }
+
+ internalBuffer.clear()
+ }
+
+ buffers.remove(id)
+ }
+ val relevantPerformanceInterferenceModelItems =
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(),
+ Random(random.nextInt())
+ )
+ val vmWorkload = VmWorkload(
+ uid,
+ "VM Workload $id",
+ UnnamedUser,
+ VmImage(
+ uid,
+ id,
+ mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
+ fragments,
+ maxCores,
+ requiredMemory
+ )
+ )
+
+ TraceEntryImpl(
+ submissionTime,
+ vmWorkload
+ )
+ }
+ .sortedBy { it.submissionTime }
+ .toList()
+ .iterator()
+ }
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<VmWorkload> = iterator.next()
+
+ override fun close() {
+ readerThread.interrupt()
+ }
+
+ private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
+ override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
+
+ override fun canDrop(statistics: Statistics<Binary>): Boolean {
+ val min = statistics.min
+ val max = statistics.max
+
+ return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
+ }
+
+ override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
+ val min = statistics.min
+ val max = statistics.max
+
+ return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
+ }
+ }
+
+ /**
+ * An unnamed user.
+ */
+ private object UnnamedUser : User {
+ override val name: String = "<unnamed>"
+ override val uid: UUID = UUID.randomUUID()
+ }
+
+ /**
+ * An entry in the trace.
+ */
+ private data class TraceEntryImpl(
+ override var submissionTime: Long,
+ override val workload: VmWorkload
+ ) : TraceEntry<VmWorkload>
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
new file mode 100644
index 00000000..a2ce3109
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -0,0 +1,621 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.trace
+
+import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.arguments.argument
+import com.github.ajalt.clikt.parameters.groups.OptionGroup
+import com.github.ajalt.clikt.parameters.groups.groupChoice
+import com.github.ajalt.clikt.parameters.options.convert
+import com.github.ajalt.clikt.parameters.options.default
+import com.github.ajalt.clikt.parameters.options.defaultLazy
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.required
+import com.github.ajalt.clikt.parameters.options.split
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.long
+import me.tongfei.progressbar.ProgressBar
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import java.util.Random
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Represents the command for converting traces
+ */
+class TraceConverterCli : CliktCommand(name = "trace-converter") {
+ /**
+ * The directory where the trace should be stored.
+ */
+ private val outputPath by option("-O", "--output", help = "path to store the trace")
+ .file(canBeFile = false, mustExist = false)
+ .defaultLazy { File("output") }
+
+ /**
+ * The directory where the input trace is located.
+ */
+ private val inputPath by argument("input", help = "path to the input trace")
+ .file(canBeFile = false)
+
+ /**
+ * The input type of the trace.
+ */
+ val type by option("-t", "--type", help = "input type of trace").groupChoice(
+ "solvinity" to SolvinityConversion(),
+ "bitbrains" to BitbrainsConversion(),
+ "azure" to AzureConversion()
+ )
+
+ override fun run() {
+ val metaSchema = SchemaBuilder
+ .record("meta")
+ .namespace("com.atlarge.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("submissionTime").type().longType().noDefault()
+ .name("endTime").type().longType().noDefault()
+ .name("maxCores").type().intType().noDefault()
+ .name("requiredMemory").type().longType().noDefault()
+ .endRecord()
+ val schema = SchemaBuilder
+ .record("trace")
+ .namespace("com.atlarge.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("time").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("cores").type().intType().noDefault()
+ .name("cpuUsage").type().doubleType().noDefault()
+ .name("flops").type().longType().noDefault()
+ .endRecord()
+
+ val metaParquet = File(outputPath, "meta.parquet")
+ val traceParquet = File(outputPath, "trace.parquet")
+
+ if (metaParquet.exists()) {
+ metaParquet.delete()
+ }
+ if (traceParquet.exists()) {
+ traceParquet.delete()
+ }
+
+ val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI()))
+ .withSchema(metaSchema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI()))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ try {
+ val type = type ?: throw IllegalArgumentException("Invalid trace conversion")
+ val allFragments = type.read(inputPath, metaSchema, metaWriter)
+ allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+
+ for (fragment in allFragments) {
+ val record = GenericData.Record(schema)
+ record.put("id", fragment.id)
+ record.put("time", fragment.tick)
+ record.put("duration", fragment.duration)
+ record.put("cores", fragment.cores)
+ record.put("cpuUsage", fragment.usage)
+ record.put("flops", fragment.flops)
+
+ writer.write(record)
+ }
+ } finally {
+ writer.close()
+ metaWriter.close()
+ }
+ }
+}
+
+/**
+ * The supported trace conversions.
+ */
+sealed class TraceConversion(name: String) : OptionGroup(name) {
+ /**
+ * Read the fragments of the trace.
+ */
+ abstract fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment>
+}
+
+class SolvinityConversion : TraceConversion("Solvinity") {
+ val clusters by option()
+ .split(",")
+
+ val vmPlacements by option("--vm-placements", help = "file containing the VM placements")
+ .file(canBeDir = false)
+ .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } }
+ .required()
+
+ override fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment> {
+ val clusters = clusters?.toSet() ?: emptySet()
+ val timestampCol = 0
+ val cpuUsageCol = 1
+ val coreCol = 12
+ val provisionedMemoryCol = 20
+ val traceInterval = 5 * 60 * 1000L
+
+ // Identify start time of the entire trace
+ var minTimestamp = Long.MAX_VALUE
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ .forEach { vmFile ->
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEachIndexed { idx, lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val vmId = vmFile.name
+
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null || !clusters.contains(clusterName)) {
+ continue
+ }
+
+ val values = line.split("\t")
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+
+ if (timestamp < minTimestamp) {
+ minTimestamp = timestamp
+ }
+ return@forEach
+ }
+ }
+ }
+ }
+
+ println("Start of trace at $minTimestamp")
+
+ val allFragments = mutableListOf<Fragment>()
+
+ val begin = 15 * 24 * 60 * 60 * 1000L
+ val end = 45 * 24 * 60 * 60 * 1000L
+
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ .forEachIndexed { idx, vmFile ->
+ println(vmFile)
+
+ var vmId = ""
+ var maxCores = -1
+ var requiredMemory = -1L
+ var cores = -1
+ var minTime = Long.MAX_VALUE
+
+ val flopsFragments = sequence {
+ var last: Fragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split("\t")
+
+ vmId = vmFile.name
+
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null || !clusters.contains(clusterName)) {
+ continue
+ }
+
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
+ if (begin > timestamp || timestamp > end) {
+ continue
+ }
+
+ cores = values[coreCol].trim().toInt()
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
+ Fragment(
+ vmId,
+ oldFragment.tick,
+ oldFragment.flops + flops,
+ oldFragment.duration + traceInterval,
+ cpuUsage,
+ cores
+ )
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ cores
+ )
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
+ }
+ }
+ }
+ }
+
+ if (last != null) {
+ yield(last!!)
+ }
+ }
+
+ var maxTime = Long.MIN_VALUE
+ flopsFragments.filter { it.tick in begin until end }.forEach { fragment ->
+ allFragments.add(fragment)
+ maxTime = max(maxTime, fragment.tick)
+ }
+
+ if (minTime in begin until end) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", vmId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+ }
+
+ return allFragments
+ }
+}
+
+/**
+ * Conversion of the Bitbrains public trace.
+ */
+class BitbrainsConversion : TraceConversion("Bitbrains") {
+ override fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment> {
+ val timestampCol = 0
+ val cpuUsageCol = 3
+ val coreCol = 1
+ val provisionedMemoryCol = 5
+ val traceInterval = 5 * 60 * 1000L
+
+ val allFragments = mutableListOf<Fragment>()
+
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ .forEachIndexed { idx, vmFile ->
+ println(vmFile)
+
+ var vmId = ""
+ var maxCores = -1
+ var requiredMemory = -1L
+ var cores = -1
+ var minTime = Long.MAX_VALUE
+
+ val flopsFragments = sequence {
+ var last: Fragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .drop(1)
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(";\t")
+
+ vmId = vmFile.name
+
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+
+ cores = values[coreCol].trim().toInt()
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toDouble().toLong())
+ maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
+ Fragment(
+ vmId,
+ oldFragment.tick,
+ oldFragment.flops + flops,
+ oldFragment.duration + traceInterval,
+ cpuUsage,
+ cores
+ )
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ cores
+ )
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
+ }
+ }
+ }
+ }
+
+ if (last != null) {
+ yield(last!!)
+ }
+ }
+
+ var maxTime = Long.MIN_VALUE
+ flopsFragments.forEach { fragment ->
+ allFragments.add(fragment)
+ maxTime = max(maxTime, fragment.tick)
+ }
+
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", vmId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+
+ return allFragments
+ }
+}
+
+/**
+ * Conversion of the Azure public VM trace.
+ */
+class AzureConversion : TraceConversion("Azure") {
+ val seed by option(help = "seed for trace sampling")
+ .long()
+ .default(0)
+
+ override fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment> {
+ val random = Random(seed)
+ val fraction = 0.01
+
+ // Read VM table
+ val vmIdTableCol = 0
+ val coreTableCol = 9
+ val provisionedMemoryTableCol = 10
+
+ var vmId: String
+ var cores: Int
+ var requiredMemory: Long
+
+ val vmIds = mutableSetOf<String>()
+ val vmIdToMetadata = mutableMapOf<String, VmInfo>()
+
+ BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
+ reader.lineSequence()
+ .chunked(1024)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+ // Sample only a fraction of the VMs
+ if (random.nextDouble() > fraction) {
+ continue
+ }
+
+ val values = line.split(",")
+
+ // Exclude VMs with a large number of cores (not specified exactly)
+ if (values[coreTableCol].contains(">")) {
+ continue
+ }
+
+ vmId = values[vmIdTableCol].trim()
+ cores = values[coreTableCol].trim().toInt()
+ requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
+
+ vmIds.add(vmId)
+ vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
+ }
+ }
+ }
+
+ // Read VM metric reading files
+ val timestampCol = 0
+ val vmIdCol = 1
+ val cpuUsageCol = 4
+ val traceInterval = 5 * 60 * 1000L
+
+ val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
+ val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
+ val allFragments = mutableListOf<Fragment>()
+
+ for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) {
+ val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv")
+ var timestamp: Long
+ var cpuUsage: Double
+
+ BufferedReader(FileReader(readingsFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(",")
+ vmId = values[vmIdCol].trim()
+
+ // Ignore readings for VMs not in the sample
+ if (!vmIds.contains(vmId)) {
+ continue
+ }
+
+ timestamp = values[timestampCol].trim().toLong() * 1000L
+ vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
+ cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz
+ vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+ val lastFragment = vmIdToLastFragment[vmId]
+
+ vmIdToLastFragment[vmId] =
+ if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
+ Fragment(
+ vmId,
+ lastFragment.tick,
+ lastFragment.flops + flops,
+ lastFragment.duration + traceInterval,
+ cpuUsage,
+ vmIdToMetadata[vmId]!!.cores
+ )
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ vmIdToMetadata[vmId]!!.cores
+ )
+ if (lastFragment != null) {
+ if (vmIdToFragments[vmId] == null) {
+ vmIdToFragments[vmId] = mutableListOf()
+ }
+ vmIdToFragments[vmId]!!.add(lastFragment)
+ allFragments.add(lastFragment)
+ }
+ fragment
+ }
+ }
+ }
+ }
+ }
+
+ for (entry in vmIdToLastFragment) {
+ if (entry.value != null) {
+ if (vmIdToFragments[entry.key] == null) {
+ vmIdToFragments[entry.key] = mutableListOf()
+ }
+ vmIdToFragments[entry.key]!!.add(entry.value!!)
+ }
+ }
+
+ println("Read ${vmIdToLastFragment.size} VMs")
+
+ for (entry in vmIdToMetadata) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", entry.key)
+ metaRecord.put("submissionTime", entry.value.minTime)
+ metaRecord.put("endTime", entry.value.maxTime)
+ println("${entry.value.minTime} - ${entry.value.maxTime}")
+ metaRecord.put("maxCores", entry.value.cores)
+ metaRecord.put("requiredMemory", entry.value.requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+
+ return allFragments
+ }
+}
+
+data class Fragment(
+ val id: String,
+ val tick: Long,
+ val flops: Long,
+ val duration: Long,
+ val usage: Double,
+ val cores: Int
+)
+
+class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long)
+
+/**
+ * A script to convert a trace in text format into a Parquet trace.
+ */
+fun main(args: Array<String>) = TraceConverterCli().main(args)
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
new file mode 100644
index 00000000..3a2ed4b7
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
@@ -0,0 +1,214 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.trace
+
+import com.atlarge.opendc.compute.core.image.VmImage
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
+import com.atlarge.opendc.experiments.sc20.experiment.model.SamplingStrategy
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.format.trace.TraceEntry
+import mu.KotlinLogging
+import java.util.*
+import kotlin.random.Random
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Sample the workload for the specified [run].
+ */
+fun sampleWorkload(
+ trace: List<TraceEntry<VmWorkload>>,
+ workload: Workload,
+ subWorkload: Workload,
+ seed: Int
+): List<TraceEntry<VmWorkload>> {
+ return when {
+ workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
+ workload.samplingStrategy == SamplingStrategy.HPC ->
+ sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false)
+ workload.samplingStrategy == SamplingStrategy.HPC_LOAD ->
+ sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true)
+ else ->
+ sampleRegularWorkload(trace, workload, workload, seed)
+ }
+}
+
+/**
+ * Sample a regular (non-HPC) workload.
+ */
+fun sampleRegularWorkload(
+ trace: List<TraceEntry<VmWorkload>>,
+ workload: Workload,
+ subWorkload: Workload,
+ seed: Int
+): List<TraceEntry<VmWorkload>> {
+ val fraction = subWorkload.fraction
+
+ val shuffled = trace.shuffled(Random(seed))
+ val res = mutableListOf<TraceEntry<VmWorkload>>()
+ val totalLoad = if (workload is CompositeWorkload) {
+ workload.totalLoad
+ } else {
+ shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+ }
+ var currentLoad = 0.0
+
+ for (entry in shuffled) {
+ val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+}
+
+/**
+ * Sample a HPC workload.
+ */
+fun sampleHpcWorkload(
+ trace: List<TraceEntry<VmWorkload>>,
+ workload: Workload,
+ seed: Int,
+ sampleOnLoad: Boolean
+): List<TraceEntry<VmWorkload>> {
+ val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
+ val random = Random(seed)
+
+ val fraction = workload.fraction
+ val (hpc, nonHpc) = trace.partition { entry ->
+ val name = entry.workload.image.name
+ name.matches(pattern)
+ }
+
+ val hpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<TraceEntry<VmWorkload>>()
+ hpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ val nonHpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<TraceEntry<VmWorkload>>()
+ nonHpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
+
+ val totalLoad = if (workload is CompositeWorkload) {
+ workload.totalLoad
+ } else {
+ trace.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+ }
+
+ logger.debug { "Total trace load: $totalLoad" }
+ var hpcCount = 0
+ var hpcLoad = 0.0
+ var nonHpcCount = 0
+ var nonHpcLoad = 0.0
+
+ val res = mutableListOf<TraceEntry<VmWorkload>>()
+
+ if (sampleOnLoad) {
+ var currentLoad = 0.0
+ var i = 0
+ for (entry in hpcSequence) {
+ val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ hpcLoad += entryLoad
+ hpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ for (entry in nonHpcSequence) {
+ val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ if ((currentLoad + entryLoad) / totalLoad > 1) {
+ break
+ }
+
+ nonHpcLoad += entryLoad
+ nonHpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+ } else {
+ hpcSequence
+ .take((fraction * trace.size).toInt())
+ .forEach { entry ->
+ hpcLoad += entry.workload.image.tags.getValue("total-load") as Double
+ hpcCount += 1
+ res.add(entry)
+ }
+
+ nonHpcSequence
+ .take(((1 - fraction) * trace.size).toInt())
+ .forEach { entry ->
+ nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double
+ nonHpcCount += 1
+ res.add(entry)
+ }
+ }
+
+ logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
+ logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
+ logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+}
+
+/**
+ * Sample a random trace entry.
+ */
+private fun sample(entry: TraceEntry<VmWorkload>, i: Int): TraceEntry<VmWorkload> {
+ val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray())
+ val image = VmImage(
+ id,
+ entry.workload.image.name,
+ entry.workload.image.tags,
+ entry.workload.image.flopsHistory,
+ entry.workload.image.maxCores,
+ entry.workload.image.requiredMemory
+ )
+ val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name)
+ return VmTraceEntry(vmWorkload, entry.submissionTime)
+}
+
+private class VmTraceEntry(override val workload: VmWorkload, override val submissionTime: Long) : TraceEntry<VmWorkload>
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml
new file mode 100644
index 00000000..6906bfc3
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ MIT License
+ ~
+ ~ Copyright (c) 2020 atlarge-research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false" />
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger name="com.atlarge.odcsim" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="com.atlarge.opendc" level="warn" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="com.atlarge.opendc.experiments.sc20" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="com.atlarge.opendc.experiments.sc20.trace" level="debug" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.apache.hadoop" level="warn" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Root level="error">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
new file mode 100644
index 00000000..ebee1543
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -0,0 +1,252 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
+import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
+import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain
+import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.processTrace
+import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import com.atlarge.opendc.format.trace.TraceReader
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineScope
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertAll
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import java.io.File
+import java.time.Clock
+
+/**
+ * An integration test suite for the SC20 experiments.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class Sc20IntegrationTest {
+ /**
+ * The [TestCoroutineScope] to use.
+ */
+ private lateinit var testScope: TestCoroutineScope
+
+ /**
+ * The simulation clock to use.
+ */
+ private lateinit var clock: Clock
+
+ /**
+ * The monitor used to keep track of the metrics.
+ */
+ private lateinit var monitor: TestExperimentReporter
+
+ /**
+ * Setup the experimental environment.
+ */
+ @BeforeEach
+ fun setUp() {
+ testScope = TestCoroutineScope()
+ clock = DelayControllerClockAdapter(testScope)
+
+ monitor = TestExperimentReporter()
+ }
+
+ /**
+ * Tear down the experimental environment.
+ */
+ @AfterEach
+ fun tearDown() = testScope.cleanupTestCoroutines()
+
+ @Test
+ fun smoke() {
+ val failures = false
+ val seed = 0
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
+ val traceReader = createTestTraceReader()
+ val environmentReader = createTestEnvironmentReader()
+ lateinit var scheduler: SimpleVirtProvisioningService
+
+ testScope.launch {
+ val res = createProvisioner(
+ this,
+ clock,
+ environmentReader,
+ allocationPolicy
+ )
+ val bareMetalProvisioner = res.first
+ scheduler = res.second
+
+ val failureDomain = if (failures) {
+ println("ENABLING failures")
+ createFailureDomain(
+ this,
+ clock,
+ seed,
+ 24.0 * 7,
+ bareMetalProvisioner,
+ chan
+ )
+ } else {
+ null
+ }
+
+ attachMonitor(this, clock, scheduler, monitor)
+ processTrace(
+ this,
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+
+ println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
+ failureDomain?.cancel()
+ scheduler.terminate()
+ monitor.close()
+ }
+
+ runSimulation()
+
+ // Note that these values have been verified beforehand
+ assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs")
+ assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run")
+ assertEquals(207379117949, monitor.totalRequestedBurst)
+ assertEquals(203388071813, monitor.totalGrantedBurst)
+ assertEquals(3991046136, monitor.totalOvercommissionedBurst)
+ assertEquals(0, monitor.totalInterferedBurst)
+ }
+
+ @Test
+ fun small() {
+ val seed = 1
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
+ val traceReader = createTestTraceReader(0.5, seed)
+ val environmentReader = createTestEnvironmentReader("single")
+ lateinit var scheduler: SimpleVirtProvisioningService
+
+ testScope.launch {
+ val res = createProvisioner(
+ this,
+ clock,
+ environmentReader,
+ allocationPolicy
+ )
+ scheduler = res.second
+
+ attachMonitor(this, clock, scheduler, monitor)
+ processTrace(
+ this,
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+
+ println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
+ scheduler.terminate()
+ monitor.close()
+ }
+
+ runSimulation()
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(96344114723, monitor.totalRequestedBurst) },
+ { assertEquals(96324378235, monitor.totalGrantedBurst) },
+ { assertEquals(19736424, monitor.totalOvercommissionedBurst) },
+ { assertEquals(0, monitor.totalInterferedBurst) }
+ )
+ }
+
+ /**
+ * Run the simulation.
+ */
+ private fun runSimulation() = testScope.advanceUntilIdle()
+
+ /**
+ * Obtain the trace reader for the test.
+ */
+ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> {
+ return Sc20ParquetTraceReader(
+ listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
+ emptyMap(),
+ Workload("test", fraction),
+ seed
+ )
+ }
+
+ /**
+ * Obtain the environment reader for the test.
+ */
+ private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
+ val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt")
+ return Sc20ClusterEnvironmentReader(stream)
+ }
+
+ class TestExperimentReporter : ExperimentMonitor {
+ var totalRequestedBurst = 0L
+ var totalGrantedBurst = 0L
+ var totalOvercommissionedBurst = 0L
+ var totalInterferedBurst = 0L
+
+ override fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ numberOfDeployedImages: Int,
+ hostServer: Server,
+ duration: Long
+ ) {
+ totalRequestedBurst += requestedBurst
+ totalGrantedBurst += grantedBurst
+ totalOvercommissionedBurst += overcommissionedBurst
+ totalInterferedBurst += interferedBurst
+ }
+
+ override fun close() {}
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/single.txt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/single.txt
new file mode 100644
index 00000000..53b3c2d7
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/single.txt
@@ -0,0 +1,3 @@
+ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
+A01;A01;8;3.2;64;1;64;8
+
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/topology.txt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/topology.txt
new file mode 100644
index 00000000..6b347bff
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/topology.txt
@@ -0,0 +1,5 @@
+ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
+A01;A01;32;3.2;2048;1;256;32
+B01;B01;48;2.93;1256;6;64;8
+C01;C01;32;3.2;2048;2;128;16
+
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/meta.parquet b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/meta.parquet
new file mode 100644
index 00000000..ce7a812c
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/meta.parquet
Binary files differ
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/trace.parquet b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/trace.parquet
new file mode 100644
index 00000000..1d7ce882
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/trace.parquet
Binary files differ