summaryrefslogtreecommitdiff
path: root/simulator/opendc-experiments/opendc-experiments-capelin/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-02-23 21:30:51 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-02-23 21:30:51 +0100
commit481706adc89d502840c967d94a165c55d8ac0e1a (patch)
tree6816ed3af1ebfb7daf8baf9f33114df6f52a749c /simulator/opendc-experiments/opendc-experiments-capelin/src
parente99ceb1575f50d98ad61f5101b7b69ab69453d70 (diff)
exp: Add support for running Capelin experiments in IntelliJ
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt79
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt276
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt60
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt59
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt56
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt61
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt221
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt50
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt47
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt31
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt28
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt44
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt75
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt202
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt35
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt43
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt39
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt34
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt41
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt126
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt81
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt65
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt72
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt94
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt170
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt305
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt621
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt210
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml49
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt256
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt5
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquetbin0 -> 2148 bytes
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquetbin0 -> 1672463 bytes
34 files changed, 3538 insertions, 0 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
new file mode 100644
index 00000000..faabe5cb
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] that explores the effect of a composite workload.
+ */
+public class CompositeWorkloadPortfolio : Portfolio("composite-workload") {
+ private val totalSampleLoad = 1.3301733005049648E12
+
+ override val topology: Topology by anyOf(
+ Topology("base"),
+ Topology("exp-vol-hor-hom"),
+ Topology("exp-vol-ver-hom"),
+ Topology("exp-vel-ver-hom")
+ )
+
+ override val workload: Workload by anyOf(
+ CompositeWorkload(
+ "all-azure",
+ listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "solvinity-25-azure-75",
+ listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "solvinity-50-azure-50",
+ listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "solvinity-75-azure-25",
+ listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "all-solvinity",
+ listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)),
+ totalSampleLoad
+ )
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false)
+ )
+
+ override val allocationPolicy: String by anyOf(
+ "active-servers"
+ )
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
new file mode 100644
index 00000000..8f3e686a
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -0,0 +1,276 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.experiment
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
+import kotlinx.coroutines.flow.takeWhile
+import kotlinx.coroutines.launch
+import mu.KotlinLogging
+import org.opendc.compute.core.Flavor
+import org.opendc.compute.core.ServerEvent
+import org.opendc.compute.core.metal.NODE_CLUSTER
+import org.opendc.compute.core.metal.driver.BareMetalDriver
+import org.opendc.compute.core.metal.service.ProvisioningService
+import org.opendc.compute.core.virt.HypervisorEvent
+import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import org.opendc.compute.core.workload.VmWorkload
+import org.opendc.compute.simulator.SimBareMetalDriver
+import org.opendc.compute.simulator.SimVirtDriver
+import org.opendc.compute.simulator.SimVirtProvisioningService
+import org.opendc.compute.simulator.allocation.AllocationPolicy
+import org.opendc.experiments.capelin.monitor.ExperimentMonitor
+import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
+import org.opendc.format.environment.EnvironmentReader
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.failures.CorrelatedFaultInjector
+import org.opendc.simulator.failures.FailureDomain
+import org.opendc.simulator.failures.FaultInjector
+import org.opendc.trace.core.EventTracer
+import java.io.File
+import java.time.Clock
+import kotlin.math.ln
+import kotlin.math.max
+import kotlin.random.Random
+
+/**
+ * The logger for this experiment.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Construct the failure domain for the experiments.
+ */
+public suspend fun createFailureDomain(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
+ seed: Int,
+ failureInterval: Double,
+ bareMetalProvisioner: ProvisioningService,
+ chan: Channel<Unit>
+): CoroutineScope {
+ val job = coroutineScope.launch {
+ chan.receive()
+ val random = Random(seed)
+ val injectors = mutableMapOf<String, FaultInjector>()
+ for (node in bareMetalProvisioner.nodes()) {
+ val cluster = node.metadata[NODE_CLUSTER] as String
+ val injector =
+ injectors.getOrPut(cluster) {
+ createFaultInjector(
+ this,
+ clock,
+ random,
+ failureInterval
+ )
+ }
+ injector.enqueue(node.metadata["driver"] as FailureDomain)
+ }
+ }
+ return CoroutineScope(coroutineScope.coroutineContext + job)
+}
+
+/**
+ * Obtain the [FaultInjector] to use for the experiments.
+ */
+public fun createFaultInjector(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
+ random: Random,
+ failureInterval: Double
+): FaultInjector {
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return CorrelatedFaultInjector(
+ coroutineScope,
+ clock,
+ iatScale = ln(failureInterval), iatShape = 1.03, // Hours
+ sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
+ dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
+ random = random
+ )
+}
+
+/**
+ * Create the trace reader from which the VM workloads are read.
+ */
+public fun createTraceReader(
+ path: File,
+ performanceInterferenceModel: PerformanceInterferenceModel,
+ vms: List<String>,
+ seed: Int
+): Sc20StreamingParquetTraceReader {
+ return Sc20StreamingParquetTraceReader(
+ path,
+ performanceInterferenceModel,
+ vms,
+ Random(seed)
+ )
+}
+
+/**
+ * Construct the environment for a VM provisioner and return the provisioner instance.
+ */
+public suspend fun createProvisioner(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
+ environmentReader: EnvironmentReader,
+ allocationPolicy: AllocationPolicy,
+ eventTracer: EventTracer
+): Pair<ProvisioningService, SimVirtProvisioningService> {
+ val environment = environmentReader.use { it.construct(coroutineScope, clock) }
+ val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
+
+ // Wait for the bare metal nodes to be spawned
+ delay(10)
+
+ val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider())
+
+ // Wait for the hypervisors to be spawned
+ delay(10)
+
+ return bareMetalProvisioner to scheduler
+}
+
+/**
+ * Attach the specified monitor to the VM provisioner.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public suspend fun attachMonitor(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
+ scheduler: SimVirtProvisioningService,
+ monitor: ExperimentMonitor
+) {
+
+ val hypervisors = scheduler.drivers()
+
+ // Monitor hypervisor events
+ for (hypervisor in hypervisors) {
+ // TODO Do not expose VirtDriver directly but use Hypervisor class.
+ val server = (hypervisor as SimVirtDriver).server
+ monitor.reportHostStateChange(clock.millis(), hypervisor, server)
+ server.events
+ .onEach { event ->
+ val time = clock.millis()
+ when (event) {
+ is ServerEvent.StateChanged -> {
+ monitor.reportHostStateChange(time, hypervisor, event.server)
+ }
+ }
+ }
+ .launchIn(coroutineScope)
+ hypervisor.events
+ .onEach { event ->
+ when (event) {
+ is HypervisorEvent.SliceFinished -> monitor.reportHostSlice(
+ clock.millis(),
+ event.requestedBurst,
+ event.grantedBurst,
+ event.overcommissionedBurst,
+ event.interferedBurst,
+ event.cpuUsage,
+ event.cpuDemand,
+ event.numberOfDeployedImages,
+ event.hostServer
+ )
+ }
+ }
+ .launchIn(coroutineScope)
+
+ val driver = hypervisor.server.services[BareMetalDriver.Key] as SimBareMetalDriver
+ driver.powerDraw
+ .onEach { monitor.reportPowerConsumption(hypervisor.server, it) }
+ .launchIn(coroutineScope)
+ }
+
+ scheduler.events
+ .onEach { event ->
+ when (event) {
+ is VirtProvisioningEvent.MetricsAvailable ->
+ monitor.reportProvisionerMetrics(clock.millis(), event)
+ }
+ }
+ .launchIn(coroutineScope)
+}
+
+/**
+ * Process the trace.
+ */
+public suspend fun processTrace(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
+ reader: TraceReader<VmWorkload>,
+ scheduler: SimVirtProvisioningService,
+ chan: Channel<Unit>,
+ monitor: ExperimentMonitor
+) {
+ try {
+ var submitted = 0
+
+ while (reader.hasNext()) {
+ val (time, workload) = reader.next()
+
+ submitted++
+ delay(max(0, time - clock.millis()))
+ coroutineScope.launch {
+ chan.send(Unit)
+ val server = scheduler.deploy(
+ workload.image.name,
+ workload.image,
+ Flavor(
+ workload.image.tags["cores"] as Int,
+ workload.image.tags["required-memory"] as Long
+ )
+ )
+ // Monitor server events
+ server.events
+ .onEach {
+ if (it is ServerEvent.StateChanged) {
+ monitor.reportVmStateChange(clock.millis(), it.server)
+ }
+ }
+ .collect()
+ }
+ }
+
+ scheduler.events
+ .takeWhile {
+ when (it) {
+ is VirtProvisioningEvent.MetricsAvailable ->
+ it.inactiveVmCount + it.failedVmCount != submitted
+ }
+ }
+ .collect()
+ delay(1)
+ } finally {
+ reader.close()
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
new file mode 100644
index 00000000..e1cf8517
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] that explores the difference between horizontal and vertical scaling.
+ */
+public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") {
+ override val topology: Topology by anyOf(
+ Topology("base"),
+ Topology("rep-vol-hor-hom"),
+ Topology("rep-vol-hor-het"),
+ Topology("rep-vol-ver-hom"),
+ Topology("rep-vol-ver-het"),
+ Topology("exp-vol-hor-hom"),
+ Topology("exp-vol-hor-het"),
+ Topology("exp-vol-ver-hom"),
+ Topology("exp-vol-ver-het")
+ )
+
+ override val workload: Workload by anyOf(
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicy: String by anyOf(
+ "active-servers"
+ )
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
new file mode 100644
index 00000000..a995e467
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.SamplingStrategy
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] to explore the effect of HPC workloads.
+ */
+public class MoreHpcPortfolio : Portfolio("more_hpc") {
+ override val topology: Topology by anyOf(
+ Topology("base"),
+ Topology("exp-vol-hor-hom"),
+ Topology("exp-vol-ver-hom"),
+ Topology("exp-vel-ver-hom")
+ )
+
+ override val workload: Workload by anyOf(
+ Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC),
+ Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC),
+ Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC),
+ Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC),
+ Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD),
+ Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD),
+ Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD)
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicy: String by anyOf(
+ "active-servers"
+ )
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
new file mode 100644
index 00000000..49559e0e
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] that explores the effect of adding more velocity to a cluster (e.g., faster machines).
+ */
+public class MoreVelocityPortfolio : Portfolio("more_velocity") {
+ override val topology: Topology by anyOf(
+ Topology("base"),
+ Topology("rep-vel-ver-hom"),
+ Topology("rep-vel-ver-het"),
+ Topology("exp-vel-ver-hom"),
+ Topology("exp-vel-ver-het")
+ )
+
+ override val workload: Workload by anyOf(
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicy: String by anyOf(
+ "active-servers"
+ )
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
new file mode 100644
index 00000000..1aac4f9e
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] that explores the effect of operational phenomena on metrics.
+ */
+public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena") {
+ override val topology: Topology by anyOf(
+ Topology("base")
+ )
+
+ override val workload: Workload by anyOf(
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true),
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = true),
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false),
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = false)
+ )
+
+ override val allocationPolicy: String by anyOf(
+ "mem",
+ "mem-inv",
+ "core-mem",
+ "core-mem-inv",
+ "active-servers",
+ "active-servers-inv",
+ "random"
+ )
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
new file mode 100644
index 00000000..75b0d735
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineScope
+import mu.KotlinLogging
+import org.opendc.compute.simulator.allocation.*
+import org.opendc.experiments.capelin.experiment.attachMonitor
+import org.opendc.experiments.capelin.experiment.createFailureDomain
+import org.opendc.experiments.capelin.experiment.createProvisioner
+import org.opendc.experiments.capelin.experiment.processTrace
+import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
+import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
+import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
+import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import org.opendc.format.trace.PerformanceInterferenceModelReader
+import org.opendc.harness.dsl.Experiment
+import org.opendc.harness.dsl.anyOf
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.trace.core.EventTracer
+import java.io.File
+import java.util.concurrent.ConcurrentHashMap
+import kotlin.random.Random
+
+/**
+ * A portfolio represents a collection of scenarios are tested for the work.
+ *
+ * @param name The name of the portfolio.
+ */
+public abstract class Portfolio(name: String) : Experiment(name) {
+ /**
+ * The logger for this portfolio instance.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The path to where the environments are located.
+ */
+ private val environmentPath by anyOf(File("input/environments/"))
+
+ /**
+ * The path to where the traces are located.
+ */
+ private val tracePath by anyOf(File("input/traces/"))
+
+ /**
+ * The path to where the output results should be written.
+ */
+ private val outputPath by anyOf(File("output/"))
+
+ /**
+ * The path to the original VM placements file.
+ */
+ private val vmPlacements by anyOf(emptyMap<String, String>())
+
+ /**
+ * The path to the performance interference model.
+ */
+ private val performanceInterferenceModel by anyOf<PerformanceInterferenceModelReader?>(null)
+
+ /**
+ * The topology to test.
+ */
+ public abstract val topology: Topology
+
+ /**
+ * The workload to test.
+ */
+ public abstract val workload: Workload
+
+ /**
+ * The operational phenomenas to consider.
+ */
+ public abstract val operationalPhenomena: OperationalPhenomena
+
+ /**
+ * The allocation policies to consider.
+ */
+ public abstract val allocationPolicy: String
+
+ /**
+ * A map of trace readers.
+ */
+ private val traceReaders = ConcurrentHashMap<String, Sc20RawParquetTraceReader>()
+
+ /**
+ * Perform a single trial for this portfolio.
+ */
+ @OptIn(ExperimentalCoroutinesApi::class)
+ override fun doRun(repeat: Int) {
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
+ val tracer = EventTracer(clock)
+ val seeder = Random(repeat)
+ val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt"))
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = createAllocationPolicy(seeder)
+
+ val workload = workload
+ val workloadNames = if (workload is CompositeWorkload) {
+ workload.workloads.map { it.name }
+ } else {
+ listOf(workload.name)
+ }
+
+ val rawReaders = workloadNames.map { workloadName ->
+ traceReaders.computeIfAbsent(workloadName) {
+ logger.info { "Loading trace $workloadName" }
+ Sc20RawParquetTraceReader(File(tracePath, workloadName))
+ }
+ }
+
+ val performanceInterferenceModel = performanceInterferenceModel
+ ?.takeIf { operationalPhenomena.hasInterference }
+ ?.construct(seeder) ?: emptyMap()
+ val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt())
+
+ val monitor = ParquetExperimentMonitor(
+ outputPath,
+ "portfolio_id=$name/scenario_id=$id/run_id=$repeat",
+ 4096
+ )
+
+ testScope.launch {
+ val (bareMetalProvisioner, scheduler) = createProvisioner(
+ this,
+ clock,
+ environment,
+ allocationPolicy,
+ tracer
+ )
+
+ val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
+ logger.debug("ENABLING failures")
+ createFailureDomain(
+ this,
+ clock,
+ seeder.nextInt(),
+ operationalPhenomena.failureFrequency,
+ bareMetalProvisioner,
+ chan
+ )
+ } else {
+ null
+ }
+
+ attachMonitor(this, clock, scheduler, monitor)
+ processTrace(
+ this,
+ clock,
+ trace,
+ scheduler,
+ chan,
+ monitor
+ )
+
+ logger.debug("SUBMIT=${scheduler.submittedVms}")
+ logger.debug("FAIL=${scheduler.unscheduledVms}")
+ logger.debug("QUEUED=${scheduler.queuedVms}")
+ logger.debug("RUNNING=${scheduler.runningVms}")
+ logger.debug("FINISHED=${scheduler.finishedVms}")
+
+ failureDomain?.cancel()
+ scheduler.terminate()
+ }
+
+ try {
+ testScope.advanceUntilIdle()
+ } finally {
+ monitor.close()
+ }
+ }
+
+ /**
+ * Create the [AllocationPolicy] instance to use for the trial.
+ */
+ private fun createAllocationPolicy(seeder: Random): AllocationPolicy {
+ return when (allocationPolicy) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+ "replay" -> ReplayAllocationPolicy(vmPlacements)
+ else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
+ }
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
new file mode 100644
index 00000000..b6d3b30c
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] that compares the original VM placements against our policies.
+ */
+public class ReplayPortfolio : Portfolio("replay") {
+ override val topology: Topology by anyOf(
+ Topology("base")
+ )
+
+ override val workload: Workload by anyOf(
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = false)
+ )
+
+ override val allocationPolicy: String by anyOf(
+ "replay",
+ "active-servers"
+ )
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
new file mode 100644
index 00000000..90840db8
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import org.opendc.experiments.capelin.model.OperationalPhenomena
+import org.opendc.experiments.capelin.model.Topology
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.harness.dsl.anyOf
+
+/**
+ * A [Portfolio] to perform a simple test run.
+ */
+public class TestPortfolio : Portfolio("test") {
+ override val topology: Topology by anyOf(
+ Topology("base")
+ )
+
+ override val workload: Workload by anyOf(
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomena: OperationalPhenomena by anyOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicy: String by anyOf("active-servers")
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt
new file mode 100644
index 00000000..b53b3617
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.model
+
+/**
+ * Operation phenomena during experiments.
+ *
+ * @param failureFrequency The average time between failures in hours.
+ * @param hasInterference A flag to enable performance interference between VMs.
+ */
+public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt
new file mode 100644
index 00000000..fe16a294
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.model
+
+/**
+ * The topology topology on which we test the workload.
+ */
+public data class Topology(val name: String)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
new file mode 100644
index 00000000..c4ddd158
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.model
+
+public enum class SamplingStrategy {
+ REGULAR,
+ HPC,
+ HPC_LOAD
+}
+
+/**
+ * A workload that is considered for a scenario.
+ */
+public open class Workload(
+ public open val name: String,
+ public val fraction: Double,
+ public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR
+)
+
+/**
+ * A workload that is composed of multiple workloads.
+ */
+public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) :
+ Workload(name, -1.0)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
new file mode 100644
index 00000000..3c6637bf
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.monitor
+
+import org.opendc.compute.core.Server
+import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import java.io.Closeable
+
+/**
+ * A monitor watches the events of an experiment.
+ */
+public interface ExperimentMonitor : Closeable {
+ /**
+ * This method is invoked when the state of a VM changes.
+ */
+ public fun reportVmStateChange(time: Long, server: Server) {}
+
+ /**
+ * This method is invoked when the state of a host changes.
+ */
+ public fun reportHostStateChange(
+ time: Long,
+ driver: VirtDriver,
+ server: Server
+ ) {
+ }
+
+ /**
+ * Report the power consumption of a host.
+ */
+ public fun reportPowerConsumption(host: Server, draw: Double) {}
+
+ /**
+ * This method is invoked for a host for each slice that is finishes.
+ */
+ public fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ numberOfDeployedImages: Int,
+ hostServer: Server,
+ duration: Long = 5 * 60 * 1000L
+ ) {
+ }
+
+ /**
+ * This method is invoked for a provisioner event.
+ */
+ public fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {}
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
new file mode 100644
index 00000000..a0d57656
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -0,0 +1,202 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.monitor
+
+import mu.KotlinLogging
+import org.opendc.compute.core.Server
+import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import org.opendc.experiments.capelin.telemetry.HostEvent
+import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
+import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter
+import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter
+import java.io.File
+
+/**
+ * The logger instance to use.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * An [ExperimentMonitor] that logs the events to a Parquet file.
+ */
+public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor {
+ private val hostWriter = ParquetHostEventWriter(
+ File(base, "host-metrics/$partition/data.parquet"),
+ bufferSize
+ )
+ private val provisionerWriter = ParquetProvisionerEventWriter(
+ File(base, "provisioner-metrics/$partition/data.parquet"),
+ bufferSize
+ )
+ private val currentHostEvent = mutableMapOf<Server, HostEvent>()
+ private var startTime = -1L
+
+ override fun reportVmStateChange(time: Long, server: Server) {
+ if (startTime < 0) {
+ startTime = time
+
+ // Update timestamp of initial event
+ currentHostEvent.replaceAll { _, v -> v.copy(timestamp = startTime) }
+ }
+ }
+
+ override fun reportHostStateChange(
+ time: Long,
+ driver: VirtDriver,
+ server: Server
+ ) {
+ logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" }
+
+ val previousEvent = currentHostEvent[server]
+
+ val roundedTime = previousEvent?.let {
+ val duration = time - it.timestamp
+ val k = 5 * 60 * 1000L // 5 min in ms
+ val rem = duration % k
+
+ if (rem == 0L) {
+ time
+ } else {
+ it.timestamp + duration + k - rem
+ }
+ } ?: time
+
+ reportHostSlice(
+ roundedTime,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0.0,
+ 0.0,
+ 0,
+ server
+ )
+ }
+
+ private val lastPowerConsumption = mutableMapOf<Server, Double>()
+
+ override fun reportPowerConsumption(host: Server, draw: Double) {
+ lastPowerConsumption[host] = draw
+ }
+
+ override fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ numberOfDeployedImages: Int,
+ hostServer: Server,
+ duration: Long
+ ) {
+ val previousEvent = currentHostEvent[hostServer]
+ when {
+ previousEvent == null -> {
+ val event = HostEvent(
+ time,
+ 5 * 60 * 1000L,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0,
+ hostServer.flavor.cpuCount
+ )
+
+ currentHostEvent[hostServer] = event
+ }
+ previousEvent.timestamp == time -> {
+ val event = HostEvent(
+ time,
+ previousEvent.duration,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0,
+ hostServer.flavor.cpuCount
+ )
+
+ currentHostEvent[hostServer] = event
+ }
+ else -> {
+ hostWriter.write(previousEvent)
+
+ val event = HostEvent(
+ time,
+ time - previousEvent.timestamp,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0,
+ hostServer.flavor.cpuCount
+ )
+
+ currentHostEvent[hostServer] = event
+ }
+ }
+ }
+
+ override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
+ provisionerWriter.write(
+ ProvisionerEvent(
+ time,
+ event.totalHostCount,
+ event.availableHostCount,
+ event.totalVmCount,
+ event.activeVmCount,
+ event.inactiveVmCount,
+ event.waitingVmCount,
+ event.failedVmCount
+ )
+ )
+ }
+
+ override fun close() {
+ // Flush remaining events
+ for ((_, event) in currentHostEvent) {
+ hostWriter.write(event)
+ }
+ currentHostEvent.clear()
+
+ hostWriter.close()
+ provisionerWriter.close()
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt
new file mode 100644
index 00000000..c29e116e
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt
@@ -0,0 +1,35 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry
+
+/**
+ * An event that occurs within the system.
+ */
+public abstract class Event(public val name: String) {
+ /**
+ * The time of occurrence of this event.
+ */
+ public abstract val timestamp: Long
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
new file mode 100644
index 00000000..e5e9d520
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry
+
+import org.opendc.compute.core.Server
+
+/**
+ * A periodic report of the host machine metrics.
+ */
+public data class HostEvent(
+ override val timestamp: Long,
+ public val duration: Long,
+ public val host: Server,
+ public val vmCount: Int,
+ public val requestedBurst: Long,
+ public val grantedBurst: Long,
+ public val overcommissionedBurst: Long,
+ public val interferedBurst: Long,
+ public val cpuUsage: Double,
+ public val cpuDemand: Double,
+ public val powerDraw: Double,
+ public val cores: Int
+) : Event("host-metrics")
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt
new file mode 100644
index 00000000..539c9bc9
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt
@@ -0,0 +1,39 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry
+
+/**
+ * A periodic report of the provisioner's metrics.
+ */
+public data class ProvisionerEvent(
+ override val timestamp: Long,
+ public val totalHostCount: Int,
+ public val availableHostCount: Int,
+ public val totalVmCount: Int,
+ public val activeVmCount: Int,
+ public val inactiveVmCount: Int,
+ public val waitingVmCount: Int,
+ public val failedVmCount: Int
+) : Event("provisioner-metrics")
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
new file mode 100644
index 00000000..6c8fc941
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry
+
+import org.opendc.experiments.capelin.Portfolio
+
+/**
+ * A periodic report of the host machine metrics.
+ */
+public data class RunEvent(
+ val portfolio: Portfolio,
+ val repeat: Int,
+ override val timestamp: Long
+) : Event("run")
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
new file mode 100644
index 00000000..427c453a
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry
+
+import org.opendc.compute.core.Server
+
+/**
+ * A periodic report of a virtual machine's metrics.
+ */
+public data class VmEvent(
+ override val timestamp: Long,
+ public val duration: Long,
+ public val vm: Server,
+ public val host: Server,
+ public val requestedBurst: Long,
+ public val grantedBurst: Long,
+ public val overcommissionedBurst: Long,
+ public val interferedBurst: Long,
+ public val cpuUsage: Double,
+ public val cpuDemand: Double
+) : Event("vm-metrics")
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
new file mode 100644
index 00000000..38930ee5
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry.parquet
+
+import mu.KotlinLogging
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.experiments.capelin.telemetry.Event
+import java.io.Closeable
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
+
+/**
+ * The logging instance to use.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * A writer that writes events in Parquet format.
+ */
+public open class ParquetEventWriter<in T : Event>(
+ private val path: File,
+ private val schema: Schema,
+ private val converter: (T, GenericData.Record) -> Unit,
+ private val bufferSize: Int = 4096
+) : Runnable, Closeable {
+ /**
+ * The writer to write the Parquet file.
+ */
+ private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ /**
+ * The queue of commands to process.
+ */
+ private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
+
+ /**
+ * The thread that is responsible for writing the Parquet records.
+ */
+ private val writerThread = thread(start = false, name = "parquet-writer") { run() }
+
+ /**
+ * Write the specified metrics to the database.
+ */
+ public fun write(event: T) {
+ queue.put(Action.Write(event))
+ }
+
+ /**
+ * Signal the writer to stop.
+ */
+ public override fun close() {
+ queue.put(Action.Stop)
+ writerThread.join()
+ }
+
+ init {
+ writerThread.start()
+ }
+
+ /**
+ * Start the writer thread.
+ */
+ override fun run() {
+ try {
+ loop@ while (true) {
+ val action = queue.take()
+ when (action) {
+ is Action.Stop -> break@loop
+ is Action.Write<*> -> {
+ val record = GenericData.Record(schema)
+ @Suppress("UNCHECKED_CAST")
+ converter(action.event as T, record)
+ writer.write(record)
+ }
+ }
+ }
+ } catch (e: Throwable) {
+ logger.error("Writer failed", e)
+ } finally {
+ writer.close()
+ }
+ }
+
+ public sealed class Action {
+ /**
+ * A poison pill that will stop the writer thread.
+ */
+ public object Stop : Action()
+
+ /**
+ * Write the specified metrics to the database.
+ */
+ public data class Write<out T : Event>(val event: T) : Action()
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
new file mode 100644
index 00000000..4a3e7963
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.opendc.experiments.capelin.telemetry.HostEvent
+import java.io.File
+
+/**
+ * A Parquet event writer for [HostEvent]s.
+ */
+public class ParquetHostEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "host-writer"
+
+ public companion object {
+ private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record ->
+ // record.put("portfolio_id", event.run.parent.parent.id)
+ // record.put("scenario_id", event.run.parent.id)
+ // record.put("run_id", event.run.id)
+ record.put("host_id", event.host.name)
+ record.put("state", event.host.state.name)
+ record.put("timestamp", event.timestamp)
+ record.put("duration", event.duration)
+ record.put("vm_count", event.vmCount)
+ record.put("requested_burst", event.requestedBurst)
+ record.put("granted_burst", event.grantedBurst)
+ record.put("overcommissioned_burst", event.overcommissionedBurst)
+ record.put("interfered_burst", event.interferedBurst)
+ record.put("cpu_usage", event.cpuUsage)
+ record.put("cpu_demand", event.cpuDemand)
+ record.put("power_draw", event.powerDraw * (1.0 / 12))
+ record.put("cores", event.cores)
+ }
+
+ private val schema: Schema = SchemaBuilder
+ .record("host_metrics")
+ .namespace("org.opendc.experiments.sc20")
+ .fields()
+ // .name("portfolio_id").type().intType().noDefault()
+ // .name("scenario_id").type().intType().noDefault()
+ // .name("run_id").type().intType().noDefault()
+ .name("timestamp").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("host_id").type().stringType().noDefault()
+ .name("state").type().stringType().noDefault()
+ .name("vm_count").type().intType().noDefault()
+ .name("requested_burst").type().longType().noDefault()
+ .name("granted_burst").type().longType().noDefault()
+ .name("overcommissioned_burst").type().longType().noDefault()
+ .name("interfered_burst").type().longType().noDefault()
+ .name("cpu_usage").type().doubleType().noDefault()
+ .name("cpu_demand").type().doubleType().noDefault()
+ .name("power_draw").type().doubleType().noDefault()
+ .name("cores").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt
new file mode 100644
index 00000000..8feff8d9
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
+import java.io.File
+
+/**
+ * A Parquet event writer for [ProvisionerEvent]s.
+ */
+public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "provisioner-writer"
+
+ public companion object {
+ private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record ->
+ record.put("timestamp", event.timestamp)
+ record.put("host_total_count", event.totalHostCount)
+ record.put("host_available_count", event.availableHostCount)
+ record.put("vm_total_count", event.totalVmCount)
+ record.put("vm_active_count", event.activeVmCount)
+ record.put("vm_inactive_count", event.inactiveVmCount)
+ record.put("vm_waiting_count", event.waitingVmCount)
+ record.put("vm_failed_count", event.failedVmCount)
+ }
+
+ private val schema: Schema = SchemaBuilder
+ .record("provisioner_metrics")
+ .namespace("org.opendc.experiments.sc20")
+ .fields()
+ .name("timestamp").type().longType().noDefault()
+ .name("host_total_count").type().intType().noDefault()
+ .name("host_available_count").type().intType().noDefault()
+ .name("vm_total_count").type().intType().noDefault()
+ .name("vm_active_count").type().intType().noDefault()
+ .name("vm_inactive_count").type().intType().noDefault()
+ .name("vm_waiting_count").type().intType().noDefault()
+ .name("vm_failed_count").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
new file mode 100644
index 00000000..946410eb
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.opendc.experiments.capelin.telemetry.RunEvent
+import java.io.File
+
+/**
+ * A Parquet event writer for [RunEvent]s.
+ */
+public class ParquetRunEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "run-writer"
+
+ public companion object {
+ private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record ->
+ val portfolio = event.portfolio
+ record.put("portfolio_name", portfolio.name)
+ record.put("scenario_id", portfolio.id)
+ record.put("run_id", event.repeat)
+ record.put("topology", portfolio.topology.name)
+ record.put("workload_name", portfolio.workload.name)
+ record.put("workload_fraction", portfolio.workload.fraction)
+ record.put("workload_sampler", portfolio.workload.samplingStrategy)
+ record.put("allocation_policy", portfolio.allocationPolicy)
+ record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency)
+ record.put("interference", portfolio.operationalPhenomena.hasInterference)
+ record.put("seed", event.repeat)
+ }
+
+ private val schema: Schema = SchemaBuilder
+ .record("runs")
+ .namespace("org.opendc.experiments.sc20")
+ .fields()
+ .name("portfolio_name").type().stringType().noDefault()
+ .name("scenario_id").type().intType().noDefault()
+ .name("run_id").type().intType().noDefault()
+ .name("topology").type().stringType().noDefault()
+ .name("workload_name").type().stringType().noDefault()
+ .name("workload_fraction").type().doubleType().noDefault()
+ .name("workload_sampler").type().stringType().noDefault()
+ .name("allocation_policy").type().stringType().noDefault()
+ .name("failure_frequency").type().doubleType().noDefault()
+ .name("interference").type().booleanType().noDefault()
+ .name("seed").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
new file mode 100644
index 00000000..6cfdae40
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import org.opendc.compute.core.workload.VmWorkload
+import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.format.trace.TraceEntry
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import java.util.TreeSet
+
+/**
+ * A [TraceReader] for the internal VM workload trace format.
+ *
+ * @param reader The internal trace reader to use.
+ * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ * @param run The run to which this reader belongs.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+public class Sc20ParquetTraceReader(
+ rawReaders: List<Sc20RawParquetTraceReader>,
+ performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
+ workload: Workload,
+ seed: Int
+) : TraceReader<VmWorkload> {
+ /**
+ * The iterator over the actual trace.
+ */
+ private val iterator: Iterator<TraceEntry<VmWorkload>> =
+ rawReaders
+ .map { it.read() }
+ .run {
+ if (workload is CompositeWorkload) {
+ this.zip(workload.workloads)
+ } else {
+ this.zip(listOf(workload))
+ }
+ }
+ .map { sampleWorkload(it.first, workload, it.second, seed) }
+ .flatten()
+ .run {
+ // Apply performance interference model
+ if (performanceInterferenceModel.isEmpty())
+ this
+ else {
+ map { entry ->
+ val image = entry.workload.image
+ val id = image.name
+ val relevantPerformanceInterferenceModelItems =
+ performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
+
+ val newImage =
+ SimWorkloadImage(
+ image.uid,
+ image.name,
+ image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
+ (image as SimWorkloadImage).workload
+ )
+ val newWorkload = entry.workload.copy(image = newImage)
+ Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload)
+ }
+ }
+ }
+ .iterator()
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<VmWorkload> = iterator.next()
+
+ override fun close() {}
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
new file mode 100644
index 00000000..d2560d62
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import mu.KotlinLogging
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetReader
+import org.opendc.compute.core.workload.VmWorkload
+import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.core.User
+import org.opendc.format.trace.TraceEntry
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import java.io.File
+import java.util.UUID
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * A [TraceReader] for the internal VM workload trace format.
+ *
+ * @param path The directory of the traces.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+public class Sc20RawParquetTraceReader(private val path: File) {
+ /**
+ * Read the fragments into memory.
+ */
+ private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
+ val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
+ .disableCompatibility()
+ .build()
+
+ val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
+
+ return try {
+ while (true) {
+ val record = reader.read() ?: break
+
+ val id = record["id"].toString()
+ val tick = record["time"] as Long
+ val duration = record["duration"] as Long
+ val cores = record["cores"] as Int
+ val cpuUsage = record["cpuUsage"] as Double
+ val flops = record["flops"] as Long
+
+ val fragment = SimTraceWorkload.Fragment(
+ duration,
+ cpuUsage,
+ cores
+ )
+
+ fragments.getOrPut(id) { mutableListOf() }.add(fragment)
+ }
+
+ fragments
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Read the metadata into a workload.
+ */
+ private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntryImpl> {
+ val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
+ .disableCompatibility()
+ .build()
+
+ var counter = 0
+ val entries = mutableListOf<TraceEntryImpl>()
+
+ return try {
+ while (true) {
+ val record = metaReader.read() ?: break
+
+ val id = record["id"].toString()
+ if (!fragments.containsKey(id)) {
+ continue
+ }
+
+ val submissionTime = record["submissionTime"] as Long
+ val endTime = record["endTime"] as Long
+ val maxCores = record["maxCores"] as Int
+ val requiredMemory = record["requiredMemory"] as Long
+ val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
+
+ val vmFragments = fragments.getValue(id).asSequence()
+ val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
+ val vmWorkload = VmWorkload(
+ uid,
+ id,
+ UnnamedUser,
+ SimWorkloadImage(
+ uid,
+ id,
+ mapOf(
+ "submit-time" to submissionTime,
+ "end-time" to endTime,
+ "total-load" to totalLoad,
+ "cores" to maxCores,
+ "required-memory" to requiredMemory
+ ),
+ SimTraceWorkload(vmFragments)
+ )
+ )
+ entries.add(TraceEntryImpl(submissionTime, vmWorkload))
+ }
+
+ entries
+ } catch (e: Exception) {
+ e.printStackTrace()
+ throw e
+ } finally {
+ metaReader.close()
+ }
+ }
+
+ /**
+ * The entries in the trace.
+ */
+ private val entries: List<TraceEntryImpl>
+
+ init {
+ val fragments = parseFragments(path)
+ entries = parseMeta(path, fragments)
+ }
+
+ /**
+ * Read the entries in the trace.
+ */
+ public fun read(): List<TraceEntry<VmWorkload>> = entries
+
+ /**
+ * An unnamed user.
+ */
+ private object UnnamedUser : User {
+ override val name: String = "<unnamed>"
+ override val uid: UUID = UUID.randomUUID()
+ }
+
+ /**
+ * An entry in the trace.
+ */
+ internal data class TraceEntryImpl(
+ override var submissionTime: Long,
+ override val workload: VmWorkload
+ ) : TraceEntry<VmWorkload>
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
new file mode 100644
index 00000000..12705c80
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
@@ -0,0 +1,305 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import mu.KotlinLogging
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetReader
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.filter2.predicate.Statistics
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate
+import org.apache.parquet.io.api.Binary
+import org.opendc.compute.core.workload.VmWorkload
+import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.core.User
+import org.opendc.format.trace.TraceEntry
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import java.io.File
+import java.io.Serializable
+import java.util.SortedSet
+import java.util.TreeSet
+import java.util.UUID
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
+import kotlin.random.Random
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
+ *
+ * @param traceFile The directory of the traces.
+ * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+public class Sc20StreamingParquetTraceReader(
+ traceFile: File,
+ performanceInterferenceModel: PerformanceInterferenceModel,
+ selectedVms: List<String>,
+ random: Random
+) : TraceReader<VmWorkload> {
+ /**
+ * The internal iterator to use for this reader.
+ */
+ private val iterator: Iterator<TraceEntry<VmWorkload>>
+
+ /**
+ * The intermediate buffer to store the read records in.
+ */
+ private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024)
+
+ /**
+ * An optional filter for filtering the selected VMs
+ */
+ private val filter =
+ if (selectedVms.isEmpty())
+ null
+ else
+ FilterCompat.get(
+ FilterApi.userDefined(
+ FilterApi.binaryColumn("id"),
+ SelectedVmFilter(
+ TreeSet(selectedVms)
+ )
+ )
+ )
+
+ /**
+ * A poisonous fragment.
+ */
+ private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0))
+
+ /**
+ * The thread to read the records in.
+ */
+ private val readerThread = thread(start = true, name = "sc20-reader") {
+ val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
+ .disableCompatibility()
+ .run { if (filter != null) withFilter(filter) else this }
+ .build()
+
+ try {
+ while (true) {
+ val record = reader.read()
+
+ if (record == null) {
+ queue.put(poison)
+ break
+ }
+
+ val id = record["id"].toString()
+ val tick = record["time"] as Long
+ val duration = record["duration"] as Long
+ val cores = record["cores"] as Int
+ val cpuUsage = record["cpuUsage"] as Double
+ val flops = record["flops"] as Long
+
+ val fragment = SimTraceWorkload.Fragment(
+ duration,
+ cpuUsage,
+ cores
+ )
+
+ queue.put(id to fragment)
+ }
+ } catch (e: InterruptedException) {
+ // Do not rethrow this
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Fill the buffers with the VMs
+ */
+ private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) {
+ if (!hasNext) {
+ return
+ }
+
+ val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>()
+ queue.drainTo(fragments)
+
+ for ((id, fragment) in fragments) {
+ if (id == poison.first) {
+ hasNext = false
+ return
+ }
+ buffers[id]?.forEach { it.add(fragment) }
+ }
+ }
+
+ /**
+ * A flag to indicate whether the reader has more entries.
+ */
+ private var hasNext: Boolean = true
+
+ /**
+ * Initialize the reader.
+ */
+ init {
+ val takenIds = mutableSetOf<UUID>()
+ val entries = mutableMapOf<String, GenericData.Record>()
+ val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
+
+ val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
+ .disableCompatibility()
+ .run { if (filter != null) withFilter(filter) else this }
+ .build()
+
+ while (true) {
+ val record = metaReader.read() ?: break
+ val id = record["id"].toString()
+ entries[id] = record
+ }
+
+ metaReader.close()
+
+ val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms
+
+ // Create the entry iterator
+ iterator = selection.asSequence()
+ .mapNotNull { entries[it] }
+ .mapIndexed { index, record ->
+ val id = record["id"].toString()
+ val submissionTime = record["submissionTime"] as Long
+ val endTime = record["endTime"] as Long
+ val maxCores = record["maxCores"] as Int
+ val requiredMemory = record["requiredMemory"] as Long
+ val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
+
+ assert(uid !in takenIds)
+ takenIds += uid
+
+ logger.info("Processing VM $id")
+
+ val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
+ val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
+ buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
+ val fragments = sequence {
+ var time = submissionTime
+ repeat@ while (true) {
+ if (externalBuffer.isEmpty()) {
+ if (hasNext) {
+ pull(buffers)
+ continue
+ } else {
+ break
+ }
+ }
+
+ internalBuffer.addAll(externalBuffer)
+ externalBuffer.clear()
+
+ for (fragment in internalBuffer) {
+ yield(fragment)
+
+ time += fragment.duration
+ if (time >= endTime) {
+ break@repeat
+ }
+ }
+
+ internalBuffer.clear()
+ }
+
+ buffers.remove(id)
+ }
+ val relevantPerformanceInterferenceModelItems =
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(),
+ Random(random.nextInt())
+ )
+ val vmWorkload = VmWorkload(
+ uid,
+ "VM Workload $id",
+ UnnamedUser,
+ SimWorkloadImage(
+ uid,
+ id,
+ mapOf(
+ IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
+ "cores" to maxCores,
+ "required-memory" to requiredMemory
+ ),
+ SimTraceWorkload(fragments),
+ )
+ )
+
+ TraceEntryImpl(
+ submissionTime,
+ vmWorkload
+ )
+ }
+ .sortedBy { it.submissionTime }
+ .toList()
+ .iterator()
+ }
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<VmWorkload> = iterator.next()
+
+ override fun close() {
+ readerThread.interrupt()
+ }
+
+ private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
+ override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
+
+ override fun canDrop(statistics: Statistics<Binary>): Boolean {
+ val min = statistics.min
+ val max = statistics.max
+
+ return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
+ }
+
+ override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
+ val min = statistics.min
+ val max = statistics.max
+
+ return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
+ }
+ }
+
+ /**
+ * An unnamed user.
+ */
+ private object UnnamedUser : User {
+ override val name: String = "<unnamed>"
+ override val uid: UUID = UUID.randomUUID()
+ }
+
+ /**
+ * An entry in the trace.
+ */
+ private data class TraceEntryImpl(
+ override var submissionTime: Long,
+ override val workload: VmWorkload
+ ) : TraceEntry<VmWorkload>
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
new file mode 100644
index 00000000..7713c06f
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
@@ -0,0 +1,621 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.arguments.argument
+import com.github.ajalt.clikt.parameters.groups.OptionGroup
+import com.github.ajalt.clikt.parameters.groups.groupChoice
+import com.github.ajalt.clikt.parameters.options.convert
+import com.github.ajalt.clikt.parameters.options.default
+import com.github.ajalt.clikt.parameters.options.defaultLazy
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.required
+import com.github.ajalt.clikt.parameters.options.split
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.long
+import me.tongfei.progressbar.ProgressBar
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.format.trace.sc20.Sc20VmPlacementReader
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import java.util.Random
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Represents the command for converting traces
+ */
+public class TraceConverterCli : CliktCommand(name = "trace-converter") {
+ /**
+ * The directory where the trace should be stored.
+ */
+ private val outputPath by option("-O", "--output", help = "path to store the trace")
+ .file(canBeFile = false, mustExist = false)
+ .defaultLazy { File("output") }
+
+ /**
+ * The directory where the input trace is located.
+ */
+ private val inputPath by argument("input", help = "path to the input trace")
+ .file(canBeFile = false)
+
+ /**
+ * The input type of the trace.
+ */
+ private val type by option("-t", "--type", help = "input type of trace").groupChoice(
+ "solvinity" to SolvinityConversion(),
+ "bitbrains" to BitbrainsConversion(),
+ "azure" to AzureConversion()
+ )
+
+ override fun run() {
+ val metaSchema = SchemaBuilder
+ .record("meta")
+ .namespace("org.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("submissionTime").type().longType().noDefault()
+ .name("endTime").type().longType().noDefault()
+ .name("maxCores").type().intType().noDefault()
+ .name("requiredMemory").type().longType().noDefault()
+ .endRecord()
+ val schema = SchemaBuilder
+ .record("trace")
+ .namespace("org.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("time").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("cores").type().intType().noDefault()
+ .name("cpuUsage").type().doubleType().noDefault()
+ .name("flops").type().longType().noDefault()
+ .endRecord()
+
+ val metaParquet = File(outputPath, "meta.parquet")
+ val traceParquet = File(outputPath, "trace.parquet")
+
+ if (metaParquet.exists()) {
+ metaParquet.delete()
+ }
+ if (traceParquet.exists()) {
+ traceParquet.delete()
+ }
+
+ val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI()))
+ .withSchema(metaSchema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI()))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ try {
+ val type = type ?: throw IllegalArgumentException("Invalid trace conversion")
+ val allFragments = type.read(inputPath, metaSchema, metaWriter)
+ allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+
+ for (fragment in allFragments) {
+ val record = GenericData.Record(schema)
+ record.put("id", fragment.id)
+ record.put("time", fragment.tick)
+ record.put("duration", fragment.duration)
+ record.put("cores", fragment.cores)
+ record.put("cpuUsage", fragment.usage)
+ record.put("flops", fragment.flops)
+
+ writer.write(record)
+ }
+ } finally {
+ writer.close()
+ metaWriter.close()
+ }
+ }
+}
+
+/**
+ * The supported trace conversions.
+ */
+public sealed class TraceConversion(name: String) : OptionGroup(name) {
+ /**
+ * Read the fragments of the trace.
+ */
+ public abstract fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment>
+}
+
+public class SolvinityConversion : TraceConversion("Solvinity") {
+ private val clusters by option()
+ .split(",")
+
+ private val vmPlacements by option("--vm-placements", help = "file containing the VM placements")
+ .file(canBeDir = false)
+ .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } }
+ .required()
+
+ override fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment> {
+ val clusters = clusters?.toSet() ?: emptySet()
+ val timestampCol = 0
+ val cpuUsageCol = 1
+ val coreCol = 12
+ val provisionedMemoryCol = 20
+ val traceInterval = 5 * 60 * 1000L
+
+ // Identify start time of the entire trace
+ var minTimestamp = Long.MAX_VALUE
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ .forEach file@{ vmFile ->
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val vmId = vmFile.name
+
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null || !clusters.contains(clusterName)) {
+ continue
+ }
+
+ val values = line.split("\t")
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+
+ if (timestamp < minTimestamp) {
+ minTimestamp = timestamp
+ }
+ return@file
+ }
+ }
+ }
+ }
+
+ println("Start of trace at $minTimestamp")
+
+ val allFragments = mutableListOf<Fragment>()
+
+ val begin = 15 * 24 * 60 * 60 * 1000L
+ val end = 45 * 24 * 60 * 60 * 1000L
+
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ .forEach { vmFile ->
+ println(vmFile)
+
+ var vmId = ""
+ var maxCores = -1
+ var requiredMemory = -1L
+ var cores: Int
+ var minTime = Long.MAX_VALUE
+
+ val flopsFragments = sequence {
+ var last: Fragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split("\t")
+
+ vmId = vmFile.name
+
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null || !clusters.contains(clusterName)) {
+ continue
+ }
+
+ val timestamp =
+ (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
+ if (begin > timestamp || timestamp > end) {
+ continue
+ }
+
+ cores = values[coreCol].trim().toInt()
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
+ Fragment(
+ vmId,
+ oldFragment.tick,
+ oldFragment.flops + flops,
+ oldFragment.duration + traceInterval,
+ cpuUsage,
+ cores
+ )
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ cores
+ )
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
+ }
+ }
+ }
+ }
+
+ if (last != null) {
+ yield(last!!)
+ }
+ }
+
+ var maxTime = Long.MIN_VALUE
+ flopsFragments.filter { it.tick in begin until end }.forEach { fragment ->
+ allFragments.add(fragment)
+ maxTime = max(maxTime, fragment.tick)
+ }
+
+ if (minTime in begin until end) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", vmId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+ }
+
+ return allFragments
+ }
+}
+
+/**
+ * Conversion of the Bitbrains public trace.
+ */
+public class BitbrainsConversion : TraceConversion("Bitbrains") {
+ override fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment> {
+ val timestampCol = 0
+ val cpuUsageCol = 3
+ val coreCol = 1
+ val provisionedMemoryCol = 5
+ val traceInterval = 5 * 60 * 1000L
+
+ val allFragments = mutableListOf<Fragment>()
+
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ .forEach { vmFile ->
+ println(vmFile)
+
+ var vmId = ""
+ var maxCores = -1
+ var requiredMemory = -1L
+ var cores: Int
+ var minTime = Long.MAX_VALUE
+
+ val flopsFragments = sequence {
+ var last: Fragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .drop(1)
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(";\t")
+
+ vmId = vmFile.name
+
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+
+ cores = values[coreCol].trim().toInt()
+ val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB
+ requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong())
+ maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
+ Fragment(
+ vmId,
+ oldFragment.tick,
+ oldFragment.flops + flops,
+ oldFragment.duration + traceInterval,
+ cpuUsage,
+ cores
+ )
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ cores
+ )
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
+ }
+ }
+ }
+ }
+
+ if (last != null) {
+ yield(last!!)
+ }
+ }
+
+ var maxTime = Long.MIN_VALUE
+ flopsFragments.forEach { fragment ->
+ allFragments.add(fragment)
+ maxTime = max(maxTime, fragment.tick)
+ }
+
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", vmId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+
+ return allFragments
+ }
+}
+
+/**
+ * Conversion of the Azure public VM trace.
+ */
+public class AzureConversion : TraceConversion("Azure") {
+ private val seed by option(help = "seed for trace sampling")
+ .long()
+ .default(0)
+
+ override fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment> {
+ val random = Random(seed)
+ val fraction = 0.01
+
+ // Read VM table
+ val vmIdTableCol = 0
+ val coreTableCol = 9
+ val provisionedMemoryTableCol = 10
+
+ var vmId: String
+ var cores: Int
+ var requiredMemory: Long
+
+ val vmIds = mutableSetOf<String>()
+ val vmIdToMetadata = mutableMapOf<String, VmInfo>()
+
+ BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
+ reader.lineSequence()
+ .chunked(1024)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+ // Sample only a fraction of the VMs
+ if (random.nextDouble() > fraction) {
+ continue
+ }
+
+ val values = line.split(",")
+
+ // Exclude VMs with a large number of cores (not specified exactly)
+ if (values[coreTableCol].contains(">")) {
+ continue
+ }
+
+ vmId = values[vmIdTableCol].trim()
+ cores = values[coreTableCol].trim().toInt()
+ requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
+
+ vmIds.add(vmId)
+ vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
+ }
+ }
+ }
+
+ // Read VM metric reading files
+ val timestampCol = 0
+ val vmIdCol = 1
+ val cpuUsageCol = 4
+ val traceInterval = 5 * 60 * 1000L
+
+ val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
+ val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
+ val allFragments = mutableListOf<Fragment>()
+
+ for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) {
+ val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv")
+ var timestamp: Long
+ var cpuUsage: Double
+
+ BufferedReader(FileReader(readingsFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(",")
+ vmId = values[vmIdCol].trim()
+
+ // Ignore readings for VMs not in the sample
+ if (!vmIds.contains(vmId)) {
+ continue
+ }
+
+ timestamp = values[timestampCol].trim().toLong() * 1000L
+ vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
+ cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz
+ vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+ val lastFragment = vmIdToLastFragment[vmId]
+
+ vmIdToLastFragment[vmId] =
+ if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
+ Fragment(
+ vmId,
+ lastFragment.tick,
+ lastFragment.flops + flops,
+ lastFragment.duration + traceInterval,
+ cpuUsage,
+ vmIdToMetadata[vmId]!!.cores
+ )
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ vmIdToMetadata[vmId]!!.cores
+ )
+ if (lastFragment != null) {
+ if (vmIdToFragments[vmId] == null) {
+ vmIdToFragments[vmId] = mutableListOf()
+ }
+ vmIdToFragments[vmId]!!.add(lastFragment)
+ allFragments.add(lastFragment)
+ }
+ fragment
+ }
+ }
+ }
+ }
+ }
+
+ for (entry in vmIdToLastFragment) {
+ if (entry.value != null) {
+ if (vmIdToFragments[entry.key] == null) {
+ vmIdToFragments[entry.key] = mutableListOf()
+ }
+ vmIdToFragments[entry.key]!!.add(entry.value!!)
+ }
+ }
+
+ println("Read ${vmIdToLastFragment.size} VMs")
+
+ for (entry in vmIdToMetadata) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", entry.key)
+ metaRecord.put("submissionTime", entry.value.minTime)
+ metaRecord.put("endTime", entry.value.maxTime)
+ println("${entry.value.minTime} - ${entry.value.maxTime}")
+ metaRecord.put("maxCores", entry.value.cores)
+ metaRecord.put("requiredMemory", entry.value.requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+
+ return allFragments
+ }
+}
+
+public data class Fragment(
+ public val id: String,
+ public val tick: Long,
+ public val flops: Long,
+ public val duration: Long,
+ public val usage: Double,
+ public val cores: Int
+)
+
+public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long)
+
+/**
+ * A script to convert a trace in text format into a Parquet trace.
+ */
+public fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
new file mode 100644
index 00000000..4d9b9df1
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import mu.KotlinLogging
+import org.opendc.compute.core.workload.VmWorkload
+import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.experiments.capelin.model.SamplingStrategy
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.format.trace.TraceEntry
+import java.util.*
+import kotlin.random.Random
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Sample the workload for the specified [run].
+ */
+public fun sampleWorkload(
+ trace: List<TraceEntry<VmWorkload>>,
+ workload: Workload,
+ subWorkload: Workload,
+ seed: Int
+): List<TraceEntry<VmWorkload>> {
+ return when {
+ workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
+ workload.samplingStrategy == SamplingStrategy.HPC ->
+ sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false)
+ workload.samplingStrategy == SamplingStrategy.HPC_LOAD ->
+ sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true)
+ else ->
+ sampleRegularWorkload(trace, workload, workload, seed)
+ }
+}
+
+/**
+ * Sample a regular (non-HPC) workload.
+ */
+public fun sampleRegularWorkload(
+ trace: List<TraceEntry<VmWorkload>>,
+ workload: Workload,
+ subWorkload: Workload,
+ seed: Int
+): List<TraceEntry<VmWorkload>> {
+ val fraction = subWorkload.fraction
+
+ val shuffled = trace.shuffled(Random(seed))
+ val res = mutableListOf<TraceEntry<VmWorkload>>()
+ val totalLoad = if (workload is CompositeWorkload) {
+ workload.totalLoad
+ } else {
+ shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+ }
+ var currentLoad = 0.0
+
+ for (entry in shuffled) {
+ val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+}
+
+/**
+ * Sample a HPC workload.
+ */
+public fun sampleHpcWorkload(
+ trace: List<TraceEntry<VmWorkload>>,
+ workload: Workload,
+ seed: Int,
+ sampleOnLoad: Boolean
+): List<TraceEntry<VmWorkload>> {
+ val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
+ val random = Random(seed)
+
+ val fraction = workload.fraction
+ val (hpc, nonHpc) = trace.partition { entry ->
+ val name = entry.workload.image.name
+ name.matches(pattern)
+ }
+
+ val hpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<TraceEntry<VmWorkload>>()
+ hpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ val nonHpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<TraceEntry<VmWorkload>>()
+ nonHpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
+
+ val totalLoad = if (workload is CompositeWorkload) {
+ workload.totalLoad
+ } else {
+ trace.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+ }
+
+ logger.debug { "Total trace load: $totalLoad" }
+ var hpcCount = 0
+ var hpcLoad = 0.0
+ var nonHpcCount = 0
+ var nonHpcLoad = 0.0
+
+ val res = mutableListOf<TraceEntry<VmWorkload>>()
+
+ if (sampleOnLoad) {
+ var currentLoad = 0.0
+ for (entry in hpcSequence) {
+ val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ hpcLoad += entryLoad
+ hpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ for (entry in nonHpcSequence) {
+ val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ if ((currentLoad + entryLoad) / totalLoad > 1) {
+ break
+ }
+
+ nonHpcLoad += entryLoad
+ nonHpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+ } else {
+ hpcSequence
+ .take((fraction * trace.size).toInt())
+ .forEach { entry ->
+ hpcLoad += entry.workload.image.tags.getValue("total-load") as Double
+ hpcCount += 1
+ res.add(entry)
+ }
+
+ nonHpcSequence
+ .take(((1 - fraction) * trace.size).toInt())
+ .forEach { entry ->
+ nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double
+ nonHpcCount += 1
+ res.add(entry)
+ }
+ }
+
+ logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
+ logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
+ logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+}
+
+/**
+ * Sample a random trace entry.
+ */
+private fun sample(entry: TraceEntry<VmWorkload>, i: Int): TraceEntry<VmWorkload> {
+ val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray())
+ val image = SimWorkloadImage(
+ id,
+ entry.workload.image.name,
+ entry.workload.image.tags,
+ (entry.workload.image as SimWorkloadImage).workload
+ )
+ val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name)
+ return VmTraceEntry(vmWorkload, entry.submissionTime)
+}
+
+private class VmTraceEntry(override val workload: VmWorkload, override val submissionTime: Long) :
+ TraceEntry<VmWorkload>
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
new file mode 100644
index 00000000..d1c01b8e
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ MIT License
+ ~
+ ~ Copyright (c) 2020 atlarge-research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger name="org.opendc" level="debug" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.opendc.experiments.capelin" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.opendc.experiments.capelin.trace" level="debug" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.apache.hadoop" level="warn" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Root level="error">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
new file mode 100644
index 00000000..6a0796f6
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -0,0 +1,256 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineScope
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.opendc.compute.core.Server
+import org.opendc.compute.core.workload.VmWorkload
+import org.opendc.compute.simulator.SimVirtProvisioningService
+import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy
+import org.opendc.experiments.capelin.experiment.attachMonitor
+import org.opendc.experiments.capelin.experiment.createFailureDomain
+import org.opendc.experiments.capelin.experiment.createProvisioner
+import org.opendc.experiments.capelin.experiment.processTrace
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.capelin.monitor.ExperimentMonitor
+import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
+import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
+import org.opendc.format.environment.EnvironmentReader
+import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.trace.core.EventTracer
+import java.io.File
+import java.time.Clock
+
+/**
+ * An integration test suite for the SC20 experiments.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class CapelinIntegrationTest {
+ /**
+ * The [TestCoroutineScope] to use.
+ */
+ private lateinit var testScope: TestCoroutineScope
+
+ /**
+ * The simulation clock to use.
+ */
+ private lateinit var clock: Clock
+
+ /**
+ * The monitor used to keep track of the metrics.
+ */
+ private lateinit var monitor: TestExperimentReporter
+
+ /**
+ * Setup the experimental environment.
+ */
+ @BeforeEach
+ fun setUp() {
+ testScope = TestCoroutineScope()
+ clock = DelayControllerClockAdapter(testScope)
+
+ monitor = TestExperimentReporter()
+ }
+
+ /**
+ * Tear down the experimental environment.
+ */
+ @AfterEach
+ fun tearDown() = testScope.cleanupTestCoroutines()
+
+ @Test
+ fun testLarge() {
+ val failures = false
+ val seed = 0
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
+ val traceReader = createTestTraceReader()
+ val environmentReader = createTestEnvironmentReader()
+ lateinit var scheduler: SimVirtProvisioningService
+ val tracer = EventTracer(clock)
+
+ testScope.launch {
+ val res = createProvisioner(
+ this,
+ clock,
+ environmentReader,
+ allocationPolicy,
+ tracer
+ )
+ val bareMetalProvisioner = res.first
+ scheduler = res.second
+
+ val failureDomain = if (failures) {
+ println("ENABLING failures")
+ createFailureDomain(
+ this,
+ clock,
+ seed,
+ 24.0 * 7,
+ bareMetalProvisioner,
+ chan
+ )
+ } else {
+ null
+ }
+
+ attachMonitor(this, clock, scheduler, monitor)
+ processTrace(
+ this,
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+
+ println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
+ failureDomain?.cancel()
+ scheduler.terminate()
+ monitor.close()
+ }
+
+ runSimulation()
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
+ { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
+ { assertEquals(1684849230562, monitor.totalRequestedBurst) },
+ { assertEquals(447612683996, monitor.totalGrantedBurst) },
+ { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) },
+ { assertEquals(0, monitor.totalInterferedBurst) }
+ )
+ }
+
+ @Test
+ fun testSmall() {
+ val seed = 1
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
+ val traceReader = createTestTraceReader(0.5, seed)
+ val environmentReader = createTestEnvironmentReader("single")
+ lateinit var scheduler: SimVirtProvisioningService
+ val tracer = EventTracer(clock)
+
+ testScope.launch {
+ val res = createProvisioner(
+ this,
+ clock,
+ environmentReader,
+ allocationPolicy,
+ tracer
+ )
+ scheduler = res.second
+
+ attachMonitor(this, clock, scheduler, monitor)
+ processTrace(
+ this,
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+
+ println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
+ scheduler.terminate()
+ monitor.close()
+ }
+
+ runSimulation()
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(705128393966, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
+ )
+ }
+
+ /**
+ * Run the simulation.
+ */
+ private fun runSimulation() = testScope.advanceUntilIdle()
+
+ /**
+ * Obtain the trace reader for the test.
+ */
+ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> {
+ return Sc20ParquetTraceReader(
+ listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
+ emptyMap(),
+ Workload("test", fraction),
+ seed
+ )
+ }
+
+ /**
+ * Obtain the environment reader for the test.
+ */
+ private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
+ val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt")
+ return Sc20ClusterEnvironmentReader(stream)
+ }
+
+ class TestExperimentReporter : ExperimentMonitor {
+ var totalRequestedBurst = 0L
+ var totalGrantedBurst = 0L
+ var totalOvercommissionedBurst = 0L
+ var totalInterferedBurst = 0L
+
+ override fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ numberOfDeployedImages: Int,
+ hostServer: Server,
+ duration: Long
+ ) {
+ totalRequestedBurst += requestedBurst
+ totalGrantedBurst += grantedBurst
+ totalOvercommissionedBurst += overcommissionedBurst
+ totalInterferedBurst += interferedBurst
+ }
+
+ override fun close() {}
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
new file mode 100644
index 00000000..53b3c2d7
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
@@ -0,0 +1,3 @@
+ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
+A01;A01;8;3.2;64;1;64;8
+
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt
new file mode 100644
index 00000000..6b347bff
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt
@@ -0,0 +1,5 @@
+ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
+A01;A01;32;3.2;2048;1;256;32
+B01;B01;48;2.93;1256;6;64;8
+C01;C01;32;3.2;2048;2;128;16
+
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
new file mode 100644
index 00000000..ce7a812c
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
Binary files differ
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
new file mode 100644
index 00000000..1d7ce882
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
Binary files differ