summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt79
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt297
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt60
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt59
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt56
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt61
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt233
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt50
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt47
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt31
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt44
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt160
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt74
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt118
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt35
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt43
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt39
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt34
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt41
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt126
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt81
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt65
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt72
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt84
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt157
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt284
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt621
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt199
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml49
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt216
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt5
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquetbin0 -> 2148 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquetbin0 -> 1672463 bytes
35 files changed, 3551 insertions, 0 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
new file mode 100644
index 00000000..faabe5cb
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] that explores the effect of a composite workload.
+ */
+public class CompositeWorkloadPortfolio : Portfolio("composite-workload") {
+ private val totalSampleLoad = 1.3301733005049648E12
+
+ override val topology: Topology by anyOf(
+ Topology("base"),
+ Topology("exp-vol-hor-hom"),
+ Topology("exp-vol-ver-hom"),
+ Topology("exp-vel-ver-hom")
+ )
+
+ override val workload: Workload by anyOf(
+ CompositeWorkload(
+ "all-azure",
+ listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "solvinity-25-azure-75",
+ listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "solvinity-50-azure-50",
+ listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "solvinity-75-azure-25",
+ listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "all-solvinity",
+ listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)),
+ totalSampleLoad
+ )
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false)
+ )
+
+ override val allocationPolicy: String by anyOf(
+ "active-servers"
+ )
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
new file mode 100644
index 00000000..763234f8
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -0,0 +1,297 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import mu.KotlinLogging
+import org.opendc.compute.api.*
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostListener
+import org.opendc.compute.service.driver.HostState
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.simulator.SimHost
+import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
+import org.opendc.experiments.capelin.monitor.ExperimentMonitor
+import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
+import org.opendc.format.environment.EnvironmentReader
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.failures.CorrelatedFaultInjector
+import org.opendc.simulator.failures.FaultInjector
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import java.io.File
+import java.time.Clock
+import kotlin.coroutines.coroutineContext
+import kotlin.coroutines.resume
+import kotlin.math.ln
+import kotlin.math.max
+import kotlin.random.Random
+
+/**
+ * The logger for this experiment.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Construct the failure domain for the experiments.
+ */
+public fun createFailureDomain(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
+ seed: Int,
+ failureInterval: Double,
+ service: ComputeService,
+ chan: Channel<Unit>
+): CoroutineScope {
+ val job = coroutineScope.launch {
+ chan.receive()
+ val random = Random(seed)
+ val injectors = mutableMapOf<String, FaultInjector>()
+ for (host in service.hosts) {
+ val cluster = host.meta["cluster"] as String
+ val injector =
+ injectors.getOrPut(cluster) {
+ createFaultInjector(
+ this,
+ clock,
+ random,
+ failureInterval
+ )
+ }
+ injector.enqueue(host as SimHost)
+ }
+ }
+ return CoroutineScope(coroutineScope.coroutineContext + job)
+}
+
+/**
+ * Obtain the [FaultInjector] to use for the experiments.
+ */
+public fun createFaultInjector(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
+ random: Random,
+ failureInterval: Double
+): FaultInjector {
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return CorrelatedFaultInjector(
+ coroutineScope,
+ clock,
+ iatScale = ln(failureInterval), iatShape = 1.03, // Hours
+ sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
+ dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
+ random = random
+ )
+}
+
+/**
+ * Create the trace reader from which the VM workloads are read.
+ */
+public fun createTraceReader(
+ path: File,
+ performanceInterferenceModel: PerformanceInterferenceModel,
+ vms: List<String>,
+ seed: Int
+): Sc20StreamingParquetTraceReader {
+ return Sc20StreamingParquetTraceReader(
+ path,
+ performanceInterferenceModel,
+ vms,
+ Random(seed)
+ )
+}
+
+/**
+ * Construct the environment for a simulated compute service..
+ */
+public suspend fun withComputeService(
+ clock: Clock,
+ meterProvider: MeterProvider,
+ environmentReader: EnvironmentReader,
+ scheduler: ComputeScheduler,
+ block: suspend CoroutineScope.(ComputeService) -> Unit
+): Unit = coroutineScope {
+ val hosts = environmentReader
+ .use { it.read() }
+ .map { def ->
+ SimHost(
+ def.uid,
+ def.name,
+ def.model,
+ def.meta,
+ coroutineContext,
+ clock,
+ meterProvider.get("opendc-compute-simulator"),
+ SimFairShareHypervisorProvider(),
+ def.powerModel
+ )
+ }
+
+ val serviceMeter = meterProvider.get("opendc-compute")
+ val service =
+ ComputeService(coroutineContext, clock, serviceMeter, scheduler)
+
+ for (host in hosts) {
+ service.addHost(host)
+ }
+
+ try {
+ block(this, service)
+ } finally {
+ service.close()
+ hosts.forEach(SimHost::close)
+ }
+}
+
+/**
+ * Attach the specified monitor to the VM provisioner.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public suspend fun withMonitor(
+ monitor: ExperimentMonitor,
+ clock: Clock,
+ metricProducer: MetricProducer,
+ scheduler: ComputeService,
+ block: suspend CoroutineScope.() -> Unit
+): Unit = coroutineScope {
+ val monitorJobs = mutableSetOf<Job>()
+
+ // Monitor host events
+ for (host in scheduler.hosts) {
+ monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
+ host.addListener(object : HostListener {
+ override fun onStateChanged(host: Host, newState: HostState) {
+ monitor.reportHostStateChange(clock.millis(), host, newState)
+ }
+ })
+ }
+
+ val reader = CoroutineMetricReader(
+ this,
+ listOf(metricProducer),
+ ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }),
+ exportInterval = 5 * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */
+ )
+
+ try {
+ block(this)
+ } finally {
+ monitorJobs.forEach(Job::cancel)
+ reader.close()
+ monitor.close()
+ }
+}
+
+public class ComputeMetrics {
+ public var submittedVms: Int = 0
+ public var queuedVms: Int = 0
+ public var runningVms: Int = 0
+ public var unscheduledVms: Int = 0
+ public var finishedVms: Int = 0
+}
+
+/**
+ * Collect the metrics of the compute service.
+ */
+public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
+ val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
+ val res = ComputeMetrics()
+ try {
+ // Hack to extract metrics from OpenTelemetry SDK
+ res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Failed to collect metrics" }
+ }
+ return res
+}
+
+/**
+ * Process the trace.
+ */
+public suspend fun processTrace(
+ clock: Clock,
+ reader: TraceReader<SimWorkload>,
+ scheduler: ComputeService,
+ chan: Channel<Unit>,
+ monitor: ExperimentMonitor
+) {
+ val client = scheduler.newClient()
+ val image = client.newImage("vm-image")
+ var offset = Long.MIN_VALUE
+ try {
+ coroutineScope {
+ while (reader.hasNext()) {
+ val entry = reader.next()
+
+ if (offset < 0) {
+ offset = entry.start - clock.millis()
+ }
+
+ delay(max(0, (entry.start - offset) - clock.millis()))
+ launch {
+ chan.send(Unit)
+ val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace)
+ val server = client.newServer(
+ entry.name,
+ image,
+ client.newFlavor(
+ entry.name,
+ entry.meta["cores"] as Int,
+ entry.meta["required-memory"] as Long
+ ),
+ meta = entry.meta + mapOf("workload" to workload)
+ )
+
+ suspendCancellableCoroutine { cont ->
+ server.watch(object : ServerWatcher {
+ override fun onStateChanged(server: Server, newState: ServerState) {
+ monitor.reportVmStateChange(clock.millis(), server, newState)
+
+ if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) {
+ cont.resume(Unit)
+ }
+ }
+ })
+ }
+ }
+ }
+ }
+
+ yield()
+ } finally {
+ reader.close()
+ client.close()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
new file mode 100644
index 00000000..e1cf8517
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] that explores the difference between horizontal and vertical scaling.
+ */
+public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") {
+ override val topology: Topology by anyOf(
+ Topology("base"),
+ Topology("rep-vol-hor-hom"),
+ Topology("rep-vol-hor-het"),
+ Topology("rep-vol-ver-hom"),
+ Topology("rep-vol-ver-het"),
+ Topology("exp-vol-hor-hom"),
+ Topology("exp-vol-hor-het"),
+ Topology("exp-vol-ver-hom"),
+ Topology("exp-vol-ver-het")
+ )
+
+ override val workload: Workload by anyOf(
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicy: String by anyOf(
+ "active-servers"
+ )
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
new file mode 100644
index 00000000..a995e467
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.SamplingStrategy
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] to explore the effect of HPC workloads.
+ */
+public class MoreHpcPortfolio : Portfolio("more_hpc") {
+ override val topology: Topology by anyOf(
+ Topology("base"),
+ Topology("exp-vol-hor-hom"),
+ Topology("exp-vol-ver-hom"),
+ Topology("exp-vel-ver-hom")
+ )
+
+ override val workload: Workload by anyOf(
+ Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC),
+ Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC),
+ Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC),
+ Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC),
+ Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD),
+ Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD),
+ Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD)
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicy: String by anyOf(
+ "active-servers"
+ )
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
new file mode 100644
index 00000000..49559e0e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] that explores the effect of adding more velocity to a cluster (e.g., faster machines).
+ */
+public class MoreVelocityPortfolio : Portfolio("more_velocity") {
+ override val topology: Topology by anyOf(
+ Topology("base"),
+ Topology("rep-vel-ver-hom"),
+ Topology("rep-vel-ver-het"),
+ Topology("exp-vel-ver-hom"),
+ Topology("exp-vel-ver-het")
+ )
+
+ override val workload: Workload by anyOf(
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicy: String by anyOf(
+ "active-servers"
+ )
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
new file mode 100644
index 00000000..1aac4f9e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] that explores the effect of operational phenomena on metrics.
+ */
+public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena") {
+ override val topology: Topology by anyOf(
+ Topology("base")
+ )
+
+ override val workload: Workload by anyOf(
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true),
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = true),
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false),
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = false)
+ )
+
+ override val allocationPolicy: String by anyOf(
+ "mem",
+ "mem-inv",
+ "core-mem",
+ "core-mem-inv",
+ "active-servers",
+ "active-servers-inv",
+ "random"
+ )
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
new file mode 100644
index 00000000..b969366c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -0,0 +1,233 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.Channel
+import mu.KotlinLogging
+import org.opendc.compute.service.scheduler.*
+import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.weights.*
+import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
+import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
+import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
+import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import org.opendc.format.trace.PerformanceInterferenceModelReader
+import org.opendc.harness.dsl.Experiment
+import org.opendc.harness.dsl.anyOf
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.sdk.toOtelClock
+import java.io.File
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import kotlin.random.asKotlinRandom
+
+/**
+ * A portfolio represents a collection of scenarios are tested for the work.
+ *
+ * @param name The name of the portfolio.
+ */
+public abstract class Portfolio(name: String) : Experiment(name) {
+ /**
+ * The logger for this portfolio instance.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The path to where the environments are located.
+ */
+ private val environmentPath by anyOf(File("input/environments/"))
+
+ /**
+ * The path to where the traces are located.
+ */
+ private val tracePath by anyOf(File("input/traces/"))
+
+ /**
+ * The path to where the output results should be written.
+ */
+ private val outputPath by anyOf(File("output/"))
+
+ /**
+ * The path to the original VM placements file.
+ */
+ private val vmPlacements by anyOf(emptyMap<String, String>())
+
+ /**
+ * The path to the performance interference model.
+ */
+ private val performanceInterferenceModel by anyOf<PerformanceInterferenceModelReader?>(null)
+
+ /**
+ * The topology to test.
+ */
+ public abstract val topology: Topology
+
+ /**
+ * The workload to test.
+ */
+ public abstract val workload: Workload
+
+ /**
+ * The operational phenomenas to consider.
+ */
+ public abstract val operationalPhenomena: OperationalPhenomena
+
+ /**
+ * The allocation policies to consider.
+ */
+ public abstract val allocationPolicy: String
+
+ /**
+ * A map of trace readers.
+ */
+ private val traceReaders = ConcurrentHashMap<String, Sc20RawParquetTraceReader>()
+
+ /**
+ * Perform a single trial for this portfolio.
+ */
+ @OptIn(ExperimentalCoroutinesApi::class)
+ override fun doRun(repeat: Int): Unit = runBlockingSimulation {
+ val seeder = Random(repeat.toLong())
+ val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt"))
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = createComputeScheduler(seeder)
+
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val workload = workload
+ val workloadNames = if (workload is CompositeWorkload) {
+ workload.workloads.map { it.name }
+ } else {
+ listOf(workload.name)
+ }
+
+ val rawReaders = workloadNames.map { workloadName ->
+ traceReaders.computeIfAbsent(workloadName) {
+ logger.info { "Loading trace $workloadName" }
+ Sc20RawParquetTraceReader(File(tracePath, workloadName))
+ }
+ }
+
+ val performanceInterferenceModel = performanceInterferenceModel
+ ?.takeIf { operationalPhenomena.hasInterference }
+ ?.construct(seeder.asKotlinRandom()) ?: emptyMap()
+ val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt())
+
+ val monitor = ParquetExperimentMonitor(
+ outputPath,
+ "portfolio_id=$name/scenario_id=$id/run_id=$repeat",
+ 4096
+ )
+
+ withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
+ val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
+ logger.debug("ENABLING failures")
+ createFailureDomain(
+ this,
+ clock,
+ seeder.nextInt(),
+ operationalPhenomena.failureFrequency,
+ scheduler,
+ chan
+ )
+ } else {
+ null
+ }
+
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ trace,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
+
+ failureDomain?.cancel()
+ }
+
+ val monitorResults = collectMetrics(meterProvider as MetricProducer)
+ logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+ }
+
+ /**
+ * Create the [ComputeScheduler] instance to use for the trial.
+ */
+ private fun createComputeScheduler(seeder: Random): ComputeScheduler {
+ return when (allocationPolicy) {
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(MemoryWeigher() to -1.0)
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(MemoryWeigher() to -1.0)
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(InstanceCountWeigher() to 1.0)
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to 1.0)
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(RandomWeigher(Random(seeder.nextLong())) to 1.0)
+ )
+ "replay" -> ReplayScheduler(vmPlacements)
+ else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
+ }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
new file mode 100644
index 00000000..b6d3b30c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] that compares the original VM placements against our policies.
+ */
+public class ReplayPortfolio : Portfolio("replay") {
+ override val topology: Topology by anyOf(
+ Topology("base")
+ )
+
+ override val workload: Workload by anyOf(
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = false)
+ )
+
+ override val allocationPolicy: String by anyOf(
+ "replay",
+ "active-servers"
+ )
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
new file mode 100644
index 00000000..90840db8
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] to perform a simple test run.
+ */
+public class TestPortfolio : Portfolio("test") {
+ override val topology: Topology by anyOf(
+ Topology("base")
+ )
+
+ override val workload: Workload by anyOf(
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicy: String by anyOf("active-servers")
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt
new file mode 100644
index 00000000..b53b3617
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.model
+
+/**
+ * Operation phenomena during experiments.
+ *
+ * @param failureFrequency The average time between failures in hours.
+ * @param hasInterference A flag to enable performance interference between VMs.
+ */
+public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt
new file mode 100644
index 00000000..fe16a294
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.model
+
+/**
+ * The topology topology on which we test the workload.
+ */
+public data class Topology(val name: String)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
new file mode 100644
index 00000000..c4ddd158
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.model
+
+public enum class SamplingStrategy {
+ REGULAR,
+ HPC,
+ HPC_LOAD
+}
+
+/**
+ * A workload that is considered for a scenario.
+ */
+public open class Workload(
+ public open val name: String,
+ public val fraction: Double,
+ public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR
+)
+
+/**
+ * A workload that is composed of multiple workloads.
+ */
+public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) :
+ Workload(name, -1.0)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
new file mode 100644
index 00000000..5f8002e2
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -0,0 +1,160 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.monitor
+
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import org.opendc.compute.service.driver.Host
+import java.time.Clock
+
+/**
+ * A [MetricExporter] that exports the metrics to the [ExperimentMonitor].
+ */
+public class ExperimentMetricExporter(
+ private val monitor: ExperimentMonitor,
+ private val clock: Clock,
+ private val hosts: Map<String, Host>
+) : MetricExporter {
+ override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+ val metricsByName = metrics.associateBy { it.name }
+ reportHostMetrics(metricsByName)
+ reportProvisionerMetrics(metricsByName)
+ return CompletableResultCode.ofSuccess()
+ }
+
+ private fun reportHostMetrics(metrics: Map<String, MetricData>) {
+ val hostMetrics = mutableMapOf<String, HostMetrics>()
+ hosts.mapValuesTo(hostMetrics) { HostMetrics() }
+
+ mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v ->
+ m.cpuDemand = v
+ }
+
+ mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v ->
+ m.cpuUsage = v
+ }
+
+ mapDoubleSummary(metrics["power.usage"], hostMetrics) { m, v ->
+ m.powerDraw = v
+ }
+
+ mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v ->
+ m.requestedBurst = v.toLong()
+ }
+
+ mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v ->
+ m.grantedBurst = v.toLong()
+ }
+
+ mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v ->
+ m.overcommissionedBurst = v.toLong()
+ }
+
+ mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v ->
+ m.interferedBurst = v.toLong()
+ }
+
+ mapLongSum(metrics["guests.active"], hostMetrics) { m, v ->
+ m.numberOfDeployedImages = v.toInt()
+ }
+
+ for ((id, hostMetric) in hostMetrics) {
+ val host = hosts.getValue(id)
+ monitor.reportHostSlice(
+ clock.millis(),
+ hostMetric.requestedBurst,
+ hostMetric.grantedBurst,
+ hostMetric.overcommissionedBurst,
+ hostMetric.interferedBurst,
+ hostMetric.cpuUsage,
+ hostMetric.cpuDemand,
+ hostMetric.powerDraw,
+ hostMetric.numberOfDeployedImages,
+ host
+ )
+ }
+ }
+
+ private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ val points = data?.doubleSummaryData?.points ?: emptyList()
+ for (point in points) {
+ val uid = point.labels["host"]
+ val hostMetric = hostMetrics[uid]
+
+ if (hostMetric != null) {
+ // Take the average of the summary
+ val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2
+ block(hostMetric, avg)
+ }
+ }
+ }
+
+ private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
+ val points = data?.longSumData?.points ?: emptyList()
+ for (point in points) {
+ val uid = point.labels["host"]
+ val hostMetric = hostMetrics[uid]
+
+ if (hostMetric != null) {
+ block(hostMetric, point.value)
+ }
+ }
+ }
+
+ private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) {
+ val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+
+ monitor.reportProvisionerMetrics(
+ clock.millis(),
+ hosts,
+ availableHosts,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ }
+
+ private class HostMetrics {
+ var requestedBurst: Long = 0
+ var grantedBurst: Long = 0
+ var overcommissionedBurst: Long = 0
+ var interferedBurst: Long = 0
+ var cpuUsage: Double = 0.0
+ var cpuDemand: Double = 0.0
+ var numberOfDeployedImages: Int = 0
+ var powerDraw: Double = 0.0
+ }
+
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
new file mode 100644
index 00000000..68631dee
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.monitor
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostState
+
+/**
+ * A monitor watches the events of an experiment.
+ */
+public interface ExperimentMonitor : AutoCloseable {
+ /**
+ * This method is invoked when the state of a VM changes.
+ */
+ public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
+
+ /**
+ * This method is invoked when the state of a host changes.
+ */
+ public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {}
+
+ /**
+ * This method is invoked for a host for each slice that is finishes.
+ */
+ public fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ powerDraw: Double,
+ numberOfDeployedImages: Int,
+ host: Host
+ ) {
+ }
+
+ /**
+ * This method is invoked for a provisioner event.
+ */
+ public fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {}
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
new file mode 100644
index 00000000..983b4cff
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.monitor
+
+import mu.KotlinLogging
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostState
+import org.opendc.experiments.capelin.telemetry.HostEvent
+import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
+import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter
+import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter
+import java.io.File
+
+/**
+ * The logger instance to use.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * An [ExperimentMonitor] that logs the events to a Parquet file.
+ */
+public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor {
+ private val hostWriter = ParquetHostEventWriter(
+ File(base, "host-metrics/$partition/data.parquet"),
+ bufferSize
+ )
+ private val provisionerWriter = ParquetProvisionerEventWriter(
+ File(base, "provisioner-metrics/$partition/data.parquet"),
+ bufferSize
+ )
+
+ override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
+
+ override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
+ logger.debug { "Host ${host.uid} changed state $newState [$time]" }
+ }
+
+ override fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ powerDraw: Double,
+ numberOfDeployedImages: Int,
+ host: Host
+ ) {
+ hostWriter.write(
+ HostEvent(
+ time,
+ 5 * 60 * 1000L,
+ host,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ powerDraw,
+ host.model.cpuCount
+ )
+ )
+ }
+
+ override fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {
+ provisionerWriter.write(
+ ProvisionerEvent(
+ time,
+ totalHostCount,
+ availableHostCount,
+ totalVmCount,
+ activeVmCount,
+ inactiveVmCount,
+ waitingVmCount,
+ failedVmCount
+ )
+ )
+ }
+
+ override fun close() {
+ hostWriter.close()
+ provisionerWriter.close()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt
new file mode 100644
index 00000000..c29e116e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt
@@ -0,0 +1,35 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry
+
+/**
+ * An event that occurs within the system.
+ */
+public abstract class Event(public val name: String) {
+ /**
+ * The time of occurrence of this event.
+ */
+ public abstract val timestamp: Long
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
new file mode 100644
index 00000000..899fc9b1
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry
+
+import org.opendc.compute.service.driver.Host
+
+/**
+ * A periodic report of the host machine metrics.
+ */
+public data class HostEvent(
+ override val timestamp: Long,
+ public val duration: Long,
+ public val host: Host,
+ public val vmCount: Int,
+ public val requestedBurst: Long,
+ public val grantedBurst: Long,
+ public val overcommissionedBurst: Long,
+ public val interferedBurst: Long,
+ public val cpuUsage: Double,
+ public val cpuDemand: Double,
+ public val powerDraw: Double,
+ public val cores: Int
+) : Event("host-metrics")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt
new file mode 100644
index 00000000..539c9bc9
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt
@@ -0,0 +1,39 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry
+
+/**
+ * A periodic report of the provisioner's metrics.
+ */
+public data class ProvisionerEvent(
+ override val timestamp: Long,
+ public val totalHostCount: Int,
+ public val availableHostCount: Int,
+ public val totalVmCount: Int,
+ public val activeVmCount: Int,
+ public val inactiveVmCount: Int,
+ public val waitingVmCount: Int,
+ public val failedVmCount: Int
+) : Event("provisioner-metrics")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
new file mode 100644
index 00000000..6c8fc941
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry
+
+import org.opendc.experiments.capelin.Portfolio
+
+/**
+ * A periodic report of the host machine metrics.
+ */
+public data class RunEvent(
+ val portfolio: Portfolio,
+ val repeat: Int,
+ override val timestamp: Long
+) : Event("run")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
new file mode 100644
index 00000000..7631f55f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry
+
+import org.opendc.compute.api.Server
+
+/**
+ * A periodic report of a virtual machine's metrics.
+ */
+public data class VmEvent(
+ override val timestamp: Long,
+ public val duration: Long,
+ public val vm: Server,
+ public val host: Server,
+ public val requestedBurst: Long,
+ public val grantedBurst: Long,
+ public val overcommissionedBurst: Long,
+ public val interferedBurst: Long,
+ public val cpuUsage: Double,
+ public val cpuDemand: Double
+) : Event("vm-metrics")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
new file mode 100644
index 00000000..38930ee5
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry.parquet
+
+import mu.KotlinLogging
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.experiments.capelin.telemetry.Event
+import java.io.Closeable
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
+
+/**
+ * The logging instance to use.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * A writer that writes events in Parquet format.
+ */
+public open class ParquetEventWriter<in T : Event>(
+ private val path: File,
+ private val schema: Schema,
+ private val converter: (T, GenericData.Record) -> Unit,
+ private val bufferSize: Int = 4096
+) : Runnable, Closeable {
+ /**
+ * The writer to write the Parquet file.
+ */
+ private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ /**
+ * The queue of commands to process.
+ */
+ private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
+
+ /**
+ * The thread that is responsible for writing the Parquet records.
+ */
+ private val writerThread = thread(start = false, name = "parquet-writer") { run() }
+
+ /**
+ * Write the specified metrics to the database.
+ */
+ public fun write(event: T) {
+ queue.put(Action.Write(event))
+ }
+
+ /**
+ * Signal the writer to stop.
+ */
+ public override fun close() {
+ queue.put(Action.Stop)
+ writerThread.join()
+ }
+
+ init {
+ writerThread.start()
+ }
+
+ /**
+ * Start the writer thread.
+ */
+ override fun run() {
+ try {
+ loop@ while (true) {
+ val action = queue.take()
+ when (action) {
+ is Action.Stop -> break@loop
+ is Action.Write<*> -> {
+ val record = GenericData.Record(schema)
+ @Suppress("UNCHECKED_CAST")
+ converter(action.event as T, record)
+ writer.write(record)
+ }
+ }
+ }
+ } catch (e: Throwable) {
+ logger.error("Writer failed", e)
+ } finally {
+ writer.close()
+ }
+ }
+
+ public sealed class Action {
+ /**
+ * A poison pill that will stop the writer thread.
+ */
+ public object Stop : Action()
+
+ /**
+ * Write the specified metrics to the database.
+ */
+ public data class Write<out T : Event>(val event: T) : Action()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
new file mode 100644
index 00000000..c8fe1cb2
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.opendc.experiments.capelin.telemetry.HostEvent
+import java.io.File
+
+/**
+ * A Parquet event writer for [HostEvent]s.
+ */
+public class ParquetHostEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "host-writer"
+
+ public companion object {
+ private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record ->
+ // record.put("portfolio_id", event.run.parent.parent.id)
+ // record.put("scenario_id", event.run.parent.id)
+ // record.put("run_id", event.run.id)
+ record.put("host_id", event.host.name)
+ record.put("state", event.host.state.name)
+ record.put("timestamp", event.timestamp)
+ record.put("duration", event.duration)
+ record.put("vm_count", event.vmCount)
+ record.put("requested_burst", event.requestedBurst)
+ record.put("granted_burst", event.grantedBurst)
+ record.put("overcommissioned_burst", event.overcommissionedBurst)
+ record.put("interfered_burst", event.interferedBurst)
+ record.put("cpu_usage", event.cpuUsage)
+ record.put("cpu_demand", event.cpuDemand)
+ record.put("power_draw", event.powerDraw)
+ record.put("cores", event.cores)
+ }
+
+ private val schema: Schema = SchemaBuilder
+ .record("host_metrics")
+ .namespace("org.opendc.experiments.sc20")
+ .fields()
+ // .name("portfolio_id").type().intType().noDefault()
+ // .name("scenario_id").type().intType().noDefault()
+ // .name("run_id").type().intType().noDefault()
+ .name("timestamp").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("host_id").type().stringType().noDefault()
+ .name("state").type().stringType().noDefault()
+ .name("vm_count").type().intType().noDefault()
+ .name("requested_burst").type().longType().noDefault()
+ .name("granted_burst").type().longType().noDefault()
+ .name("overcommissioned_burst").type().longType().noDefault()
+ .name("interfered_burst").type().longType().noDefault()
+ .name("cpu_usage").type().doubleType().noDefault()
+ .name("cpu_demand").type().doubleType().noDefault()
+ .name("power_draw").type().doubleType().noDefault()
+ .name("cores").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt
new file mode 100644
index 00000000..8feff8d9
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
+import java.io.File
+
+/**
+ * A Parquet event writer for [ProvisionerEvent]s.
+ */
+public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "provisioner-writer"
+
+ public companion object {
+ private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record ->
+ record.put("timestamp", event.timestamp)
+ record.put("host_total_count", event.totalHostCount)
+ record.put("host_available_count", event.availableHostCount)
+ record.put("vm_total_count", event.totalVmCount)
+ record.put("vm_active_count", event.activeVmCount)
+ record.put("vm_inactive_count", event.inactiveVmCount)
+ record.put("vm_waiting_count", event.waitingVmCount)
+ record.put("vm_failed_count", event.failedVmCount)
+ }
+
+ private val schema: Schema = SchemaBuilder
+ .record("provisioner_metrics")
+ .namespace("org.opendc.experiments.sc20")
+ .fields()
+ .name("timestamp").type().longType().noDefault()
+ .name("host_total_count").type().intType().noDefault()
+ .name("host_available_count").type().intType().noDefault()
+ .name("vm_total_count").type().intType().noDefault()
+ .name("vm_active_count").type().intType().noDefault()
+ .name("vm_inactive_count").type().intType().noDefault()
+ .name("vm_waiting_count").type().intType().noDefault()
+ .name("vm_failed_count").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
new file mode 100644
index 00000000..946410eb
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.opendc.experiments.capelin.telemetry.RunEvent
+import java.io.File
+
+/**
+ * A Parquet event writer for [RunEvent]s.
+ */
+public class ParquetRunEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "run-writer"
+
+ public companion object {
+ private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record ->
+ val portfolio = event.portfolio
+ record.put("portfolio_name", portfolio.name)
+ record.put("scenario_id", portfolio.id)
+ record.put("run_id", event.repeat)
+ record.put("topology", portfolio.topology.name)
+ record.put("workload_name", portfolio.workload.name)
+ record.put("workload_fraction", portfolio.workload.fraction)
+ record.put("workload_sampler", portfolio.workload.samplingStrategy)
+ record.put("allocation_policy", portfolio.allocationPolicy)
+ record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency)
+ record.put("interference", portfolio.operationalPhenomena.hasInterference)
+ record.put("seed", event.repeat)
+ }
+
+ private val schema: Schema = SchemaBuilder
+ .record("runs")
+ .namespace("org.opendc.experiments.sc20")
+ .fields()
+ .name("portfolio_name").type().stringType().noDefault()
+ .name("scenario_id").type().intType().noDefault()
+ .name("run_id").type().intType().noDefault()
+ .name("topology").type().stringType().noDefault()
+ .name("workload_name").type().stringType().noDefault()
+ .name("workload_fraction").type().doubleType().noDefault()
+ .name("workload_sampler").type().stringType().noDefault()
+ .name("allocation_policy").type().stringType().noDefault()
+ .name("failure_frequency").type().doubleType().noDefault()
+ .name("interference").type().booleanType().noDefault()
+ .name("seed").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
new file mode 100644
index 00000000..a8462a51
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.format.trace.TraceEntry
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.util.TreeSet
+
+/**
+ * A [TraceReader] for the internal VM workload trace format.
+ *
+ * @param reader The internal trace reader to use.
+ * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ * @param run The run to which this reader belongs.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+public class Sc20ParquetTraceReader(
+ rawReaders: List<Sc20RawParquetTraceReader>,
+ performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
+ workload: Workload,
+ seed: Int
+) : TraceReader<SimWorkload> {
+ /**
+ * The iterator over the actual trace.
+ */
+ private val iterator: Iterator<TraceEntry<SimWorkload>> =
+ rawReaders
+ .map { it.read() }
+ .run {
+ if (workload is CompositeWorkload) {
+ this.zip(workload.workloads)
+ } else {
+ this.zip(listOf(workload))
+ }
+ }
+ .map { sampleWorkload(it.first, workload, it.second, seed) }
+ .flatten()
+ .run {
+ // Apply performance interference model
+ if (performanceInterferenceModel.isEmpty())
+ this
+ else {
+ map { entry ->
+ val id = entry.name
+ val relevantPerformanceInterferenceModelItems =
+ performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
+
+ entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems))
+ }
+ }
+ }
+ .iterator()
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
+
+ override fun close() {}
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
new file mode 100644
index 00000000..ffbf46d4
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import mu.KotlinLogging
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetReader
+import org.opendc.format.trace.TraceEntry
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.io.File
+import java.util.UUID
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * A [TraceReader] for the internal VM workload trace format.
+ *
+ * @param path The directory of the traces.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+public class Sc20RawParquetTraceReader(private val path: File) {
+ /**
+ * Read the fragments into memory.
+ */
+ private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
+ @Suppress("DEPRECATION")
+ val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
+ .disableCompatibility()
+ .build()
+
+ val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
+
+ return try {
+ while (true) {
+ val record = reader.read() ?: break
+
+ val id = record["id"].toString()
+ val duration = record["duration"] as Long
+ val cores = record["cores"] as Int
+ val cpuUsage = record["cpuUsage"] as Double
+
+ val fragment = SimTraceWorkload.Fragment(
+ duration,
+ cpuUsage,
+ cores
+ )
+
+ fragments.getOrPut(id) { mutableListOf() }.add(fragment)
+ }
+
+ fragments
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Read the metadata into a workload.
+ */
+ private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
+ @Suppress("DEPRECATION")
+ val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
+ .disableCompatibility()
+ .build()
+
+ var counter = 0
+ val entries = mutableListOf<TraceEntry<SimWorkload>>()
+
+ return try {
+ while (true) {
+ val record = metaReader.read() ?: break
+
+ val id = record["id"].toString()
+ if (!fragments.containsKey(id)) {
+ continue
+ }
+
+ val submissionTime = record["submissionTime"] as Long
+ val endTime = record["endTime"] as Long
+ val maxCores = record["maxCores"] as Int
+ val requiredMemory = record["requiredMemory"] as Long
+ val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
+
+ val vmFragments = fragments.getValue(id).asSequence()
+ val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
+ val workload = SimTraceWorkload(vmFragments)
+ entries.add(
+ TraceEntry(
+ uid, id, submissionTime, workload,
+ mapOf(
+ "submit-time" to submissionTime,
+ "end-time" to endTime,
+ "total-load" to totalLoad,
+ "cores" to maxCores,
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
+ )
+ )
+ }
+
+ entries
+ } catch (e: Exception) {
+ e.printStackTrace()
+ throw e
+ } finally {
+ metaReader.close()
+ }
+ }
+
+ /**
+ * The entries in the trace.
+ */
+ private val entries: List<TraceEntry<SimWorkload>>
+
+ init {
+ val fragments = parseFragments(path)
+ entries = parseMeta(path, fragments)
+ }
+
+ /**
+ * Read the entries in the trace.
+ */
+ public fun read(): List<TraceEntry<SimWorkload>> = entries
+
+ /**
+ * Create a [TraceReader] instance.
+ */
+ public fun createReader(): TraceReader<SimWorkload> {
+ return object : TraceReader<SimWorkload>, Iterator<TraceEntry<SimWorkload>> by entries.iterator() {
+ override fun close() {}
+ }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
new file mode 100644
index 00000000..c5294b55
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
@@ -0,0 +1,284 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import mu.KotlinLogging
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetReader
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.filter2.predicate.Statistics
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate
+import org.apache.parquet.io.api.Binary
+import org.opendc.format.trace.TraceEntry
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.io.File
+import java.io.Serializable
+import java.util.SortedSet
+import java.util.TreeSet
+import java.util.UUID
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
+import kotlin.random.Random
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
+ *
+ * @param traceFile The directory of the traces.
+ * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+public class Sc20StreamingParquetTraceReader(
+ traceFile: File,
+ performanceInterferenceModel: PerformanceInterferenceModel? = null,
+ selectedVms: List<String> = emptyList(),
+ random: Random
+) : TraceReader<SimWorkload> {
+ /**
+ * The internal iterator to use for this reader.
+ */
+ private val iterator: Iterator<TraceEntry<SimWorkload>>
+
+ /**
+ * The intermediate buffer to store the read records in.
+ */
+ private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024)
+
+ /**
+ * An optional filter for filtering the selected VMs
+ */
+ private val filter =
+ if (selectedVms.isEmpty())
+ null
+ else
+ FilterCompat.get(
+ FilterApi.userDefined(
+ FilterApi.binaryColumn("id"),
+ SelectedVmFilter(
+ TreeSet(selectedVms)
+ )
+ )
+ )
+
+ /**
+ * A poisonous fragment.
+ */
+ private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0))
+
+ /**
+ * The thread to read the records in.
+ */
+ private val readerThread = thread(start = true, name = "sc20-reader") {
+ @Suppress("DEPRECATION")
+ val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
+ .disableCompatibility()
+ .run { if (filter != null) withFilter(filter) else this }
+ .build()
+
+ try {
+ while (true) {
+ val record = reader.read()
+
+ if (record == null) {
+ queue.put(poison)
+ break
+ }
+
+ val id = record["id"].toString()
+ val duration = record["duration"] as Long
+ val cores = record["cores"] as Int
+ val cpuUsage = record["cpuUsage"] as Double
+
+ val fragment = SimTraceWorkload.Fragment(
+ duration,
+ cpuUsage,
+ cores
+ )
+
+ queue.put(id to fragment)
+ }
+ } catch (e: InterruptedException) {
+ // Do not rethrow this
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Fill the buffers with the VMs
+ */
+ private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) {
+ if (!hasNext) {
+ return
+ }
+
+ val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>()
+ queue.drainTo(fragments)
+
+ for ((id, fragment) in fragments) {
+ if (id == poison.first) {
+ hasNext = false
+ return
+ }
+ buffers[id]?.forEach { it.add(fragment) }
+ }
+ }
+
+ /**
+ * A flag to indicate whether the reader has more entries.
+ */
+ private var hasNext: Boolean = true
+
+ /**
+ * Initialize the reader.
+ */
+ init {
+ val takenIds = mutableSetOf<UUID>()
+ val entries = mutableMapOf<String, GenericData.Record>()
+ val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
+
+ @Suppress("DEPRECATION")
+ val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
+ .disableCompatibility()
+ .run { if (filter != null) withFilter(filter) else this }
+ .build()
+
+ while (true) {
+ val record = metaReader.read() ?: break
+ val id = record["id"].toString()
+ entries[id] = record
+ }
+
+ metaReader.close()
+
+ val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms
+
+ // Create the entry iterator
+ iterator = selection.asSequence()
+ .mapNotNull { entries[it] }
+ .mapIndexed { index, record ->
+ val id = record["id"].toString()
+ val submissionTime = record["submissionTime"] as Long
+ val endTime = record["endTime"] as Long
+ val maxCores = record["maxCores"] as Int
+ val requiredMemory = record["requiredMemory"] as Long
+ val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
+
+ assert(uid !in takenIds)
+ takenIds += uid
+
+ logger.info("Processing VM $id")
+
+ val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
+ val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
+ buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
+ val fragments = sequence {
+ var time = submissionTime
+ repeat@ while (true) {
+ if (externalBuffer.isEmpty()) {
+ if (hasNext) {
+ pull(buffers)
+ continue
+ } else {
+ break
+ }
+ }
+
+ internalBuffer.addAll(externalBuffer)
+ externalBuffer.clear()
+
+ for (fragment in internalBuffer) {
+ yield(fragment)
+
+ time += fragment.duration
+ if (time >= endTime) {
+ break@repeat
+ }
+ }
+
+ internalBuffer.clear()
+ }
+
+ buffers.remove(id)
+ }
+ val relevantPerformanceInterferenceModelItems =
+ if (performanceInterferenceModel != null)
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(),
+ Random(random.nextInt())
+ )
+ else
+ null
+ val workload = SimTraceWorkload(fragments)
+ val meta = mapOf(
+ "cores" to maxCores,
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
+
+ TraceEntry(
+ uid, id, submissionTime, workload,
+ if (performanceInterferenceModel != null)
+ meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems as Any)
+ else
+ meta
+ )
+ }
+ .sortedBy { it.start }
+ .toList()
+ .iterator()
+ }
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
+
+ override fun close() {
+ readerThread.interrupt()
+ }
+
+ private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
+ override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
+
+ override fun canDrop(statistics: Statistics<Binary>): Boolean {
+ val min = statistics.min
+ val max = statistics.max
+
+ return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
+ }
+
+ override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
+ val min = statistics.min
+ val max = statistics.max
+
+ return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
+ }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
new file mode 100644
index 00000000..7713c06f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
@@ -0,0 +1,621 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.arguments.argument
+import com.github.ajalt.clikt.parameters.groups.OptionGroup
+import com.github.ajalt.clikt.parameters.groups.groupChoice
+import com.github.ajalt.clikt.parameters.options.convert
+import com.github.ajalt.clikt.parameters.options.default
+import com.github.ajalt.clikt.parameters.options.defaultLazy
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.required
+import com.github.ajalt.clikt.parameters.options.split
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.long
+import me.tongfei.progressbar.ProgressBar
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.format.trace.sc20.Sc20VmPlacementReader
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import java.util.Random
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Represents the command for converting traces
+ */
+public class TraceConverterCli : CliktCommand(name = "trace-converter") {
+ /**
+ * The directory where the trace should be stored.
+ */
+ private val outputPath by option("-O", "--output", help = "path to store the trace")
+ .file(canBeFile = false, mustExist = false)
+ .defaultLazy { File("output") }
+
+ /**
+ * The directory where the input trace is located.
+ */
+ private val inputPath by argument("input", help = "path to the input trace")
+ .file(canBeFile = false)
+
+ /**
+ * The input type of the trace.
+ */
+ private val type by option("-t", "--type", help = "input type of trace").groupChoice(
+ "solvinity" to SolvinityConversion(),
+ "bitbrains" to BitbrainsConversion(),
+ "azure" to AzureConversion()
+ )
+
+ override fun run() {
+ val metaSchema = SchemaBuilder
+ .record("meta")
+ .namespace("org.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("submissionTime").type().longType().noDefault()
+ .name("endTime").type().longType().noDefault()
+ .name("maxCores").type().intType().noDefault()
+ .name("requiredMemory").type().longType().noDefault()
+ .endRecord()
+ val schema = SchemaBuilder
+ .record("trace")
+ .namespace("org.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("time").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("cores").type().intType().noDefault()
+ .name("cpuUsage").type().doubleType().noDefault()
+ .name("flops").type().longType().noDefault()
+ .endRecord()
+
+ val metaParquet = File(outputPath, "meta.parquet")
+ val traceParquet = File(outputPath, "trace.parquet")
+
+ if (metaParquet.exists()) {
+ metaParquet.delete()
+ }
+ if (traceParquet.exists()) {
+ traceParquet.delete()
+ }
+
+ val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI()))
+ .withSchema(metaSchema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI()))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ try {
+ val type = type ?: throw IllegalArgumentException("Invalid trace conversion")
+ val allFragments = type.read(inputPath, metaSchema, metaWriter)
+ allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+
+ for (fragment in allFragments) {
+ val record = GenericData.Record(schema)
+ record.put("id", fragment.id)
+ record.put("time", fragment.tick)
+ record.put("duration", fragment.duration)
+ record.put("cores", fragment.cores)
+ record.put("cpuUsage", fragment.usage)
+ record.put("flops", fragment.flops)
+
+ writer.write(record)
+ }
+ } finally {
+ writer.close()
+ metaWriter.close()
+ }
+ }
+}
+
+/**
+ * The supported trace conversions.
+ */
+public sealed class TraceConversion(name: String) : OptionGroup(name) {
+ /**
+ * Read the fragments of the trace.
+ */
+ public abstract fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment>
+}
+
+public class SolvinityConversion : TraceConversion("Solvinity") {
+ private val clusters by option()
+ .split(",")
+
+ private val vmPlacements by option("--vm-placements", help = "file containing the VM placements")
+ .file(canBeDir = false)
+ .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } }
+ .required()
+
+ override fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment> {
+ val clusters = clusters?.toSet() ?: emptySet()
+ val timestampCol = 0
+ val cpuUsageCol = 1
+ val coreCol = 12
+ val provisionedMemoryCol = 20
+ val traceInterval = 5 * 60 * 1000L
+
+ // Identify start time of the entire trace
+ var minTimestamp = Long.MAX_VALUE
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ .forEach file@{ vmFile ->
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val vmId = vmFile.name
+
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null || !clusters.contains(clusterName)) {
+ continue
+ }
+
+ val values = line.split("\t")
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+
+ if (timestamp < minTimestamp) {
+ minTimestamp = timestamp
+ }
+ return@file
+ }
+ }
+ }
+ }
+
+ println("Start of trace at $minTimestamp")
+
+ val allFragments = mutableListOf<Fragment>()
+
+ val begin = 15 * 24 * 60 * 60 * 1000L
+ val end = 45 * 24 * 60 * 60 * 1000L
+
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ .forEach { vmFile ->
+ println(vmFile)
+
+ var vmId = ""
+ var maxCores = -1
+ var requiredMemory = -1L
+ var cores: Int
+ var minTime = Long.MAX_VALUE
+
+ val flopsFragments = sequence {
+ var last: Fragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split("\t")
+
+ vmId = vmFile.name
+
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null || !clusters.contains(clusterName)) {
+ continue
+ }
+
+ val timestamp =
+ (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
+ if (begin > timestamp || timestamp > end) {
+ continue
+ }
+
+ cores = values[coreCol].trim().toInt()
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
+ Fragment(
+ vmId,
+ oldFragment.tick,
+ oldFragment.flops + flops,
+ oldFragment.duration + traceInterval,
+ cpuUsage,
+ cores
+ )
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ cores
+ )
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
+ }
+ }
+ }
+ }
+
+ if (last != null) {
+ yield(last!!)
+ }
+ }
+
+ var maxTime = Long.MIN_VALUE
+ flopsFragments.filter { it.tick in begin until end }.forEach { fragment ->
+ allFragments.add(fragment)
+ maxTime = max(maxTime, fragment.tick)
+ }
+
+ if (minTime in begin until end) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", vmId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+ }
+
+ return allFragments
+ }
+}
+
+/**
+ * Conversion of the Bitbrains public trace.
+ */
+public class BitbrainsConversion : TraceConversion("Bitbrains") {
+ override fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment> {
+ val timestampCol = 0
+ val cpuUsageCol = 3
+ val coreCol = 1
+ val provisionedMemoryCol = 5
+ val traceInterval = 5 * 60 * 1000L
+
+ val allFragments = mutableListOf<Fragment>()
+
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ .forEach { vmFile ->
+ println(vmFile)
+
+ var vmId = ""
+ var maxCores = -1
+ var requiredMemory = -1L
+ var cores: Int
+ var minTime = Long.MAX_VALUE
+
+ val flopsFragments = sequence {
+ var last: Fragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .drop(1)
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(";\t")
+
+ vmId = vmFile.name
+
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+
+ cores = values[coreCol].trim().toInt()
+ val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB
+ requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong())
+ maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
+ Fragment(
+ vmId,
+ oldFragment.tick,
+ oldFragment.flops + flops,
+ oldFragment.duration + traceInterval,
+ cpuUsage,
+ cores
+ )
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ cores
+ )
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
+ }
+ }
+ }
+ }
+
+ if (last != null) {
+ yield(last!!)
+ }
+ }
+
+ var maxTime = Long.MIN_VALUE
+ flopsFragments.forEach { fragment ->
+ allFragments.add(fragment)
+ maxTime = max(maxTime, fragment.tick)
+ }
+
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", vmId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+
+ return allFragments
+ }
+}
+
+/**
+ * Conversion of the Azure public VM trace.
+ */
+public class AzureConversion : TraceConversion("Azure") {
+ private val seed by option(help = "seed for trace sampling")
+ .long()
+ .default(0)
+
+ override fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment> {
+ val random = Random(seed)
+ val fraction = 0.01
+
+ // Read VM table
+ val vmIdTableCol = 0
+ val coreTableCol = 9
+ val provisionedMemoryTableCol = 10
+
+ var vmId: String
+ var cores: Int
+ var requiredMemory: Long
+
+ val vmIds = mutableSetOf<String>()
+ val vmIdToMetadata = mutableMapOf<String, VmInfo>()
+
+ BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
+ reader.lineSequence()
+ .chunked(1024)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+ // Sample only a fraction of the VMs
+ if (random.nextDouble() > fraction) {
+ continue
+ }
+
+ val values = line.split(",")
+
+ // Exclude VMs with a large number of cores (not specified exactly)
+ if (values[coreTableCol].contains(">")) {
+ continue
+ }
+
+ vmId = values[vmIdTableCol].trim()
+ cores = values[coreTableCol].trim().toInt()
+ requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
+
+ vmIds.add(vmId)
+ vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
+ }
+ }
+ }
+
+ // Read VM metric reading files
+ val timestampCol = 0
+ val vmIdCol = 1
+ val cpuUsageCol = 4
+ val traceInterval = 5 * 60 * 1000L
+
+ val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
+ val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
+ val allFragments = mutableListOf<Fragment>()
+
+ for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) {
+ val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv")
+ var timestamp: Long
+ var cpuUsage: Double
+
+ BufferedReader(FileReader(readingsFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(",")
+ vmId = values[vmIdCol].trim()
+
+ // Ignore readings for VMs not in the sample
+ if (!vmIds.contains(vmId)) {
+ continue
+ }
+
+ timestamp = values[timestampCol].trim().toLong() * 1000L
+ vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
+ cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz
+ vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+ val lastFragment = vmIdToLastFragment[vmId]
+
+ vmIdToLastFragment[vmId] =
+ if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
+ Fragment(
+ vmId,
+ lastFragment.tick,
+ lastFragment.flops + flops,
+ lastFragment.duration + traceInterval,
+ cpuUsage,
+ vmIdToMetadata[vmId]!!.cores
+ )
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ vmIdToMetadata[vmId]!!.cores
+ )
+ if (lastFragment != null) {
+ if (vmIdToFragments[vmId] == null) {
+ vmIdToFragments[vmId] = mutableListOf()
+ }
+ vmIdToFragments[vmId]!!.add(lastFragment)
+ allFragments.add(lastFragment)
+ }
+ fragment
+ }
+ }
+ }
+ }
+ }
+
+ for (entry in vmIdToLastFragment) {
+ if (entry.value != null) {
+ if (vmIdToFragments[entry.key] == null) {
+ vmIdToFragments[entry.key] = mutableListOf()
+ }
+ vmIdToFragments[entry.key]!!.add(entry.value!!)
+ }
+ }
+
+ println("Read ${vmIdToLastFragment.size} VMs")
+
+ for (entry in vmIdToMetadata) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", entry.key)
+ metaRecord.put("submissionTime", entry.value.minTime)
+ metaRecord.put("endTime", entry.value.maxTime)
+ println("${entry.value.minTime} - ${entry.value.maxTime}")
+ metaRecord.put("maxCores", entry.value.cores)
+ metaRecord.put("requiredMemory", entry.value.requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+
+ return allFragments
+ }
+}
+
+public data class Fragment(
+ public val id: String,
+ public val tick: Long,
+ public val flops: Long,
+ public val duration: Long,
+ public val usage: Double,
+ public val cores: Int
+)
+
+public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long)
+
+/**
+ * A script to convert a trace in text format into a Parquet trace.
+ */
+public fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
new file mode 100644
index 00000000..5c8727ea
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import mu.KotlinLogging
+import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.experiments.capelin.model.SamplingStrategy
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.format.trace.TraceEntry
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.util.*
+import kotlin.random.Random
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Sample the workload for the specified [run].
+ */
+public fun sampleWorkload(
+ trace: List<TraceEntry<SimWorkload>>,
+ workload: Workload,
+ subWorkload: Workload,
+ seed: Int
+): List<TraceEntry<SimWorkload>> {
+ return when {
+ workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
+ workload.samplingStrategy == SamplingStrategy.HPC ->
+ sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false)
+ workload.samplingStrategy == SamplingStrategy.HPC_LOAD ->
+ sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true)
+ else ->
+ sampleRegularWorkload(trace, workload, workload, seed)
+ }
+}
+
+/**
+ * Sample a regular (non-HPC) workload.
+ */
+public fun sampleRegularWorkload(
+ trace: List<TraceEntry<SimWorkload>>,
+ workload: Workload,
+ subWorkload: Workload,
+ seed: Int
+): List<TraceEntry<SimWorkload>> {
+ val fraction = subWorkload.fraction
+
+ val shuffled = trace.shuffled(Random(seed))
+ val res = mutableListOf<TraceEntry<SimWorkload>>()
+ val totalLoad = if (workload is CompositeWorkload) {
+ workload.totalLoad
+ } else {
+ shuffled.sumByDouble { it.meta.getValue("total-load") as Double }
+ }
+ var currentLoad = 0.0
+
+ for (entry in shuffled) {
+ val entryLoad = entry.meta.getValue("total-load") as Double
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+}
+
+/**
+ * Sample a HPC workload.
+ */
+public fun sampleHpcWorkload(
+ trace: List<TraceEntry<SimWorkload>>,
+ workload: Workload,
+ seed: Int,
+ sampleOnLoad: Boolean
+): List<TraceEntry<SimWorkload>> {
+ val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
+ val random = Random(seed)
+
+ val fraction = workload.fraction
+ val (hpc, nonHpc) = trace.partition { entry ->
+ val name = entry.name
+ name.matches(pattern)
+ }
+
+ val hpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<TraceEntry<SimWorkload>>()
+ hpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ val nonHpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<TraceEntry<SimWorkload>>()
+ nonHpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
+
+ val totalLoad = if (workload is CompositeWorkload) {
+ workload.totalLoad
+ } else {
+ trace.sumByDouble { it.meta.getValue("total-load") as Double }
+ }
+
+ logger.debug { "Total trace load: $totalLoad" }
+ var hpcCount = 0
+ var hpcLoad = 0.0
+ var nonHpcCount = 0
+ var nonHpcLoad = 0.0
+
+ val res = mutableListOf<TraceEntry<SimWorkload>>()
+
+ if (sampleOnLoad) {
+ var currentLoad = 0.0
+ for (entry in hpcSequence) {
+ val entryLoad = entry.meta.getValue("total-load") as Double
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ hpcLoad += entryLoad
+ hpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ for (entry in nonHpcSequence) {
+ val entryLoad = entry.meta.getValue("total-load") as Double
+ if ((currentLoad + entryLoad) / totalLoad > 1) {
+ break
+ }
+
+ nonHpcLoad += entryLoad
+ nonHpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+ } else {
+ hpcSequence
+ .take((fraction * trace.size).toInt())
+ .forEach { entry ->
+ hpcLoad += entry.meta.getValue("total-load") as Double
+ hpcCount += 1
+ res.add(entry)
+ }
+
+ nonHpcSequence
+ .take(((1 - fraction) * trace.size).toInt())
+ .forEach { entry ->
+ nonHpcLoad += entry.meta.getValue("total-load") as Double
+ nonHpcCount += 1
+ res.add(entry)
+ }
+ }
+
+ logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
+ logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
+ logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+}
+
+/**
+ * Sample a random trace entry.
+ */
+private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> {
+ val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
+ return entry.copy(uid = uid)
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
new file mode 100644
index 00000000..d1c01b8e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ MIT License
+ ~
+ ~ Copyright (c) 2020 atlarge-research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger name="org.opendc" level="debug" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.opendc.experiments.capelin" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.opendc.experiments.capelin.trace" level="debug" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.apache.hadoop" level="warn" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Root level="error">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
new file mode 100644
index 00000000..4cb50ab9
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -0,0 +1,216 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.Channel
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.capelin.monitor.ExperimentMonitor
+import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
+import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
+import org.opendc.format.environment.EnvironmentReader
+import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.sdk.toOtelClock
+import java.io.File
+
+/**
+ * An integration test suite for the SC20 experiments.
+ */
+class CapelinIntegrationTest {
+ /**
+ * The monitor used to keep track of the metrics.
+ */
+ private lateinit var monitor: TestExperimentReporter
+
+ /**
+ * Setup the experimental environment.
+ */
+ @BeforeEach
+ fun setUp() {
+ monitor = TestExperimentReporter()
+ }
+
+ @Test
+ fun testLarge() = runBlockingSimulation {
+ val failures = false
+ val seed = 0
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
+ val traceReader = createTestTraceReader()
+ val environmentReader = createTestEnvironmentReader()
+ lateinit var monitorResults: ComputeMetrics
+
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
+ val failureDomain = if (failures) {
+ println("ENABLING failures")
+ createFailureDomain(
+ this,
+ clock,
+ seed,
+ 24.0 * 7,
+ scheduler,
+ chan
+ )
+ } else {
+ null
+ }
+
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
+
+ failureDomain?.cancel()
+ }
+
+ monitorResults = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}")
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
+ { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
+ { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
+ { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
+ { assertEquals(207389912923, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(207122087280, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
+ { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
+ )
+ }
+
+ @Test
+ fun testSmall() = runBlockingSimulation {
+ val seed = 1
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
+ val traceReader = createTestTraceReader(0.5, seed)
+ val environmentReader = createTestEnvironmentReader("single")
+
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
+ }
+
+ val metrics = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(96350072517, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(96330335057, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
+ )
+ }
+
+ /**
+ * Obtain the trace reader for the test.
+ */
+ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
+ return Sc20ParquetTraceReader(
+ listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
+ emptyMap(),
+ Workload("test", fraction),
+ seed
+ )
+ }
+
+ /**
+ * Obtain the environment reader for the test.
+ */
+ private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
+ val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt")
+ return Sc20ClusterEnvironmentReader(stream)
+ }
+
+ class TestExperimentReporter : ExperimentMonitor {
+ var totalRequestedBurst = 0L
+ var totalGrantedBurst = 0L
+ var totalOvercommissionedBurst = 0L
+ var totalInterferedBurst = 0L
+
+ override fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ powerDraw: Double,
+ numberOfDeployedImages: Int,
+ host: Host,
+ ) {
+ totalRequestedBurst += requestedBurst
+ totalGrantedBurst += grantedBurst
+ totalOvercommissionedBurst += overcommissionedBurst
+ totalInterferedBurst += interferedBurst
+ }
+
+ override fun close() {}
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
new file mode 100644
index 00000000..53b3c2d7
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
@@ -0,0 +1,3 @@
+ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
+A01;A01;8;3.2;64;1;64;8
+
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt
new file mode 100644
index 00000000..6b347bff
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt
@@ -0,0 +1,5 @@
+ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
+A01;A01;32;3.2;2048;1;256;32
+B01;B01;48;2.93;1256;6;64;8
+C01;C01;32;3.2;2048;2;128;16
+
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
new file mode 100644
index 00000000..ce7a812c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
new file mode 100644
index 00000000..1d7ce882
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
Binary files differ