summaryrefslogtreecommitdiff
path: root/simulator/opendc-experiments/opendc-experiments-capelin
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/.gitignore2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts52
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt79
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt297
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt60
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt59
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt56
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt61
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt233
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt50
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt47
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt31
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt28
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt44
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt160
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt74
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt118
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt35
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt43
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt39
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt34
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt41
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt126
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt81
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt65
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt72
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt84
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt157
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt284
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt621
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt199
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml49
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt216
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt5
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquetbin2148 -> 0 bytes
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquetbin1672463 -> 0 bytes
37 files changed, 0 insertions, 3605 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/.gitignore b/simulator/opendc-experiments/opendc-experiments-capelin/.gitignore
deleted file mode 100644
index ba64707c..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-input/
-output/
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
deleted file mode 100644
index b2d7cc30..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) 2019 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-description = "Experiments for the Capelin work"
-
-/* Build configuration */
-plugins {
- `kotlin-library-conventions`
- `experiment-conventions`
- `testing-conventions`
-}
-
-dependencies {
- api(platform(project(":opendc-platform")))
- api(project(":opendc-harness"))
- implementation(project(":opendc-format"))
- implementation(project(":opendc-simulator:opendc-simulator-core"))
- implementation(project(":opendc-simulator:opendc-simulator-compute"))
- implementation(project(":opendc-simulator:opendc-simulator-failures"))
- implementation(project(":opendc-compute:opendc-compute-simulator"))
-
- implementation("io.github.microutils:kotlin-logging")
- implementation("me.tongfei:progressbar:${versions["progressbar"]}")
- implementation("com.github.ajalt.clikt:clikt:${versions["clikt"]}")
-
- implementation("org.apache.parquet:parquet-avro:${versions["parquet-avro"]}")
- implementation("org.apache.hadoop:hadoop-client:${versions["hadoop-client"]}") {
- exclude(group = "org.slf4j", module = "slf4j-log4j12")
- exclude(group = "log4j")
- }
-
- implementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
deleted file mode 100644
index faabe5cb..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.Topology
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.harness.dsl.anyOf
-
-/**
- * A [Portfolio] that explores the effect of a composite workload.
- */
-public class CompositeWorkloadPortfolio : Portfolio("composite-workload") {
- private val totalSampleLoad = 1.3301733005049648E12
-
- override val topology: Topology by anyOf(
- Topology("base"),
- Topology("exp-vol-hor-hom"),
- Topology("exp-vol-ver-hom"),
- Topology("exp-vel-ver-hom")
- )
-
- override val workload: Workload by anyOf(
- CompositeWorkload(
- "all-azure",
- listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)),
- totalSampleLoad
- ),
- CompositeWorkload(
- "solvinity-25-azure-75",
- listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)),
- totalSampleLoad
- ),
- CompositeWorkload(
- "solvinity-50-azure-50",
- listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)),
- totalSampleLoad
- ),
- CompositeWorkload(
- "solvinity-75-azure-25",
- listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)),
- totalSampleLoad
- ),
- CompositeWorkload(
- "all-solvinity",
- listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)),
- totalSampleLoad
- )
- )
-
- override val operationalPhenomena: OperationalPhenomena by anyOf(
- OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false)
- )
-
- override val allocationPolicy: String by anyOf(
- "active-servers"
- )
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
deleted file mode 100644
index 763234f8..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
-import mu.KotlinLogging
-import org.opendc.compute.api.*
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostListener
-import org.opendc.compute.service.driver.HostState
-import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.simulator.SimHost
-import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.failures.CorrelatedFaultInjector
-import org.opendc.simulator.failures.FaultInjector
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
-import java.io.File
-import java.time.Clock
-import kotlin.coroutines.coroutineContext
-import kotlin.coroutines.resume
-import kotlin.math.ln
-import kotlin.math.max
-import kotlin.random.Random
-
-/**
- * The logger for this experiment.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * Construct the failure domain for the experiments.
- */
-public fun createFailureDomain(
- coroutineScope: CoroutineScope,
- clock: Clock,
- seed: Int,
- failureInterval: Double,
- service: ComputeService,
- chan: Channel<Unit>
-): CoroutineScope {
- val job = coroutineScope.launch {
- chan.receive()
- val random = Random(seed)
- val injectors = mutableMapOf<String, FaultInjector>()
- for (host in service.hosts) {
- val cluster = host.meta["cluster"] as String
- val injector =
- injectors.getOrPut(cluster) {
- createFaultInjector(
- this,
- clock,
- random,
- failureInterval
- )
- }
- injector.enqueue(host as SimHost)
- }
- }
- return CoroutineScope(coroutineScope.coroutineContext + job)
-}
-
-/**
- * Obtain the [FaultInjector] to use for the experiments.
- */
-public fun createFaultInjector(
- coroutineScope: CoroutineScope,
- clock: Clock,
- random: Random,
- failureInterval: Double
-): FaultInjector {
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return CorrelatedFaultInjector(
- coroutineScope,
- clock,
- iatScale = ln(failureInterval), iatShape = 1.03, // Hours
- sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
- dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
- random = random
- )
-}
-
-/**
- * Create the trace reader from which the VM workloads are read.
- */
-public fun createTraceReader(
- path: File,
- performanceInterferenceModel: PerformanceInterferenceModel,
- vms: List<String>,
- seed: Int
-): Sc20StreamingParquetTraceReader {
- return Sc20StreamingParquetTraceReader(
- path,
- performanceInterferenceModel,
- vms,
- Random(seed)
- )
-}
-
-/**
- * Construct the environment for a simulated compute service..
- */
-public suspend fun withComputeService(
- clock: Clock,
- meterProvider: MeterProvider,
- environmentReader: EnvironmentReader,
- scheduler: ComputeScheduler,
- block: suspend CoroutineScope.(ComputeService) -> Unit
-): Unit = coroutineScope {
- val hosts = environmentReader
- .use { it.read() }
- .map { def ->
- SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- coroutineContext,
- clock,
- meterProvider.get("opendc-compute-simulator"),
- SimFairShareHypervisorProvider(),
- def.powerModel
- )
- }
-
- val serviceMeter = meterProvider.get("opendc-compute")
- val service =
- ComputeService(coroutineContext, clock, serviceMeter, scheduler)
-
- for (host in hosts) {
- service.addHost(host)
- }
-
- try {
- block(this, service)
- } finally {
- service.close()
- hosts.forEach(SimHost::close)
- }
-}
-
-/**
- * Attach the specified monitor to the VM provisioner.
- */
-@OptIn(ExperimentalCoroutinesApi::class)
-public suspend fun withMonitor(
- monitor: ExperimentMonitor,
- clock: Clock,
- metricProducer: MetricProducer,
- scheduler: ComputeService,
- block: suspend CoroutineScope.() -> Unit
-): Unit = coroutineScope {
- val monitorJobs = mutableSetOf<Job>()
-
- // Monitor host events
- for (host in scheduler.hosts) {
- monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
- host.addListener(object : HostListener {
- override fun onStateChanged(host: Host, newState: HostState) {
- monitor.reportHostStateChange(clock.millis(), host, newState)
- }
- })
- }
-
- val reader = CoroutineMetricReader(
- this,
- listOf(metricProducer),
- ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }),
- exportInterval = 5 * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */
- )
-
- try {
- block(this)
- } finally {
- monitorJobs.forEach(Job::cancel)
- reader.close()
- monitor.close()
- }
-}
-
-public class ComputeMetrics {
- public var submittedVms: Int = 0
- public var queuedVms: Int = 0
- public var runningVms: Int = 0
- public var unscheduledVms: Int = 0
- public var finishedVms: Int = 0
-}
-
-/**
- * Collect the metrics of the compute service.
- */
-public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
- val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
- val res = ComputeMetrics()
- try {
- // Hack to extract metrics from OpenTelemetry SDK
- res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- } catch (cause: Throwable) {
- logger.warn(cause) { "Failed to collect metrics" }
- }
- return res
-}
-
-/**
- * Process the trace.
- */
-public suspend fun processTrace(
- clock: Clock,
- reader: TraceReader<SimWorkload>,
- scheduler: ComputeService,
- chan: Channel<Unit>,
- monitor: ExperimentMonitor
-) {
- val client = scheduler.newClient()
- val image = client.newImage("vm-image")
- var offset = Long.MIN_VALUE
- try {
- coroutineScope {
- while (reader.hasNext()) {
- val entry = reader.next()
-
- if (offset < 0) {
- offset = entry.start - clock.millis()
- }
-
- delay(max(0, (entry.start - offset) - clock.millis()))
- launch {
- chan.send(Unit)
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace)
- val server = client.newServer(
- entry.name,
- image,
- client.newFlavor(
- entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
- ),
- meta = entry.meta + mapOf("workload" to workload)
- )
-
- suspendCancellableCoroutine { cont ->
- server.watch(object : ServerWatcher {
- override fun onStateChanged(server: Server, newState: ServerState) {
- monitor.reportVmStateChange(clock.millis(), server, newState)
-
- if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) {
- cont.resume(Unit)
- }
- }
- })
- }
- }
- }
- }
-
- yield()
- } finally {
- reader.close()
- client.close()
- }
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
deleted file mode 100644
index e1cf8517..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.Topology
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.harness.dsl.anyOf
-
-/**
- * A [Portfolio] that explores the difference between horizontal and vertical scaling.
- */
-public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") {
- override val topology: Topology by anyOf(
- Topology("base"),
- Topology("rep-vol-hor-hom"),
- Topology("rep-vol-hor-het"),
- Topology("rep-vol-ver-hom"),
- Topology("rep-vol-ver-het"),
- Topology("exp-vol-hor-hom"),
- Topology("exp-vol-hor-het"),
- Topology("exp-vol-ver-hom"),
- Topology("exp-vol-ver-het")
- )
-
- override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
- )
-
- override val operationalPhenomena: OperationalPhenomena by anyOf(
- OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
- )
-
- override val allocationPolicy: String by anyOf(
- "active-servers"
- )
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
deleted file mode 100644
index a995e467..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.SamplingStrategy
-import org.opendc.experiments.capelin.model.Topology
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.harness.dsl.anyOf
-
-/**
- * A [Portfolio] to explore the effect of HPC workloads.
- */
-public class MoreHpcPortfolio : Portfolio("more_hpc") {
- override val topology: Topology by anyOf(
- Topology("base"),
- Topology("exp-vol-hor-hom"),
- Topology("exp-vol-ver-hom"),
- Topology("exp-vel-ver-hom")
- )
-
- override val workload: Workload by anyOf(
- Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD)
- )
-
- override val operationalPhenomena: OperationalPhenomena by anyOf(
- OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
- )
-
- override val allocationPolicy: String by anyOf(
- "active-servers"
- )
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
deleted file mode 100644
index 49559e0e..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.Topology
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.harness.dsl.anyOf
-
-/**
- * A [Portfolio] that explores the effect of adding more velocity to a cluster (e.g., faster machines).
- */
-public class MoreVelocityPortfolio : Portfolio("more_velocity") {
- override val topology: Topology by anyOf(
- Topology("base"),
- Topology("rep-vel-ver-hom"),
- Topology("rep-vel-ver-het"),
- Topology("exp-vel-ver-hom"),
- Topology("exp-vel-ver-het")
- )
-
- override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
- )
-
- override val operationalPhenomena: OperationalPhenomena by anyOf(
- OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
- )
-
- override val allocationPolicy: String by anyOf(
- "active-servers"
- )
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
deleted file mode 100644
index 1aac4f9e..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.Topology
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.harness.dsl.anyOf
-
-/**
- * A [Portfolio] that explores the effect of operational phenomena on metrics.
- */
-public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena") {
- override val topology: Topology by anyOf(
- Topology("base")
- )
-
- override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
- )
-
- override val operationalPhenomena: OperationalPhenomena by anyOf(
- OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true),
- OperationalPhenomena(failureFrequency = 0.0, hasInterference = true),
- OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false),
- OperationalPhenomena(failureFrequency = 0.0, hasInterference = false)
- )
-
- override val allocationPolicy: String by anyOf(
- "mem",
- "mem-inv",
- "core-mem",
- "core-mem-inv",
- "active-servers",
- "active-servers-inv",
- "random"
- )
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
deleted file mode 100644
index b969366c..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
-import mu.KotlinLogging
-import org.opendc.compute.service.scheduler.*
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.*
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.Topology
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
-import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
-import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import org.opendc.format.trace.PerformanceInterferenceModelReader
-import org.opendc.harness.dsl.Experiment
-import org.opendc.harness.dsl.anyOf
-import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.sdk.toOtelClock
-import java.io.File
-import java.util.*
-import java.util.concurrent.ConcurrentHashMap
-import kotlin.random.asKotlinRandom
-
-/**
- * A portfolio represents a collection of scenarios are tested for the work.
- *
- * @param name The name of the portfolio.
- */
-public abstract class Portfolio(name: String) : Experiment(name) {
- /**
- * The logger for this portfolio instance.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The path to where the environments are located.
- */
- private val environmentPath by anyOf(File("input/environments/"))
-
- /**
- * The path to where the traces are located.
- */
- private val tracePath by anyOf(File("input/traces/"))
-
- /**
- * The path to where the output results should be written.
- */
- private val outputPath by anyOf(File("output/"))
-
- /**
- * The path to the original VM placements file.
- */
- private val vmPlacements by anyOf(emptyMap<String, String>())
-
- /**
- * The path to the performance interference model.
- */
- private val performanceInterferenceModel by anyOf<PerformanceInterferenceModelReader?>(null)
-
- /**
- * The topology to test.
- */
- public abstract val topology: Topology
-
- /**
- * The workload to test.
- */
- public abstract val workload: Workload
-
- /**
- * The operational phenomenas to consider.
- */
- public abstract val operationalPhenomena: OperationalPhenomena
-
- /**
- * The allocation policies to consider.
- */
- public abstract val allocationPolicy: String
-
- /**
- * A map of trace readers.
- */
- private val traceReaders = ConcurrentHashMap<String, Sc20RawParquetTraceReader>()
-
- /**
- * Perform a single trial for this portfolio.
- */
- @OptIn(ExperimentalCoroutinesApi::class)
- override fun doRun(repeat: Int): Unit = runBlockingSimulation {
- val seeder = Random(repeat.toLong())
- val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt"))
-
- val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = createComputeScheduler(seeder)
-
- val meterProvider: MeterProvider = SdkMeterProvider
- .builder()
- .setClock(clock.toOtelClock())
- .build()
-
- val workload = workload
- val workloadNames = if (workload is CompositeWorkload) {
- workload.workloads.map { it.name }
- } else {
- listOf(workload.name)
- }
-
- val rawReaders = workloadNames.map { workloadName ->
- traceReaders.computeIfAbsent(workloadName) {
- logger.info { "Loading trace $workloadName" }
- Sc20RawParquetTraceReader(File(tracePath, workloadName))
- }
- }
-
- val performanceInterferenceModel = performanceInterferenceModel
- ?.takeIf { operationalPhenomena.hasInterference }
- ?.construct(seeder.asKotlinRandom()) ?: emptyMap()
- val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt())
-
- val monitor = ParquetExperimentMonitor(
- outputPath,
- "portfolio_id=$name/scenario_id=$id/run_id=$repeat",
- 4096
- )
-
- withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
- val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
- logger.debug("ENABLING failures")
- createFailureDomain(
- this,
- clock,
- seeder.nextInt(),
- operationalPhenomena.failureFrequency,
- scheduler,
- chan
- )
- } else {
- null
- }
-
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
- processTrace(
- clock,
- trace,
- scheduler,
- chan,
- monitor
- )
- }
-
- failureDomain?.cancel()
- }
-
- val monitorResults = collectMetrics(meterProvider as MetricProducer)
- logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
- }
-
- /**
- * Create the [ComputeScheduler] instance to use for the trial.
- */
- private fun createComputeScheduler(seeder: Random): ComputeScheduler {
- return when (allocationPolicy) {
- "mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
- )
- "mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
- )
- "core-mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
- )
- "core-mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
- )
- "active-servers" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
- )
- "active-servers-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(InstanceCountWeigher() to 1.0)
- )
- "provisioned-cores" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
- )
- "provisioned-cores-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to 1.0)
- )
- "random" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(RandomWeigher(Random(seeder.nextLong())) to 1.0)
- )
- "replay" -> ReplayScheduler(vmPlacements)
- else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
- }
- }
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
deleted file mode 100644
index b6d3b30c..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.Topology
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.harness.dsl.anyOf
-
-/**
- * A [Portfolio] that compares the original VM placements against our policies.
- */
-public class ReplayPortfolio : Portfolio("replay") {
- override val topology: Topology by anyOf(
- Topology("base")
- )
-
- override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
- )
-
- override val operationalPhenomena: OperationalPhenomena by anyOf(
- OperationalPhenomena(failureFrequency = 0.0, hasInterference = false)
- )
-
- override val allocationPolicy: String by anyOf(
- "replay",
- "active-servers"
- )
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
deleted file mode 100644
index 90840db8..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.Topology
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.harness.dsl.anyOf
-
-/**
- * A [Portfolio] to perform a simple test run.
- */
-public class TestPortfolio : Portfolio("test") {
- override val topology: Topology by anyOf(
- Topology("base")
- )
-
- override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
- )
-
- override val operationalPhenomena: OperationalPhenomena by anyOf(
- OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
- )
-
- override val allocationPolicy: String by anyOf("active-servers")
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt
deleted file mode 100644
index b53b3617..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.model
-
-/**
- * Operation phenomena during experiments.
- *
- * @param failureFrequency The average time between failures in hours.
- * @param hasInterference A flag to enable performance interference between VMs.
- */
-public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt
deleted file mode 100644
index fe16a294..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.model
-
-/**
- * The topology topology on which we test the workload.
- */
-public data class Topology(val name: String)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
deleted file mode 100644
index c4ddd158..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.model
-
-public enum class SamplingStrategy {
- REGULAR,
- HPC,
- HPC_LOAD
-}
-
-/**
- * A workload that is considered for a scenario.
- */
-public open class Workload(
- public open val name: String,
- public val fraction: Double,
- public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR
-)
-
-/**
- * A workload that is composed of multiple workloads.
- */
-public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) :
- Workload(name, -1.0)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
deleted file mode 100644
index 5f8002e2..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.monitor
-
-import io.opentelemetry.sdk.common.CompletableResultCode
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricExporter
-import org.opendc.compute.service.driver.Host
-import java.time.Clock
-
-/**
- * A [MetricExporter] that exports the metrics to the [ExperimentMonitor].
- */
-public class ExperimentMetricExporter(
- private val monitor: ExperimentMonitor,
- private val clock: Clock,
- private val hosts: Map<String, Host>
-) : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
- reportHostMetrics(metricsByName)
- reportProvisionerMetrics(metricsByName)
- return CompletableResultCode.ofSuccess()
- }
-
- private fun reportHostMetrics(metrics: Map<String, MetricData>) {
- val hostMetrics = mutableMapOf<String, HostMetrics>()
- hosts.mapValuesTo(hostMetrics) { HostMetrics() }
-
- mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v ->
- m.cpuDemand = v
- }
-
- mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v ->
- m.cpuUsage = v
- }
-
- mapDoubleSummary(metrics["power.usage"], hostMetrics) { m, v ->
- m.powerDraw = v
- }
-
- mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v ->
- m.requestedBurst = v.toLong()
- }
-
- mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v ->
- m.grantedBurst = v.toLong()
- }
-
- mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v ->
- m.overcommissionedBurst = v.toLong()
- }
-
- mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v ->
- m.interferedBurst = v.toLong()
- }
-
- mapLongSum(metrics["guests.active"], hostMetrics) { m, v ->
- m.numberOfDeployedImages = v.toInt()
- }
-
- for ((id, hostMetric) in hostMetrics) {
- val host = hosts.getValue(id)
- monitor.reportHostSlice(
- clock.millis(),
- hostMetric.requestedBurst,
- hostMetric.grantedBurst,
- hostMetric.overcommissionedBurst,
- hostMetric.interferedBurst,
- hostMetric.cpuUsage,
- hostMetric.cpuDemand,
- hostMetric.powerDraw,
- hostMetric.numberOfDeployedImages,
- host
- )
- }
- }
-
- private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
- val points = data?.doubleSummaryData?.points ?: emptyList()
- for (point in points) {
- val uid = point.labels["host"]
- val hostMetric = hostMetrics[uid]
-
- if (hostMetric != null) {
- // Take the average of the summary
- val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2
- block(hostMetric, avg)
- }
- }
- }
-
- private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
- val points = data?.longSumData?.points ?: emptyList()
- for (point in points) {
- val uid = point.labels["host"]
- val hostMetric = hostMetrics[uid]
-
- if (hostMetric != null) {
- block(hostMetric, point.value)
- }
- }
- }
-
- private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) {
- val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
-
- monitor.reportProvisionerMetrics(
- clock.millis(),
- hosts,
- availableHosts,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- }
-
- private class HostMetrics {
- var requestedBurst: Long = 0
- var grantedBurst: Long = 0
- var overcommissionedBurst: Long = 0
- var interferedBurst: Long = 0
- var cpuUsage: Double = 0.0
- var cpuDemand: Double = 0.0
- var numberOfDeployedImages: Int = 0
- var powerDraw: Double = 0.0
- }
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
deleted file mode 100644
index 68631dee..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.monitor
-
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
-
-/**
- * A monitor watches the events of an experiment.
- */
-public interface ExperimentMonitor : AutoCloseable {
- /**
- * This method is invoked when the state of a VM changes.
- */
- public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
-
- /**
- * This method is invoked when the state of a host changes.
- */
- public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {}
-
- /**
- * This method is invoked for a host for each slice that is finishes.
- */
- public fun reportHostSlice(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- numberOfDeployedImages: Int,
- host: Host
- ) {
- }
-
- /**
- * This method is invoked for a provisioner event.
- */
- public fun reportProvisionerMetrics(
- time: Long,
- totalHostCount: Int,
- availableHostCount: Int,
- totalVmCount: Int,
- activeVmCount: Int,
- inactiveVmCount: Int,
- waitingVmCount: Int,
- failedVmCount: Int
- ) {}
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
deleted file mode 100644
index 983b4cff..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.monitor
-
-import mu.KotlinLogging
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
-import org.opendc.experiments.capelin.telemetry.HostEvent
-import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
-import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter
-import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter
-import java.io.File
-
-/**
- * The logger instance to use.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * An [ExperimentMonitor] that logs the events to a Parquet file.
- */
-public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor {
- private val hostWriter = ParquetHostEventWriter(
- File(base, "host-metrics/$partition/data.parquet"),
- bufferSize
- )
- private val provisionerWriter = ParquetProvisionerEventWriter(
- File(base, "provisioner-metrics/$partition/data.parquet"),
- bufferSize
- )
-
- override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
-
- override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
- logger.debug { "Host ${host.uid} changed state $newState [$time]" }
- }
-
- override fun reportHostSlice(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- numberOfDeployedImages: Int,
- host: Host
- ) {
- hostWriter.write(
- HostEvent(
- time,
- 5 * 60 * 1000L,
- host,
- numberOfDeployedImages,
- requestedBurst,
- grantedBurst,
- overcommissionedBurst,
- interferedBurst,
- cpuUsage,
- cpuDemand,
- powerDraw,
- host.model.cpuCount
- )
- )
- }
-
- override fun reportProvisionerMetrics(
- time: Long,
- totalHostCount: Int,
- availableHostCount: Int,
- totalVmCount: Int,
- activeVmCount: Int,
- inactiveVmCount: Int,
- waitingVmCount: Int,
- failedVmCount: Int
- ) {
- provisionerWriter.write(
- ProvisionerEvent(
- time,
- totalHostCount,
- availableHostCount,
- totalVmCount,
- activeVmCount,
- inactiveVmCount,
- waitingVmCount,
- failedVmCount
- )
- )
- }
-
- override fun close() {
- hostWriter.close()
- provisionerWriter.close()
- }
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt
deleted file mode 100644
index c29e116e..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry
-
-/**
- * An event that occurs within the system.
- */
-public abstract class Event(public val name: String) {
- /**
- * The time of occurrence of this event.
- */
- public abstract val timestamp: Long
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
deleted file mode 100644
index 899fc9b1..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry
-
-import org.opendc.compute.service.driver.Host
-
-/**
- * A periodic report of the host machine metrics.
- */
-public data class HostEvent(
- override val timestamp: Long,
- public val duration: Long,
- public val host: Host,
- public val vmCount: Int,
- public val requestedBurst: Long,
- public val grantedBurst: Long,
- public val overcommissionedBurst: Long,
- public val interferedBurst: Long,
- public val cpuUsage: Double,
- public val cpuDemand: Double,
- public val powerDraw: Double,
- public val cores: Int
-) : Event("host-metrics")
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt
deleted file mode 100644
index 539c9bc9..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry
-
-/**
- * A periodic report of the provisioner's metrics.
- */
-public data class ProvisionerEvent(
- override val timestamp: Long,
- public val totalHostCount: Int,
- public val availableHostCount: Int,
- public val totalVmCount: Int,
- public val activeVmCount: Int,
- public val inactiveVmCount: Int,
- public val waitingVmCount: Int,
- public val failedVmCount: Int
-) : Event("provisioner-metrics")
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
deleted file mode 100644
index 6c8fc941..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry
-
-import org.opendc.experiments.capelin.Portfolio
-
-/**
- * A periodic report of the host machine metrics.
- */
-public data class RunEvent(
- val portfolio: Portfolio,
- val repeat: Int,
- override val timestamp: Long
-) : Event("run")
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
deleted file mode 100644
index 7631f55f..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry
-
-import org.opendc.compute.api.Server
-
-/**
- * A periodic report of a virtual machine's metrics.
- */
-public data class VmEvent(
- override val timestamp: Long,
- public val duration: Long,
- public val vm: Server,
- public val host: Server,
- public val requestedBurst: Long,
- public val grantedBurst: Long,
- public val overcommissionedBurst: Long,
- public val interferedBurst: Long,
- public val cpuUsage: Double,
- public val cpuDemand: Double
-) : Event("vm-metrics")
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
deleted file mode 100644
index 38930ee5..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import mu.KotlinLogging
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.experiments.capelin.telemetry.Event
-import java.io.Closeable
-import java.io.File
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.BlockingQueue
-import kotlin.concurrent.thread
-
-/**
- * The logging instance to use.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * A writer that writes events in Parquet format.
- */
-public open class ParquetEventWriter<in T : Event>(
- private val path: File,
- private val schema: Schema,
- private val converter: (T, GenericData.Record) -> Unit,
- private val bufferSize: Int = 4096
-) : Runnable, Closeable {
- /**
- * The writer to write the Parquet file.
- */
- private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
-
- /**
- * The queue of commands to process.
- */
- private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
-
- /**
- * The thread that is responsible for writing the Parquet records.
- */
- private val writerThread = thread(start = false, name = "parquet-writer") { run() }
-
- /**
- * Write the specified metrics to the database.
- */
- public fun write(event: T) {
- queue.put(Action.Write(event))
- }
-
- /**
- * Signal the writer to stop.
- */
- public override fun close() {
- queue.put(Action.Stop)
- writerThread.join()
- }
-
- init {
- writerThread.start()
- }
-
- /**
- * Start the writer thread.
- */
- override fun run() {
- try {
- loop@ while (true) {
- val action = queue.take()
- when (action) {
- is Action.Stop -> break@loop
- is Action.Write<*> -> {
- val record = GenericData.Record(schema)
- @Suppress("UNCHECKED_CAST")
- converter(action.event as T, record)
- writer.write(record)
- }
- }
- }
- } catch (e: Throwable) {
- logger.error("Writer failed", e)
- } finally {
- writer.close()
- }
- }
-
- public sealed class Action {
- /**
- * A poison pill that will stop the writer thread.
- */
- public object Stop : Action()
-
- /**
- * Write the specified metrics to the database.
- */
- public data class Write<out T : Event>(val event: T) : Action()
- }
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
deleted file mode 100644
index c8fe1cb2..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.HostEvent
-import java.io.File
-
-/**
- * A Parquet event writer for [HostEvent]s.
- */
-public class ParquetHostEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) {
-
- override fun toString(): String = "host-writer"
-
- public companion object {
- private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record ->
- // record.put("portfolio_id", event.run.parent.parent.id)
- // record.put("scenario_id", event.run.parent.id)
- // record.put("run_id", event.run.id)
- record.put("host_id", event.host.name)
- record.put("state", event.host.state.name)
- record.put("timestamp", event.timestamp)
- record.put("duration", event.duration)
- record.put("vm_count", event.vmCount)
- record.put("requested_burst", event.requestedBurst)
- record.put("granted_burst", event.grantedBurst)
- record.put("overcommissioned_burst", event.overcommissionedBurst)
- record.put("interfered_burst", event.interferedBurst)
- record.put("cpu_usage", event.cpuUsage)
- record.put("cpu_demand", event.cpuDemand)
- record.put("power_draw", event.powerDraw)
- record.put("cores", event.cores)
- }
-
- private val schema: Schema = SchemaBuilder
- .record("host_metrics")
- .namespace("org.opendc.experiments.sc20")
- .fields()
- // .name("portfolio_id").type().intType().noDefault()
- // .name("scenario_id").type().intType().noDefault()
- // .name("run_id").type().intType().noDefault()
- .name("timestamp").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("host_id").type().stringType().noDefault()
- .name("state").type().stringType().noDefault()
- .name("vm_count").type().intType().noDefault()
- .name("requested_burst").type().longType().noDefault()
- .name("granted_burst").type().longType().noDefault()
- .name("overcommissioned_burst").type().longType().noDefault()
- .name("interfered_burst").type().longType().noDefault()
- .name("cpu_usage").type().doubleType().noDefault()
- .name("cpu_demand").type().doubleType().noDefault()
- .name("power_draw").type().doubleType().noDefault()
- .name("cores").type().intType().noDefault()
- .endRecord()
- }
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt
deleted file mode 100644
index 8feff8d9..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
-import java.io.File
-
-/**
- * A Parquet event writer for [ProvisionerEvent]s.
- */
-public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) {
-
- override fun toString(): String = "provisioner-writer"
-
- public companion object {
- private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record ->
- record.put("timestamp", event.timestamp)
- record.put("host_total_count", event.totalHostCount)
- record.put("host_available_count", event.availableHostCount)
- record.put("vm_total_count", event.totalVmCount)
- record.put("vm_active_count", event.activeVmCount)
- record.put("vm_inactive_count", event.inactiveVmCount)
- record.put("vm_waiting_count", event.waitingVmCount)
- record.put("vm_failed_count", event.failedVmCount)
- }
-
- private val schema: Schema = SchemaBuilder
- .record("provisioner_metrics")
- .namespace("org.opendc.experiments.sc20")
- .fields()
- .name("timestamp").type().longType().noDefault()
- .name("host_total_count").type().intType().noDefault()
- .name("host_available_count").type().intType().noDefault()
- .name("vm_total_count").type().intType().noDefault()
- .name("vm_active_count").type().intType().noDefault()
- .name("vm_inactive_count").type().intType().noDefault()
- .name("vm_waiting_count").type().intType().noDefault()
- .name("vm_failed_count").type().intType().noDefault()
- .endRecord()
- }
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
deleted file mode 100644
index 946410eb..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.RunEvent
-import java.io.File
-
-/**
- * A Parquet event writer for [RunEvent]s.
- */
-public class ParquetRunEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) {
-
- override fun toString(): String = "run-writer"
-
- public companion object {
- private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record ->
- val portfolio = event.portfolio
- record.put("portfolio_name", portfolio.name)
- record.put("scenario_id", portfolio.id)
- record.put("run_id", event.repeat)
- record.put("topology", portfolio.topology.name)
- record.put("workload_name", portfolio.workload.name)
- record.put("workload_fraction", portfolio.workload.fraction)
- record.put("workload_sampler", portfolio.workload.samplingStrategy)
- record.put("allocation_policy", portfolio.allocationPolicy)
- record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency)
- record.put("interference", portfolio.operationalPhenomena.hasInterference)
- record.put("seed", event.repeat)
- }
-
- private val schema: Schema = SchemaBuilder
- .record("runs")
- .namespace("org.opendc.experiments.sc20")
- .fields()
- .name("portfolio_name").type().stringType().noDefault()
- .name("scenario_id").type().intType().noDefault()
- .name("run_id").type().intType().noDefault()
- .name("topology").type().stringType().noDefault()
- .name("workload_name").type().stringType().noDefault()
- .name("workload_fraction").type().doubleType().noDefault()
- .name("workload_sampler").type().stringType().noDefault()
- .name("allocation_policy").type().stringType().noDefault()
- .name("failure_frequency").type().doubleType().noDefault()
- .name("interference").type().booleanType().noDefault()
- .name("seed").type().intType().noDefault()
- .endRecord()
- }
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
deleted file mode 100644
index a8462a51..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.util.TreeSet
-
-/**
- * A [TraceReader] for the internal VM workload trace format.
- *
- * @param reader The internal trace reader to use.
- * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
- * @param run The run to which this reader belongs.
- */
-@OptIn(ExperimentalStdlibApi::class)
-public class Sc20ParquetTraceReader(
- rawReaders: List<Sc20RawParquetTraceReader>,
- performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
- workload: Workload,
- seed: Int
-) : TraceReader<SimWorkload> {
- /**
- * The iterator over the actual trace.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>> =
- rawReaders
- .map { it.read() }
- .run {
- if (workload is CompositeWorkload) {
- this.zip(workload.workloads)
- } else {
- this.zip(listOf(workload))
- }
- }
- .map { sampleWorkload(it.first, workload, it.second, seed) }
- .flatten()
- .run {
- // Apply performance interference model
- if (performanceInterferenceModel.isEmpty())
- this
- else {
- map { entry ->
- val id = entry.name
- val relevantPerformanceInterferenceModelItems =
- performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
-
- entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems))
- }
- }
- }
- .iterator()
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {}
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
deleted file mode 100644
index ffbf46d4..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetReader
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.io.File
-import java.util.UUID
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * A [TraceReader] for the internal VM workload trace format.
- *
- * @param path The directory of the traces.
- */
-@OptIn(ExperimentalStdlibApi::class)
-public class Sc20RawParquetTraceReader(private val path: File) {
- /**
- * Read the fragments into memory.
- */
- private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
- @Suppress("DEPRECATION")
- val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
- .disableCompatibility()
- .build()
-
- val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
-
- return try {
- while (true) {
- val record = reader.read() ?: break
-
- val id = record["id"].toString()
- val duration = record["duration"] as Long
- val cores = record["cores"] as Int
- val cpuUsage = record["cpuUsage"] as Double
-
- val fragment = SimTraceWorkload.Fragment(
- duration,
- cpuUsage,
- cores
- )
-
- fragments.getOrPut(id) { mutableListOf() }.add(fragment)
- }
-
- fragments
- } finally {
- reader.close()
- }
- }
-
- /**
- * Read the metadata into a workload.
- */
- private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
- @Suppress("DEPRECATION")
- val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
- .disableCompatibility()
- .build()
-
- var counter = 0
- val entries = mutableListOf<TraceEntry<SimWorkload>>()
-
- return try {
- while (true) {
- val record = metaReader.read() ?: break
-
- val id = record["id"].toString()
- if (!fragments.containsKey(id)) {
- continue
- }
-
- val submissionTime = record["submissionTime"] as Long
- val endTime = record["endTime"] as Long
- val maxCores = record["maxCores"] as Int
- val requiredMemory = record["requiredMemory"] as Long
- val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
-
- val vmFragments = fragments.getValue(id).asSequence()
- val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
- val workload = SimTraceWorkload(vmFragments)
- entries.add(
- TraceEntry(
- uid, id, submissionTime, workload,
- mapOf(
- "submit-time" to submissionTime,
- "end-time" to endTime,
- "total-load" to totalLoad,
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
- )
- )
- }
-
- entries
- } catch (e: Exception) {
- e.printStackTrace()
- throw e
- } finally {
- metaReader.close()
- }
- }
-
- /**
- * The entries in the trace.
- */
- private val entries: List<TraceEntry<SimWorkload>>
-
- init {
- val fragments = parseFragments(path)
- entries = parseMeta(path, fragments)
- }
-
- /**
- * Read the entries in the trace.
- */
- public fun read(): List<TraceEntry<SimWorkload>> = entries
-
- /**
- * Create a [TraceReader] instance.
- */
- public fun createReader(): TraceReader<SimWorkload> {
- return object : TraceReader<SimWorkload>, Iterator<TraceEntry<SimWorkload>> by entries.iterator() {
- override fun close() {}
- }
- }
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
deleted file mode 100644
index c5294b55..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.filter2.compat.FilterCompat
-import org.apache.parquet.filter2.predicate.FilterApi
-import org.apache.parquet.filter2.predicate.Statistics
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate
-import org.apache.parquet.io.api.Binary
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.io.File
-import java.io.Serializable
-import java.util.SortedSet
-import java.util.TreeSet
-import java.util.UUID
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
-import kotlin.random.Random
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
- *
- * @param traceFile The directory of the traces.
- * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
- */
-@OptIn(ExperimentalStdlibApi::class)
-public class Sc20StreamingParquetTraceReader(
- traceFile: File,
- performanceInterferenceModel: PerformanceInterferenceModel? = null,
- selectedVms: List<String> = emptyList(),
- random: Random
-) : TraceReader<SimWorkload> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>>
-
- /**
- * The intermediate buffer to store the read records in.
- */
- private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024)
-
- /**
- * An optional filter for filtering the selected VMs
- */
- private val filter =
- if (selectedVms.isEmpty())
- null
- else
- FilterCompat.get(
- FilterApi.userDefined(
- FilterApi.binaryColumn("id"),
- SelectedVmFilter(
- TreeSet(selectedVms)
- )
- )
- )
-
- /**
- * A poisonous fragment.
- */
- private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0))
-
- /**
- * The thread to read the records in.
- */
- private val readerThread = thread(start = true, name = "sc20-reader") {
- @Suppress("DEPRECATION")
- val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
- .disableCompatibility()
- .run { if (filter != null) withFilter(filter) else this }
- .build()
-
- try {
- while (true) {
- val record = reader.read()
-
- if (record == null) {
- queue.put(poison)
- break
- }
-
- val id = record["id"].toString()
- val duration = record["duration"] as Long
- val cores = record["cores"] as Int
- val cpuUsage = record["cpuUsage"] as Double
-
- val fragment = SimTraceWorkload.Fragment(
- duration,
- cpuUsage,
- cores
- )
-
- queue.put(id to fragment)
- }
- } catch (e: InterruptedException) {
- // Do not rethrow this
- } finally {
- reader.close()
- }
- }
-
- /**
- * Fill the buffers with the VMs
- */
- private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) {
- if (!hasNext) {
- return
- }
-
- val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>()
- queue.drainTo(fragments)
-
- for ((id, fragment) in fragments) {
- if (id == poison.first) {
- hasNext = false
- return
- }
- buffers[id]?.forEach { it.add(fragment) }
- }
- }
-
- /**
- * A flag to indicate whether the reader has more entries.
- */
- private var hasNext: Boolean = true
-
- /**
- * Initialize the reader.
- */
- init {
- val takenIds = mutableSetOf<UUID>()
- val entries = mutableMapOf<String, GenericData.Record>()
- val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
-
- @Suppress("DEPRECATION")
- val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
- .disableCompatibility()
- .run { if (filter != null) withFilter(filter) else this }
- .build()
-
- while (true) {
- val record = metaReader.read() ?: break
- val id = record["id"].toString()
- entries[id] = record
- }
-
- metaReader.close()
-
- val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms
-
- // Create the entry iterator
- iterator = selection.asSequence()
- .mapNotNull { entries[it] }
- .mapIndexed { index, record ->
- val id = record["id"].toString()
- val submissionTime = record["submissionTime"] as Long
- val endTime = record["endTime"] as Long
- val maxCores = record["maxCores"] as Int
- val requiredMemory = record["requiredMemory"] as Long
- val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
-
- assert(uid !in takenIds)
- takenIds += uid
-
- logger.info("Processing VM $id")
-
- val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
- val fragments = sequence {
- var time = submissionTime
- repeat@ while (true) {
- if (externalBuffer.isEmpty()) {
- if (hasNext) {
- pull(buffers)
- continue
- } else {
- break
- }
- }
-
- internalBuffer.addAll(externalBuffer)
- externalBuffer.clear()
-
- for (fragment in internalBuffer) {
- yield(fragment)
-
- time += fragment.duration
- if (time >= endTime) {
- break@repeat
- }
- }
-
- internalBuffer.clear()
- }
-
- buffers.remove(id)
- }
- val relevantPerformanceInterferenceModelItems =
- if (performanceInterferenceModel != null)
- PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(),
- Random(random.nextInt())
- )
- else
- null
- val workload = SimTraceWorkload(fragments)
- val meta = mapOf(
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
-
- TraceEntry(
- uid, id, submissionTime, workload,
- if (performanceInterferenceModel != null)
- meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems as Any)
- else
- meta
- )
- }
- .sortedBy { it.start }
- .toList()
- .iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {
- readerThread.interrupt()
- }
-
- private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
- override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
-
- override fun canDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
- }
-
- override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
- }
- }
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
deleted file mode 100644
index 7713c06f..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
+++ /dev/null
@@ -1,621 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import com.github.ajalt.clikt.core.CliktCommand
-import com.github.ajalt.clikt.parameters.arguments.argument
-import com.github.ajalt.clikt.parameters.groups.OptionGroup
-import com.github.ajalt.clikt.parameters.groups.groupChoice
-import com.github.ajalt.clikt.parameters.options.convert
-import com.github.ajalt.clikt.parameters.options.default
-import com.github.ajalt.clikt.parameters.options.defaultLazy
-import com.github.ajalt.clikt.parameters.options.option
-import com.github.ajalt.clikt.parameters.options.required
-import com.github.ajalt.clikt.parameters.options.split
-import com.github.ajalt.clikt.parameters.types.file
-import com.github.ajalt.clikt.parameters.types.long
-import me.tongfei.progressbar.ProgressBar
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.format.trace.sc20.Sc20VmPlacementReader
-import java.io.BufferedReader
-import java.io.File
-import java.io.FileReader
-import java.util.Random
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * Represents the command for converting traces
- */
-public class TraceConverterCli : CliktCommand(name = "trace-converter") {
- /**
- * The directory where the trace should be stored.
- */
- private val outputPath by option("-O", "--output", help = "path to store the trace")
- .file(canBeFile = false, mustExist = false)
- .defaultLazy { File("output") }
-
- /**
- * The directory where the input trace is located.
- */
- private val inputPath by argument("input", help = "path to the input trace")
- .file(canBeFile = false)
-
- /**
- * The input type of the trace.
- */
- private val type by option("-t", "--type", help = "input type of trace").groupChoice(
- "solvinity" to SolvinityConversion(),
- "bitbrains" to BitbrainsConversion(),
- "azure" to AzureConversion()
- )
-
- override fun run() {
- val metaSchema = SchemaBuilder
- .record("meta")
- .namespace("org.opendc.format.sc20")
- .fields()
- .name("id").type().stringType().noDefault()
- .name("submissionTime").type().longType().noDefault()
- .name("endTime").type().longType().noDefault()
- .name("maxCores").type().intType().noDefault()
- .name("requiredMemory").type().longType().noDefault()
- .endRecord()
- val schema = SchemaBuilder
- .record("trace")
- .namespace("org.opendc.format.sc20")
- .fields()
- .name("id").type().stringType().noDefault()
- .name("time").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("cores").type().intType().noDefault()
- .name("cpuUsage").type().doubleType().noDefault()
- .name("flops").type().longType().noDefault()
- .endRecord()
-
- val metaParquet = File(outputPath, "meta.parquet")
- val traceParquet = File(outputPath, "trace.parquet")
-
- if (metaParquet.exists()) {
- metaParquet.delete()
- }
- if (traceParquet.exists()) {
- traceParquet.delete()
- }
-
- val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI()))
- .withSchema(metaSchema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
-
- val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI()))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
-
- try {
- val type = type ?: throw IllegalArgumentException("Invalid trace conversion")
- val allFragments = type.read(inputPath, metaSchema, metaWriter)
- allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
-
- for (fragment in allFragments) {
- val record = GenericData.Record(schema)
- record.put("id", fragment.id)
- record.put("time", fragment.tick)
- record.put("duration", fragment.duration)
- record.put("cores", fragment.cores)
- record.put("cpuUsage", fragment.usage)
- record.put("flops", fragment.flops)
-
- writer.write(record)
- }
- } finally {
- writer.close()
- metaWriter.close()
- }
- }
-}
-
-/**
- * The supported trace conversions.
- */
-public sealed class TraceConversion(name: String) : OptionGroup(name) {
- /**
- * Read the fragments of the trace.
- */
- public abstract fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment>
-}
-
-public class SolvinityConversion : TraceConversion("Solvinity") {
- private val clusters by option()
- .split(",")
-
- private val vmPlacements by option("--vm-placements", help = "file containing the VM placements")
- .file(canBeDir = false)
- .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } }
- .required()
-
- override fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment> {
- val clusters = clusters?.toSet() ?: emptySet()
- val timestampCol = 0
- val cpuUsageCol = 1
- val coreCol = 12
- val provisionedMemoryCol = 20
- val traceInterval = 5 * 60 * 1000L
-
- // Identify start time of the entire trace
- var minTimestamp = Long.MAX_VALUE
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach file@{ vmFile ->
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val vmId = vmFile.name
-
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null || !clusters.contains(clusterName)) {
- continue
- }
-
- val values = line.split("\t")
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
-
- if (timestamp < minTimestamp) {
- minTimestamp = timestamp
- }
- return@file
- }
- }
- }
- }
-
- println("Start of trace at $minTimestamp")
-
- val allFragments = mutableListOf<Fragment>()
-
- val begin = 15 * 24 * 60 * 60 * 1000L
- val end = 45 * 24 * 60 * 60 * 1000L
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach { vmFile ->
- println(vmFile)
-
- var vmId = ""
- var maxCores = -1
- var requiredMemory = -1L
- var cores: Int
- var minTime = Long.MAX_VALUE
-
- val flopsFragments = sequence {
- var last: Fragment? = null
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split("\t")
-
- vmId = vmFile.name
-
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null || !clusters.contains(clusterName)) {
- continue
- }
-
- val timestamp =
- (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
- if (begin > timestamp || timestamp > end) {
- continue
- }
-
- cores = values[coreCol].trim().toInt()
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
- minTime = min(minTime, timestamp)
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- last = if (last != null && last!!.flops == 0L && flops == 0L) {
- val oldFragment = last!!
- Fragment(
- vmId,
- oldFragment.tick,
- oldFragment.flops + flops,
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- cores
- )
- if (last != null) {
- yield(last!!)
- }
- fragment
- }
- }
- }
- }
-
- if (last != null) {
- yield(last!!)
- }
- }
-
- var maxTime = Long.MIN_VALUE
- flopsFragments.filter { it.tick in begin until end }.forEach { fragment ->
- allFragments.add(fragment)
- maxTime = max(maxTime, fragment.tick)
- }
-
- if (minTime in begin until end) {
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", vmId)
- metaRecord.put("submissionTime", minTime)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
- metaWriter.write(metaRecord)
- }
- }
-
- return allFragments
- }
-}
-
-/**
- * Conversion of the Bitbrains public trace.
- */
-public class BitbrainsConversion : TraceConversion("Bitbrains") {
- override fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment> {
- val timestampCol = 0
- val cpuUsageCol = 3
- val coreCol = 1
- val provisionedMemoryCol = 5
- val traceInterval = 5 * 60 * 1000L
-
- val allFragments = mutableListOf<Fragment>()
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach { vmFile ->
- println(vmFile)
-
- var vmId = ""
- var maxCores = -1
- var requiredMemory = -1L
- var cores: Int
- var minTime = Long.MAX_VALUE
-
- val flopsFragments = sequence {
- var last: Fragment? = null
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .drop(1)
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split(";\t")
-
- vmId = vmFile.name
-
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
-
- cores = values[coreCol].trim().toInt()
- val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB
- requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong())
- maxCores = max(maxCores, cores)
- minTime = min(minTime, timestamp)
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- last = if (last != null && last!!.flops == 0L && flops == 0L) {
- val oldFragment = last!!
- Fragment(
- vmId,
- oldFragment.tick,
- oldFragment.flops + flops,
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- cores
- )
- if (last != null) {
- yield(last!!)
- }
- fragment
- }
- }
- }
- }
-
- if (last != null) {
- yield(last!!)
- }
- }
-
- var maxTime = Long.MIN_VALUE
- flopsFragments.forEach { fragment ->
- allFragments.add(fragment)
- maxTime = max(maxTime, fragment.tick)
- }
-
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", vmId)
- metaRecord.put("submissionTime", minTime)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
- metaWriter.write(metaRecord)
- }
-
- return allFragments
- }
-}
-
-/**
- * Conversion of the Azure public VM trace.
- */
-public class AzureConversion : TraceConversion("Azure") {
- private val seed by option(help = "seed for trace sampling")
- .long()
- .default(0)
-
- override fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment> {
- val random = Random(seed)
- val fraction = 0.01
-
- // Read VM table
- val vmIdTableCol = 0
- val coreTableCol = 9
- val provisionedMemoryTableCol = 10
-
- var vmId: String
- var cores: Int
- var requiredMemory: Long
-
- val vmIds = mutableSetOf<String>()
- val vmIdToMetadata = mutableMapOf<String, VmInfo>()
-
- BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
- reader.lineSequence()
- .chunked(1024)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
- // Sample only a fraction of the VMs
- if (random.nextDouble() > fraction) {
- continue
- }
-
- val values = line.split(",")
-
- // Exclude VMs with a large number of cores (not specified exactly)
- if (values[coreTableCol].contains(">")) {
- continue
- }
-
- vmId = values[vmIdTableCol].trim()
- cores = values[coreTableCol].trim().toInt()
- requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
-
- vmIds.add(vmId)
- vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
- }
- }
- }
-
- // Read VM metric reading files
- val timestampCol = 0
- val vmIdCol = 1
- val cpuUsageCol = 4
- val traceInterval = 5 * 60 * 1000L
-
- val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
- val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
- val allFragments = mutableListOf<Fragment>()
-
- for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) {
- val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv")
- var timestamp: Long
- var cpuUsage: Double
-
- BufferedReader(FileReader(readingsFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split(",")
- vmId = values[vmIdCol].trim()
-
- // Ignore readings for VMs not in the sample
- if (!vmIds.contains(vmId)) {
- continue
- }
-
- timestamp = values[timestampCol].trim().toLong() * 1000L
- vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
- cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz
- vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
- val lastFragment = vmIdToLastFragment[vmId]
-
- vmIdToLastFragment[vmId] =
- if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
- Fragment(
- vmId,
- lastFragment.tick,
- lastFragment.flops + flops,
- lastFragment.duration + traceInterval,
- cpuUsage,
- vmIdToMetadata[vmId]!!.cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- vmIdToMetadata[vmId]!!.cores
- )
- if (lastFragment != null) {
- if (vmIdToFragments[vmId] == null) {
- vmIdToFragments[vmId] = mutableListOf()
- }
- vmIdToFragments[vmId]!!.add(lastFragment)
- allFragments.add(lastFragment)
- }
- fragment
- }
- }
- }
- }
- }
-
- for (entry in vmIdToLastFragment) {
- if (entry.value != null) {
- if (vmIdToFragments[entry.key] == null) {
- vmIdToFragments[entry.key] = mutableListOf()
- }
- vmIdToFragments[entry.key]!!.add(entry.value!!)
- }
- }
-
- println("Read ${vmIdToLastFragment.size} VMs")
-
- for (entry in vmIdToMetadata) {
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", entry.key)
- metaRecord.put("submissionTime", entry.value.minTime)
- metaRecord.put("endTime", entry.value.maxTime)
- println("${entry.value.minTime} - ${entry.value.maxTime}")
- metaRecord.put("maxCores", entry.value.cores)
- metaRecord.put("requiredMemory", entry.value.requiredMemory)
- metaWriter.write(metaRecord)
- }
-
- return allFragments
- }
-}
-
-public data class Fragment(
- public val id: String,
- public val tick: Long,
- public val flops: Long,
- public val duration: Long,
- public val usage: Double,
- public val cores: Int
-)
-
-public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long)
-
-/**
- * A script to convert a trace in text format into a Parquet trace.
- */
-public fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
deleted file mode 100644
index 5c8727ea..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.SamplingStrategy
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.format.trace.TraceEntry
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.util.*
-import kotlin.random.Random
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * Sample the workload for the specified [run].
- */
-public fun sampleWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- return when {
- workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
- workload.samplingStrategy == SamplingStrategy.HPC ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false)
- workload.samplingStrategy == SamplingStrategy.HPC_LOAD ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true)
- else ->
- sampleRegularWorkload(trace, workload, workload, seed)
- }
-}
-
-/**
- * Sample a regular (non-HPC) workload.
- */
-public fun sampleRegularWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- val fraction = subWorkload.fraction
-
- val shuffled = trace.shuffled(Random(seed))
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- shuffled.sumByDouble { it.meta.getValue("total-load") as Double }
- }
- var currentLoad = 0.0
-
- for (entry in shuffled) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- currentLoad += entryLoad
- res += entry
- }
-
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a HPC workload.
- */
-public fun sampleHpcWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- seed: Int,
- sampleOnLoad: Boolean
-): List<TraceEntry<SimWorkload>> {
- val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
- val random = Random(seed)
-
- val fraction = workload.fraction
- val (hpc, nonHpc) = trace.partition { entry ->
- val name = entry.name
- name.matches(pattern)
- }
-
- val hpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- hpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- val nonHpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- nonHpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
-
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- trace.sumByDouble { it.meta.getValue("total-load") as Double }
- }
-
- logger.debug { "Total trace load: $totalLoad" }
- var hpcCount = 0
- var hpcLoad = 0.0
- var nonHpcCount = 0
- var nonHpcLoad = 0.0
-
- val res = mutableListOf<TraceEntry<SimWorkload>>()
-
- if (sampleOnLoad) {
- var currentLoad = 0.0
- for (entry in hpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- hpcLoad += entryLoad
- hpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
-
- for (entry in nonHpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > 1) {
- break
- }
-
- nonHpcLoad += entryLoad
- nonHpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
- } else {
- hpcSequence
- .take((fraction * trace.size).toInt())
- .forEach { entry ->
- hpcLoad += entry.meta.getValue("total-load") as Double
- hpcCount += 1
- res.add(entry)
- }
-
- nonHpcSequence
- .take(((1 - fraction) * trace.size).toInt())
- .forEach { entry ->
- nonHpcLoad += entry.meta.getValue("total-load") as Double
- nonHpcCount += 1
- res.add(entry)
- }
- }
-
- logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
- logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a random trace entry.
- */
-private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> {
- val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
- return entry.copy(uid = uid)
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
deleted file mode 100644
index d1c01b8e..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ MIT License
- ~
- ~ Copyright (c) 2020 atlarge-research
- ~
- ~ Permission is hereby granted, free of charge, to any person obtaining a copy
- ~ of this software and associated documentation files (the "Software"), to deal
- ~ in the Software without restriction, including without limitation the rights
- ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- ~ copies of the Software, and to permit persons to whom the Software is
- ~ furnished to do so, subject to the following conditions:
- ~
- ~ The above copyright notice and this permission notice shall be included in all
- ~ copies or substantial portions of the Software.
- ~
- ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- ~ SOFTWARE.
- -->
-
-<Configuration status="WARN">
- <Appenders>
- <Console name="Console" target="SYSTEM_OUT">
- <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
- </Console>
- </Appenders>
- <Loggers>
- <Logger name="org.opendc" level="debug" additivity="false">
- <AppenderRef ref="Console"/>
- </Logger>
- <Logger name="org.opendc.experiments.capelin" level="info" additivity="false">
- <AppenderRef ref="Console"/>
- </Logger>
- <Logger name="org.opendc.experiments.capelin.trace" level="debug" additivity="false">
- <AppenderRef ref="Console"/>
- </Logger>
- <Logger name="org.apache.hadoop" level="warn" additivity="false">
- <AppenderRef ref="Console"/>
- </Logger>
- <Root level="error">
- <AppenderRef ref="Console"/>
- </Root>
- </Loggers>
-</Configuration>
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
deleted file mode 100644
index 4cb50ab9..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
-import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.sdk.toOtelClock
-import java.io.File
-
-/**
- * An integration test suite for the SC20 experiments.
- */
-class CapelinIntegrationTest {
- /**
- * The monitor used to keep track of the metrics.
- */
- private lateinit var monitor: TestExperimentReporter
-
- /**
- * Setup the experimental environment.
- */
- @BeforeEach
- fun setUp() {
- monitor = TestExperimentReporter()
- }
-
- @Test
- fun testLarge() = runBlockingSimulation {
- val failures = false
- val seed = 0
- val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
- )
- val traceReader = createTestTraceReader()
- val environmentReader = createTestEnvironmentReader()
- lateinit var monitorResults: ComputeMetrics
-
- val meterProvider: MeterProvider = SdkMeterProvider
- .builder()
- .setClock(clock.toOtelClock())
- .build()
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val failureDomain = if (failures) {
- println("ENABLING failures")
- createFailureDomain(
- this,
- clock,
- seed,
- 24.0 * 7,
- scheduler,
- chan
- )
- } else {
- null
- }
-
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
- }
-
- failureDomain?.cancel()
- }
-
- monitorResults = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}")
-
- // Note that these values have been verified beforehand
- assertAll(
- { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
- { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
- { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
- { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(207389912923, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(207122087280, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
- { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
- )
- }
-
- @Test
- fun testSmall() = runBlockingSimulation {
- val seed = 1
- val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
- )
- val traceReader = createTestTraceReader(0.5, seed)
- val environmentReader = createTestEnvironmentReader("single")
-
- val meterProvider: MeterProvider = SdkMeterProvider
- .builder()
- .setClock(clock.toOtelClock())
- .build()
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
- }
- }
-
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
-
- // Note that these values have been verified beforehand
- assertAll(
- { assertEquals(96350072517, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(96330335057, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
- )
- }
-
- /**
- * Obtain the trace reader for the test.
- */
- private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
- return Sc20ParquetTraceReader(
- listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
- emptyMap(),
- Workload("test", fraction),
- seed
- )
- }
-
- /**
- * Obtain the environment reader for the test.
- */
- private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
- val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt")
- return Sc20ClusterEnvironmentReader(stream)
- }
-
- class TestExperimentReporter : ExperimentMonitor {
- var totalRequestedBurst = 0L
- var totalGrantedBurst = 0L
- var totalOvercommissionedBurst = 0L
- var totalInterferedBurst = 0L
-
- override fun reportHostSlice(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- numberOfDeployedImages: Int,
- host: Host,
- ) {
- totalRequestedBurst += requestedBurst
- totalGrantedBurst += grantedBurst
- totalOvercommissionedBurst += overcommissionedBurst
- totalInterferedBurst += interferedBurst
- }
-
- override fun close() {}
- }
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
deleted file mode 100644
index 53b3c2d7..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
-A01;A01;8;3.2;64;1;64;8
-
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt
deleted file mode 100644
index 6b347bff..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
-A01;A01;32;3.2;2048;1;256;32
-B01;B01;48;2.93;1256;6;64;8
-C01;C01;32;3.2;2048;2;128;16
-
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
deleted file mode 100644
index ce7a812c..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
+++ /dev/null
Binary files differ
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
deleted file mode 100644
index 1d7ce882..00000000
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
+++ /dev/null
Binary files differ