summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-25 14:53:54 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-25 14:53:54 +0200
commitaa9b32f8cd1467e9718959f400f6777e5d71737d (patch)
treeb88bbede15108c6855d7f94ded4c7054df186a72 /opendc-experiments/opendc-experiments-capelin
parenteb0e0a3bc557c05a70eead388797ab850ea87366 (diff)
parentb7a71e5b4aa77b41ef41deec2ace42b67a5a13a7 (diff)
merge: Integrate v2.1 progress into public repository
This pull request integrates the changes planned for the v2.1 release of OpenDC into the public Github repository in order to sync the progress of both repositories.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts23
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt83
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/jmh/resources/log4j2.xml37
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt321
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt18
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt188
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt23
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt172
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt74
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt118
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt43
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt34
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt41
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt126
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt81
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt65
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt72
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt)37
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt121
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt103
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt84
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt157
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt284
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt621
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt199
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt)28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt339
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json21
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquetbin0 -> 2099 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquetbin0 -> 1125930 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquetbin2148 -> 0 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquetbin1672463 -> 0 bytes
40 files changed, 748 insertions, 2833 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 7c7f0dad..c20556b5 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -26,26 +26,29 @@ description = "Experiments for the Capelin work"
plugins {
`experiment-conventions`
`testing-conventions`
+ `benchmark-conventions`
}
dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcHarness.opendcHarnessApi)
- implementation(projects.opendcFormat)
+ api(projects.opendcCompute.opendcComputeWorkload)
+
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
- implementation(projects.opendcSimulator.opendcSimulatorFailures)
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcTelemetry.opendcTelemetryCompute)
- implementation(libs.kotlin.logging)
implementation(libs.config)
- implementation(libs.progressbar)
- implementation(libs.clikt)
+ implementation(libs.kotlin.logging)
+ implementation(libs.jackson.databind)
+ implementation(libs.jackson.module.kotlin)
+ implementation(libs.jackson.dataformat.csv)
+ implementation(kotlin("reflect"))
+ implementation(libs.opentelemetry.semconv)
+
+ runtimeOnly(projects.opendcTrace.opendcTraceOpendc)
- implementation(libs.parquet)
- implementation(libs.hadoop.client) {
- exclude(group = "org.slf4j", module = "slf4j-log4j12")
- exclude(group = "log4j")
- }
+ testImplementation(libs.log4j.slf4j)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
new file mode 100644
index 00000000..48a90985
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.workload.*
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.compute.workload.topology.apply
+import org.opendc.experiments.capelin.topology.clusterTopology
+import org.opendc.simulator.core.runBlockingSimulation
+import org.openjdk.jmh.annotations.*
+import java.io.File
+import java.util.*
+import java.util.concurrent.TimeUnit
+
+/**
+ * Benchmark suite for the Capelin experiments.
+ */
+@State(Scope.Thread)
+@Fork(1)
+@Warmup(iterations = 2, time = 5, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
+@OptIn(ExperimentalCoroutinesApi::class)
+class CapelinBenchmarks {
+ private lateinit var vms: List<VirtualMachine>
+ private lateinit var topology: Topology
+
+ @Param("true", "false")
+ private var isOptimized: Boolean = false
+
+ @Setup
+ fun setUp() {
+ val loader = ComputeWorkloadLoader(File("src/test/resources/trace"))
+ val source = trace("bitbrains-small")
+ vms = source.resolve(loader, Random(1L))
+ topology = checkNotNull(object {}.javaClass.getResourceAsStream("/env/topology.txt")).use { clusterTopology(it) }
+ }
+
+ @Benchmark
+ fun benchmarkCapelin() = runBlockingSimulation {
+ val computeScheduler = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ val runner = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler
+ )
+
+ try {
+ runner.apply(topology, isOptimized)
+ runner.run(vms, 0)
+ } finally {
+ runner.close()
+ }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/resources/log4j2.xml b/opendc-experiments/opendc-experiments-capelin/src/jmh/resources/log4j2.xml
new file mode 100644
index 00000000..c496dd75
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/resources/log4j2.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ MIT License
+ ~
+ ~ Copyright (c) 2020 atlarge-research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="warn">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
index faabe5cb..31e8f961 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
@@ -22,7 +22,8 @@
package org.opendc.experiments.capelin
-import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.compute.workload.composite
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -42,30 +43,25 @@ public class CompositeWorkloadPortfolio : Portfolio("composite-workload") {
)
override val workload: Workload by anyOf(
- CompositeWorkload(
+ Workload(
"all-azure",
- listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.0, trace("azure") to 1.0)
),
- CompositeWorkload(
+ Workload(
"solvinity-25-azure-75",
- listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.25, trace("azure") to 0.75)
),
- CompositeWorkload(
+ Workload(
"solvinity-50-azure-50",
- listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.5, trace("azure") to 0.5)
),
- CompositeWorkload(
+ Workload(
"solvinity-75-azure-25",
- listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.75, trace("azure") to 0.25)
),
- CompositeWorkload(
+ Workload(
"all-solvinity",
- listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 1.0, trace("azure") to 0.0)
)
)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
deleted file mode 100644
index 0fbb7280..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory
-import io.opentelemetry.sdk.metrics.common.InstrumentType
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import io.opentelemetry.sdk.metrics.view.InstrumentSelector
-import io.opentelemetry.sdk.metrics.view.View
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
-import mu.KotlinLogging
-import org.opendc.compute.api.*
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostListener
-import org.opendc.compute.service.driver.HostState
-import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.simulator.SimHost
-import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.failures.CorrelatedFaultInjector
-import org.opendc.simulator.failures.FaultInjector
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
-import org.opendc.telemetry.sdk.toOtelClock
-import java.io.File
-import java.time.Clock
-import kotlin.coroutines.resume
-import kotlin.math.ln
-import kotlin.math.max
-import kotlin.random.Random
-
-/**
- * The logger for this experiment.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * Construct the failure domain for the experiments.
- */
-public fun createFailureDomain(
- coroutineScope: CoroutineScope,
- clock: Clock,
- seed: Int,
- failureInterval: Double,
- service: ComputeService,
- chan: Channel<Unit>
-): CoroutineScope {
- val job = coroutineScope.launch {
- chan.receive()
- val random = Random(seed)
- val injectors = mutableMapOf<String, FaultInjector>()
- for (host in service.hosts) {
- val cluster = host.meta["cluster"] as String
- val injector =
- injectors.getOrPut(cluster) {
- createFaultInjector(
- this,
- clock,
- random,
- failureInterval
- )
- }
- injector.enqueue(host as SimHost)
- }
- }
- return CoroutineScope(coroutineScope.coroutineContext + job)
-}
-
-/**
- * Obtain the [FaultInjector] to use for the experiments.
- */
-public fun createFaultInjector(
- coroutineScope: CoroutineScope,
- clock: Clock,
- random: Random,
- failureInterval: Double
-): FaultInjector {
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return CorrelatedFaultInjector(
- coroutineScope,
- clock,
- iatScale = ln(failureInterval), iatShape = 1.03, // Hours
- sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
- dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
- random = random
- )
-}
-
-/**
- * Create the trace reader from which the VM workloads are read.
- */
-public fun createTraceReader(
- path: File,
- performanceInterferenceModel: PerformanceInterferenceModel,
- vms: List<String>,
- seed: Int
-): Sc20StreamingParquetTraceReader {
- return Sc20StreamingParquetTraceReader(
- path,
- performanceInterferenceModel,
- vms,
- Random(seed)
- )
-}
-
-/**
- * Construct the environment for a simulated compute service..
- */
-public suspend fun withComputeService(
- clock: Clock,
- meterProvider: MeterProvider,
- environmentReader: EnvironmentReader,
- scheduler: ComputeScheduler,
- block: suspend CoroutineScope.(ComputeService) -> Unit
-): Unit = coroutineScope {
- val hosts = environmentReader
- .use { it.read() }
- .map { def ->
- SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- coroutineContext,
- clock,
- meterProvider.get("opendc-compute-simulator"),
- SimFairShareHypervisorProvider(),
- def.powerModel
- )
- }
-
- val serviceMeter = meterProvider.get("opendc-compute")
- val service =
- ComputeService(coroutineContext, clock, serviceMeter, scheduler)
-
- for (host in hosts) {
- service.addHost(host)
- }
-
- try {
- block(this, service)
- } finally {
- service.close()
- hosts.forEach(SimHost::close)
- }
-}
-
-/**
- * Attach the specified monitor to the VM provisioner.
- */
-@OptIn(ExperimentalCoroutinesApi::class)
-public suspend fun withMonitor(
- monitor: ExperimentMonitor,
- clock: Clock,
- metricProducer: MetricProducer,
- scheduler: ComputeService,
- block: suspend CoroutineScope.() -> Unit
-): Unit = coroutineScope {
- val monitorJobs = mutableSetOf<Job>()
-
- // Monitor host events
- for (host in scheduler.hosts) {
- monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
- host.addListener(object : HostListener {
- override fun onStateChanged(host: Host, newState: HostState) {
- monitor.reportHostStateChange(clock.millis(), host, newState)
- }
- })
- }
-
- val reader = CoroutineMetricReader(
- this,
- listOf(metricProducer),
- ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }),
- exportInterval = 5 * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */
- )
-
- try {
- block(this)
- } finally {
- monitorJobs.forEach(Job::cancel)
- reader.close()
- monitor.close()
- }
-}
-
-public class ComputeMetrics {
- public var submittedVms: Int = 0
- public var queuedVms: Int = 0
- public var runningVms: Int = 0
- public var unscheduledVms: Int = 0
- public var finishedVms: Int = 0
-}
-
-/**
- * Collect the metrics of the compute service.
- */
-public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
- val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
- val res = ComputeMetrics()
- try {
- // Hack to extract metrics from OpenTelemetry SDK
- res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- } catch (cause: Throwable) {
- logger.warn(cause) { "Failed to collect metrics" }
- }
- return res
-}
-
-/**
- * Process the trace.
- */
-public suspend fun processTrace(
- clock: Clock,
- reader: TraceReader<SimWorkload>,
- scheduler: ComputeService,
- chan: Channel<Unit>,
- monitor: ExperimentMonitor
-) {
- val client = scheduler.newClient()
- val image = client.newImage("vm-image")
- var offset = Long.MIN_VALUE
- try {
- coroutineScope {
- while (reader.hasNext()) {
- val entry = reader.next()
-
- if (offset < 0) {
- offset = entry.start - clock.millis()
- }
-
- delay(max(0, (entry.start - offset) - clock.millis()))
- launch {
- chan.send(Unit)
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace)
- val server = client.newServer(
- entry.name,
- image,
- client.newFlavor(
- entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
- ),
- meta = entry.meta + mapOf("workload" to workload)
- )
-
- suspendCancellableCoroutine { cont ->
- server.watch(object : ServerWatcher {
- override fun onStateChanged(server: Server, newState: ServerState) {
- monitor.reportVmStateChange(clock.millis(), server, newState)
-
- if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) {
- cont.resume(Unit)
- }
- }
- })
- }
- }
- }
- }
-
- yield()
- } finally {
- reader.close()
- client.close()
- }
-}
-
-/**
- * Create a [MeterProvider] instance for the experiment.
- */
-public fun createMeterProvider(clock: Clock): MeterProvider {
- val powerSelector = InstrumentSelector.builder()
- .setInstrumentNameRegex("power\\.usage")
- .setInstrumentType(InstrumentType.VALUE_RECORDER)
- .build()
- val powerView = View.builder()
- .setAggregatorFactory(AggregatorFactory.lastValue())
- .build()
-
- return SdkMeterProvider
- .builder()
- .setClock(clock.toOtelClock())
- .registerView(powerSelector, powerView)
- .build()
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
index e1cf8517..cd093e6c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -44,10 +46,10 @@ public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
index a995e467..73e59a58 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
@@ -22,8 +22,10 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByHpc
+import org.opendc.compute.workload.sampleByHpcLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.SamplingStrategy
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.harness.dsl.anyOf
@@ -40,13 +42,13 @@ public class MoreHpcPortfolio : Portfolio("more_hpc") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD)
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.0)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(1.0)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
index 49559e0e..9d5717bb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -40,10 +42,10 @@ public class MoreVelocityPortfolio : Portfolio("more_velocity") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
index 1aac4f9e..7ab586b3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,10 +38,10 @@ public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena")
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index b70eefb2..4e855f82 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -23,38 +23,35 @@
package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
-import org.opendc.compute.service.scheduler.*
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.*
-import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.ComputeWorkloadRunner
+import org.opendc.compute.workload.createComputeScheduler
+import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter
+import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.topology.apply
+import org.opendc.compute.workload.util.PerformanceInterferenceReader
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
-import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
-import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import org.opendc.format.trace.PerformanceInterferenceModelReader
+import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
+import java.time.Duration
import java.util.*
-import java.util.concurrent.ConcurrentHashMap
-import kotlin.random.asKotlinRandom
+import kotlin.math.roundToLong
/**
* A portfolio represents a collection of scenarios are tested for the work.
*
* @param name The name of the portfolio.
*/
-public abstract class Portfolio(name: String) : Experiment(name) {
+abstract class Portfolio(name: String) : Experiment(name) {
/**
* The logger for this portfolio instance.
*/
@@ -71,147 +68,84 @@ public abstract class Portfolio(name: String) : Experiment(name) {
private val vmPlacements by anyOf(emptyMap<String, String>())
/**
- * The path to the performance interference model.
- */
- private val performanceInterferenceModel by anyOf<PerformanceInterferenceModelReader?>(null)
-
- /**
* The topology to test.
*/
- public abstract val topology: Topology
+ abstract val topology: Topology
/**
* The workload to test.
*/
- public abstract val workload: Workload
+ abstract val workload: Workload
/**
* The operational phenomenas to consider.
*/
- public abstract val operationalPhenomena: OperationalPhenomena
+ abstract val operationalPhenomena: OperationalPhenomena
/**
* The allocation policies to consider.
*/
- public abstract val allocationPolicy: String
+ abstract val allocationPolicy: String
/**
- * A map of trace readers.
+ * A helper class to load workload traces.
*/
- private val traceReaders = ConcurrentHashMap<String, Sc20RawParquetTraceReader>()
+ private val workloadLoader = ComputeWorkloadLoader(File(config.getString("trace-path")))
/**
* Perform a single trial for this portfolio.
*/
- @OptIn(ExperimentalCoroutinesApi::class)
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
- val environment = Sc20ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
-
- val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = createComputeScheduler(seeder)
-
- val meterProvider = createMeterProvider(clock)
- val workload = workload
- val workloadNames = if (workload is CompositeWorkload) {
- workload.workloads.map { it.name }
- } else {
- listOf(workload.name)
- }
-
- val rawReaders = workloadNames.map { workloadName ->
- traceReaders.computeIfAbsent(workloadName) {
- logger.info { "Loading trace $workloadName" }
- Sc20RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
- }
- }
- val performanceInterferenceModel = performanceInterferenceModel
- ?.takeIf { operationalPhenomena.hasInterference }
- ?.construct(seeder.asKotlinRandom()) ?: emptyMap()
- val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt())
+ val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
+ PerformanceInterferenceReader()
+ .read(File(config.getString("interference-model")))
+ .let { VmInterferenceModel(it, Random(seeder.nextLong())) }
+ else
+ null
+
+ val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements)
+ val failureModel =
+ if (operationalPhenomena.failureFrequency > 0)
+ grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()))
+ else
+ null
+ val runner = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ failureModel,
+ performanceInterferenceModel
+ )
- val monitor = ParquetExperimentMonitor(
+ val exporter = ParquetComputeMetricExporter(
File(config.getString("output-path")),
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
)
-
- withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
- val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
- logger.debug("ENABLING failures")
- createFailureDomain(
- this,
- clock,
- seeder.nextInt(),
- operationalPhenomena.failureFrequency,
- scheduler,
- chan
- )
- } else {
- null
- }
-
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
- processTrace(
- clock,
- trace,
- scheduler,
- chan,
- monitor
- )
- }
-
- failureDomain?.cancel()
+ val metricReader = CoroutineMetricReader(this, runner.producers, exporter)
+ val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt"))
+
+ try {
+ // Instantiate the desired topology
+ runner.apply(topology)
+
+ // Converge the workload trace
+ runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
+ } finally {
+ runner.close()
+ metricReader.close()
}
- val monitorResults = collectMetrics(meterProvider as MetricProducer)
- logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
- }
-
- /**
- * Create the [ComputeScheduler] instance to use for the trial.
- */
- private fun createComputeScheduler(seeder: Random): ComputeScheduler {
- return when (allocationPolicy) {
- "mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
- )
- "mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
- )
- "core-mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
- )
- "core-mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
- )
- "active-servers" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
- )
- "active-servers-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(InstanceCountWeigher() to 1.0)
- )
- "provisioned-cores" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
- )
- "provisioned-cores-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to 1.0)
- )
- "random" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(RandomWeigher(Random(seeder.nextLong())) to 1.0)
- )
- "replay" -> ReplayScheduler(vmPlacements)
- else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
+ val monitorResults = collectServiceMetrics(runner.producers[0])
+ logger.debug {
+ "Scheduler " +
+ "Success=${monitorResults.attemptsSuccess} " +
+ "Failure=${monitorResults.attemptsFailure} " +
+ "Error=${monitorResults.attemptsError} " +
+ "Pending=${monitorResults.serversPending} " +
+ "Active=${monitorResults.serversActive}"
}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
index b6d3b30c..17ec48d4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class ReplayPortfolio : Portfolio("replay") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity"))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
index 90840db8..98eb989d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class TestPortfolio : Portfolio("test") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity"))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
index c4ddd158..a2e71243 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
@@ -22,23 +22,12 @@
package org.opendc.experiments.capelin.model
-public enum class SamplingStrategy {
- REGULAR,
- HPC,
- HPC_LOAD
-}
+import org.opendc.compute.workload.ComputeWorkload
/**
- * A workload that is considered for a scenario.
- */
-public open class Workload(
- public open val name: String,
- public val fraction: Double,
- public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR
-)
-
-/**
- * A workload that is composed of multiple workloads.
+ * A single workload originating from a trace.
+ *
+ * @param name the name of the workload.
+ * @param source The source of the workload data.
*/
-public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) :
- Workload(name, -1.0)
+data class Workload(val name: String, val source: ComputeWorkload)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
deleted file mode 100644
index 54ab3b5b..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.monitor
-
-import io.opentelemetry.sdk.common.CompletableResultCode
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricExporter
-import org.opendc.compute.service.driver.Host
-import java.time.Clock
-
-/**
- * A [MetricExporter] that exports the metrics to the [ExperimentMonitor].
- */
-public class ExperimentMetricExporter(
- private val monitor: ExperimentMonitor,
- private val clock: Clock,
- private val hosts: Map<String, Host>
-) : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
- reportHostMetrics(metricsByName)
- reportProvisionerMetrics(metricsByName)
- return CompletableResultCode.ofSuccess()
- }
-
- private fun reportHostMetrics(metrics: Map<String, MetricData>) {
- val hostMetrics = mutableMapOf<String, HostMetrics>()
- hosts.mapValuesTo(hostMetrics) { HostMetrics() }
-
- mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v ->
- m.cpuDemand = v
- }
-
- mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v ->
- m.cpuUsage = v
- }
-
- mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v ->
- m.powerDraw = v
- }
-
- mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v ->
- m.requestedBurst = v.toLong()
- }
-
- mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v ->
- m.grantedBurst = v.toLong()
- }
-
- mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v ->
- m.overcommissionedBurst = v.toLong()
- }
-
- mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v ->
- m.interferedBurst = v.toLong()
- }
-
- mapLongSum(metrics["guests.active"], hostMetrics) { m, v ->
- m.numberOfDeployedImages = v.toInt()
- }
-
- for ((id, hostMetric) in hostMetrics) {
- val host = hosts.getValue(id)
- monitor.reportHostSlice(
- clock.millis(),
- hostMetric.requestedBurst,
- hostMetric.grantedBurst,
- hostMetric.overcommissionedBurst,
- hostMetric.interferedBurst,
- hostMetric.cpuUsage,
- hostMetric.cpuDemand,
- hostMetric.powerDraw,
- hostMetric.numberOfDeployedImages,
- host
- )
- }
- }
-
- private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
- val points = data?.doubleSummaryData?.points ?: emptyList()
- for (point in points) {
- val uid = point.labels["host"]
- val hostMetric = hostMetrics[uid]
-
- if (hostMetric != null) {
- // Take the average of the summary
- val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2
- block(hostMetric, avg)
- }
- }
- }
-
- private fun mapDoubleGauge(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
- val points = data?.doubleGaugeData?.points ?: emptyList()
- for (point in points) {
- val uid = point.labels["host"]
- val hostMetric = hostMetrics[uid]
-
- if (hostMetric != null) {
- block(hostMetric, point.value)
- }
- }
- }
-
- private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
- val points = data?.longSumData?.points ?: emptyList()
- for (point in points) {
- val uid = point.labels["host"]
- val hostMetric = hostMetrics[uid]
-
- if (hostMetric != null) {
- block(hostMetric, point.value)
- }
- }
- }
-
- private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) {
- val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
-
- monitor.reportProvisionerMetrics(
- clock.millis(),
- hosts,
- availableHosts,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- }
-
- private class HostMetrics {
- var requestedBurst: Long = 0
- var grantedBurst: Long = 0
- var overcommissionedBurst: Long = 0
- var interferedBurst: Long = 0
- var cpuUsage: Double = 0.0
- var cpuDemand: Double = 0.0
- var numberOfDeployedImages: Int = 0
- var powerDraw: Double = 0.0
- }
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
deleted file mode 100644
index 68631dee..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.monitor
-
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
-
-/**
- * A monitor watches the events of an experiment.
- */
-public interface ExperimentMonitor : AutoCloseable {
- /**
- * This method is invoked when the state of a VM changes.
- */
- public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
-
- /**
- * This method is invoked when the state of a host changes.
- */
- public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {}
-
- /**
- * This method is invoked for a host for each slice that is finishes.
- */
- public fun reportHostSlice(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- numberOfDeployedImages: Int,
- host: Host
- ) {
- }
-
- /**
- * This method is invoked for a provisioner event.
- */
- public fun reportProvisionerMetrics(
- time: Long,
- totalHostCount: Int,
- availableHostCount: Int,
- totalVmCount: Int,
- activeVmCount: Int,
- inactiveVmCount: Int,
- waitingVmCount: Int,
- failedVmCount: Int
- ) {}
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
deleted file mode 100644
index 983b4cff..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.monitor
-
-import mu.KotlinLogging
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
-import org.opendc.experiments.capelin.telemetry.HostEvent
-import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
-import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter
-import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter
-import java.io.File
-
-/**
- * The logger instance to use.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * An [ExperimentMonitor] that logs the events to a Parquet file.
- */
-public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor {
- private val hostWriter = ParquetHostEventWriter(
- File(base, "host-metrics/$partition/data.parquet"),
- bufferSize
- )
- private val provisionerWriter = ParquetProvisionerEventWriter(
- File(base, "provisioner-metrics/$partition/data.parquet"),
- bufferSize
- )
-
- override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
-
- override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
- logger.debug { "Host ${host.uid} changed state $newState [$time]" }
- }
-
- override fun reportHostSlice(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- numberOfDeployedImages: Int,
- host: Host
- ) {
- hostWriter.write(
- HostEvent(
- time,
- 5 * 60 * 1000L,
- host,
- numberOfDeployedImages,
- requestedBurst,
- grantedBurst,
- overcommissionedBurst,
- interferedBurst,
- cpuUsage,
- cpuDemand,
- powerDraw,
- host.model.cpuCount
- )
- )
- }
-
- override fun reportProvisionerMetrics(
- time: Long,
- totalHostCount: Int,
- availableHostCount: Int,
- totalVmCount: Int,
- activeVmCount: Int,
- inactiveVmCount: Int,
- waitingVmCount: Int,
- failedVmCount: Int
- ) {
- provisionerWriter.write(
- ProvisionerEvent(
- time,
- totalHostCount,
- availableHostCount,
- totalVmCount,
- activeVmCount,
- inactiveVmCount,
- waitingVmCount,
- failedVmCount
- )
- )
- }
-
- override fun close() {
- hostWriter.close()
- provisionerWriter.close()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
deleted file mode 100644
index 899fc9b1..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry
-
-import org.opendc.compute.service.driver.Host
-
-/**
- * A periodic report of the host machine metrics.
- */
-public data class HostEvent(
- override val timestamp: Long,
- public val duration: Long,
- public val host: Host,
- public val vmCount: Int,
- public val requestedBurst: Long,
- public val grantedBurst: Long,
- public val overcommissionedBurst: Long,
- public val interferedBurst: Long,
- public val cpuUsage: Double,
- public val cpuDemand: Double,
- public val powerDraw: Double,
- public val cores: Int
-) : Event("host-metrics")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
deleted file mode 100644
index 6c8fc941..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry
-
-import org.opendc.experiments.capelin.Portfolio
-
-/**
- * A periodic report of the host machine metrics.
- */
-public data class RunEvent(
- val portfolio: Portfolio,
- val repeat: Int,
- override val timestamp: Long
-) : Event("run")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
deleted file mode 100644
index 7631f55f..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry
-
-import org.opendc.compute.api.Server
-
-/**
- * A periodic report of a virtual machine's metrics.
- */
-public data class VmEvent(
- override val timestamp: Long,
- public val duration: Long,
- public val vm: Server,
- public val host: Server,
- public val requestedBurst: Long,
- public val grantedBurst: Long,
- public val overcommissionedBurst: Long,
- public val interferedBurst: Long,
- public val cpuUsage: Double,
- public val cpuDemand: Double
-) : Event("vm-metrics")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
deleted file mode 100644
index 38930ee5..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import mu.KotlinLogging
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.experiments.capelin.telemetry.Event
-import java.io.Closeable
-import java.io.File
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.BlockingQueue
-import kotlin.concurrent.thread
-
-/**
- * The logging instance to use.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * A writer that writes events in Parquet format.
- */
-public open class ParquetEventWriter<in T : Event>(
- private val path: File,
- private val schema: Schema,
- private val converter: (T, GenericData.Record) -> Unit,
- private val bufferSize: Int = 4096
-) : Runnable, Closeable {
- /**
- * The writer to write the Parquet file.
- */
- private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
-
- /**
- * The queue of commands to process.
- */
- private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
-
- /**
- * The thread that is responsible for writing the Parquet records.
- */
- private val writerThread = thread(start = false, name = "parquet-writer") { run() }
-
- /**
- * Write the specified metrics to the database.
- */
- public fun write(event: T) {
- queue.put(Action.Write(event))
- }
-
- /**
- * Signal the writer to stop.
- */
- public override fun close() {
- queue.put(Action.Stop)
- writerThread.join()
- }
-
- init {
- writerThread.start()
- }
-
- /**
- * Start the writer thread.
- */
- override fun run() {
- try {
- loop@ while (true) {
- val action = queue.take()
- when (action) {
- is Action.Stop -> break@loop
- is Action.Write<*> -> {
- val record = GenericData.Record(schema)
- @Suppress("UNCHECKED_CAST")
- converter(action.event as T, record)
- writer.write(record)
- }
- }
- }
- } catch (e: Throwable) {
- logger.error("Writer failed", e)
- } finally {
- writer.close()
- }
- }
-
- public sealed class Action {
- /**
- * A poison pill that will stop the writer thread.
- */
- public object Stop : Action()
-
- /**
- * Write the specified metrics to the database.
- */
- public data class Write<out T : Event>(val event: T) : Action()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
deleted file mode 100644
index c8fe1cb2..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.HostEvent
-import java.io.File
-
-/**
- * A Parquet event writer for [HostEvent]s.
- */
-public class ParquetHostEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) {
-
- override fun toString(): String = "host-writer"
-
- public companion object {
- private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record ->
- // record.put("portfolio_id", event.run.parent.parent.id)
- // record.put("scenario_id", event.run.parent.id)
- // record.put("run_id", event.run.id)
- record.put("host_id", event.host.name)
- record.put("state", event.host.state.name)
- record.put("timestamp", event.timestamp)
- record.put("duration", event.duration)
- record.put("vm_count", event.vmCount)
- record.put("requested_burst", event.requestedBurst)
- record.put("granted_burst", event.grantedBurst)
- record.put("overcommissioned_burst", event.overcommissionedBurst)
- record.put("interfered_burst", event.interferedBurst)
- record.put("cpu_usage", event.cpuUsage)
- record.put("cpu_demand", event.cpuDemand)
- record.put("power_draw", event.powerDraw)
- record.put("cores", event.cores)
- }
-
- private val schema: Schema = SchemaBuilder
- .record("host_metrics")
- .namespace("org.opendc.experiments.sc20")
- .fields()
- // .name("portfolio_id").type().intType().noDefault()
- // .name("scenario_id").type().intType().noDefault()
- // .name("run_id").type().intType().noDefault()
- .name("timestamp").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("host_id").type().stringType().noDefault()
- .name("state").type().stringType().noDefault()
- .name("vm_count").type().intType().noDefault()
- .name("requested_burst").type().longType().noDefault()
- .name("granted_burst").type().longType().noDefault()
- .name("overcommissioned_burst").type().longType().noDefault()
- .name("interfered_burst").type().longType().noDefault()
- .name("cpu_usage").type().doubleType().noDefault()
- .name("cpu_demand").type().doubleType().noDefault()
- .name("power_draw").type().doubleType().noDefault()
- .name("cores").type().intType().noDefault()
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt
deleted file mode 100644
index 8feff8d9..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
-import java.io.File
-
-/**
- * A Parquet event writer for [ProvisionerEvent]s.
- */
-public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) {
-
- override fun toString(): String = "provisioner-writer"
-
- public companion object {
- private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record ->
- record.put("timestamp", event.timestamp)
- record.put("host_total_count", event.totalHostCount)
- record.put("host_available_count", event.availableHostCount)
- record.put("vm_total_count", event.totalVmCount)
- record.put("vm_active_count", event.activeVmCount)
- record.put("vm_inactive_count", event.inactiveVmCount)
- record.put("vm_waiting_count", event.waitingVmCount)
- record.put("vm_failed_count", event.failedVmCount)
- }
-
- private val schema: Schema = SchemaBuilder
- .record("provisioner_metrics")
- .namespace("org.opendc.experiments.sc20")
- .fields()
- .name("timestamp").type().longType().noDefault()
- .name("host_total_count").type().intType().noDefault()
- .name("host_available_count").type().intType().noDefault()
- .name("vm_total_count").type().intType().noDefault()
- .name("vm_active_count").type().intType().noDefault()
- .name("vm_inactive_count").type().intType().noDefault()
- .name("vm_waiting_count").type().intType().noDefault()
- .name("vm_failed_count").type().intType().noDefault()
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
deleted file mode 100644
index 946410eb..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.RunEvent
-import java.io.File
-
-/**
- * A Parquet event writer for [RunEvent]s.
- */
-public class ParquetRunEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) {
-
- override fun toString(): String = "run-writer"
-
- public companion object {
- private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record ->
- val portfolio = event.portfolio
- record.put("portfolio_name", portfolio.name)
- record.put("scenario_id", portfolio.id)
- record.put("run_id", event.repeat)
- record.put("topology", portfolio.topology.name)
- record.put("workload_name", portfolio.workload.name)
- record.put("workload_fraction", portfolio.workload.fraction)
- record.put("workload_sampler", portfolio.workload.samplingStrategy)
- record.put("allocation_policy", portfolio.allocationPolicy)
- record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency)
- record.put("interference", portfolio.operationalPhenomena.hasInterference)
- record.put("seed", event.repeat)
- }
-
- private val schema: Schema = SchemaBuilder
- .record("runs")
- .namespace("org.opendc.experiments.sc20")
- .fields()
- .name("portfolio_name").type().stringType().noDefault()
- .name("scenario_id").type().intType().noDefault()
- .name("run_id").type().intType().noDefault()
- .name("topology").type().stringType().noDefault()
- .name("workload_name").type().stringType().noDefault()
- .name("workload_fraction").type().doubleType().noDefault()
- .name("workload_sampler").type().stringType().noDefault()
- .name("allocation_policy").type().stringType().noDefault()
- .name("failure_frequency").type().doubleType().noDefault()
- .name("interference").type().booleanType().noDefault()
- .name("seed").type().intType().noDefault()
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt
index 539c9bc9..b8b65d28 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,18 +20,27 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry
+package org.opendc.experiments.capelin.topology
/**
- * A periodic report of the provisioner's metrics.
+ * Definition of a compute cluster modeled in the simulation.
+ *
+ * @param id A unique identifier representing the compute cluster.
+ * @param name The name of the cluster.
+ * @param cpuCount The total number of CPUs in the cluster.
+ * @param cpuSpeed The speed of a CPU in the cluster in MHz.
+ * @param memCapacity The total memory capacity of the cluster (in MiB).
+ * @param hostCount The number of hosts in the cluster.
+ * @param memCapacityPerHost The memory capacity per host in the cluster (MiB).
+ * @param cpuCountPerHost The number of CPUs per host in the cluster.
*/
-public data class ProvisionerEvent(
- override val timestamp: Long,
- public val totalHostCount: Int,
- public val availableHostCount: Int,
- public val totalVmCount: Int,
- public val activeVmCount: Int,
- public val inactiveVmCount: Int,
- public val waitingVmCount: Int,
- public val failedVmCount: Int
-) : Event("provisioner-metrics")
+public data class ClusterSpec(
+ val id: String,
+ val name: String,
+ val cpuCount: Int,
+ val cpuSpeed: Double,
+ val memCapacity: Double,
+ val hostCount: Int,
+ val memCapacityPerHost: Double,
+ val cpuCountPerHost: Int
+)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt
new file mode 100644
index 00000000..5a175f2c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.topology
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.MappingIterator
+import com.fasterxml.jackson.databind.ObjectReader
+import com.fasterxml.jackson.dataformat.csv.CsvMapper
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import java.io.File
+import java.io.InputStream
+
+/**
+ * A helper class for reading a cluster specification file.
+ */
+class ClusterSpecReader {
+ /**
+ * The [CsvMapper] to map the environment file to an object.
+ */
+ private val mapper = CsvMapper()
+
+ /**
+ * The [ObjectReader] to convert the lines into objects.
+ */
+ private val reader: ObjectReader = mapper.readerFor(Entry::class.java).with(schema)
+
+ /**
+ * Read the specified [file].
+ */
+ fun read(file: File): List<ClusterSpec> {
+ return reader.readValues<Entry>(file).use { read(it) }
+ }
+
+ /**
+ * Read the specified [input].
+ */
+ fun read(input: InputStream): List<ClusterSpec> {
+ return reader.readValues<Entry>(input).use { read(it) }
+ }
+
+ /**
+ * Convert the specified [MappingIterator] into a list of [ClusterSpec]s.
+ */
+ private fun read(it: MappingIterator<Entry>): List<ClusterSpec> {
+ val result = mutableListOf<ClusterSpec>()
+
+ for (entry in it) {
+ val def = ClusterSpec(
+ entry.id,
+ entry.name,
+ entry.cpuCount,
+ entry.cpuSpeed * 1000, // Convert to MHz
+ entry.memCapacity * 1000, // Convert to MiB
+ entry.hostCount,
+ entry.memCapacityPerHost * 1000,
+ entry.cpuCountPerHost
+ )
+ result.add(def)
+ }
+
+ return result
+ }
+
+ private open class Entry(
+ @JsonProperty("ClusterID")
+ val id: String,
+ @JsonProperty("ClusterName")
+ val name: String,
+ @JsonProperty("Cores")
+ val cpuCount: Int,
+ @JsonProperty("Speed")
+ val cpuSpeed: Double,
+ @JsonProperty("Memory")
+ val memCapacity: Double,
+ @JsonProperty("numberOfHosts")
+ val hostCount: Int,
+ @JsonProperty("memoryCapacityPerHost")
+ val memCapacityPerHost: Double,
+ @JsonProperty("coreCountPerHost")
+ val cpuCountPerHost: Int
+ )
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema = CsvSchema.builder()
+ .addColumn("ClusterID", CsvSchema.ColumnType.STRING)
+ .addColumn("ClusterName", CsvSchema.ColumnType.STRING)
+ .addColumn("Cores", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Speed", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory", CsvSchema.ColumnType.NUMBER)
+ .addColumn("numberOfHosts", CsvSchema.ColumnType.NUMBER)
+ .addColumn("memoryCapacityPerHost", CsvSchema.ColumnType.NUMBER)
+ .addColumn("coreCountPerHost", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .setColumnSeparator(';')
+ .setUseHeader(true)
+ .build()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
new file mode 100644
index 00000000..5ab4261a
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TopologyFactories")
+package org.opendc.experiments.capelin.topology
+
+import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.LinearPowerModel
+import org.opendc.simulator.compute.power.PowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import java.io.File
+import java.io.InputStream
+import java.util.*
+import kotlin.math.roundToLong
+
+/**
+ * A [ClusterSpecReader] that is used to read the cluster definition file.
+ */
+private val reader = ClusterSpecReader()
+
+/**
+ * Construct a [Topology] from the specified [file].
+ */
+fun clusterTopology(
+ file: File,
+ powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0),
+ random: Random = Random(0)
+): Topology = clusterTopology(reader.read(file), powerModel, random)
+
+/**
+ * Construct a [Topology] from the specified [input].
+ */
+fun clusterTopology(
+ input: InputStream,
+ powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0),
+ random: Random = Random(0)
+): Topology = clusterTopology(reader.read(input), powerModel, random)
+
+/**
+ * Construct a [Topology] from the given list of [clusters].
+ */
+fun clusterTopology(
+ clusters: List<ClusterSpec>,
+ powerModel: PowerModel,
+ random: Random = Random(0)
+): Topology {
+ return object : Topology {
+ override fun resolve(): List<HostSpec> {
+ val hosts = mutableListOf<HostSpec>()
+ for (cluster in clusters) {
+ val cpuSpeed = cluster.cpuSpeed
+ val memoryPerHost = cluster.memCapacityPerHost.roundToLong()
+
+ val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", cluster.cpuCountPerHost)
+ val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
+ val machineModel = MachineModel(
+ List(cluster.cpuCountPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, cpuSpeed) },
+ listOf(unknownMemoryUnit)
+ )
+
+ repeat(cluster.hostCount) {
+ val spec = HostSpec(
+ UUID(random.nextLong(), it.toLong()),
+ "node-${cluster.name}-$it",
+ mapOf("cluster" to cluster.id),
+ machineModel,
+ SimplePowerDriver(powerModel)
+ )
+
+ hosts += spec
+ }
+ }
+
+ return hosts
+ }
+
+ override fun toString(): String = "ClusterSpecTopology"
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
deleted file mode 100644
index a8462a51..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.util.TreeSet
-
-/**
- * A [TraceReader] for the internal VM workload trace format.
- *
- * @param reader The internal trace reader to use.
- * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
- * @param run The run to which this reader belongs.
- */
-@OptIn(ExperimentalStdlibApi::class)
-public class Sc20ParquetTraceReader(
- rawReaders: List<Sc20RawParquetTraceReader>,
- performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
- workload: Workload,
- seed: Int
-) : TraceReader<SimWorkload> {
- /**
- * The iterator over the actual trace.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>> =
- rawReaders
- .map { it.read() }
- .run {
- if (workload is CompositeWorkload) {
- this.zip(workload.workloads)
- } else {
- this.zip(listOf(workload))
- }
- }
- .map { sampleWorkload(it.first, workload, it.second, seed) }
- .flatten()
- .run {
- // Apply performance interference model
- if (performanceInterferenceModel.isEmpty())
- this
- else {
- map { entry ->
- val id = entry.name
- val relevantPerformanceInterferenceModelItems =
- performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
-
- entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems))
- }
- }
- }
- .iterator()
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
deleted file mode 100644
index ffbf46d4..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetReader
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.io.File
-import java.util.UUID
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * A [TraceReader] for the internal VM workload trace format.
- *
- * @param path The directory of the traces.
- */
-@OptIn(ExperimentalStdlibApi::class)
-public class Sc20RawParquetTraceReader(private val path: File) {
- /**
- * Read the fragments into memory.
- */
- private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
- @Suppress("DEPRECATION")
- val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
- .disableCompatibility()
- .build()
-
- val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
-
- return try {
- while (true) {
- val record = reader.read() ?: break
-
- val id = record["id"].toString()
- val duration = record["duration"] as Long
- val cores = record["cores"] as Int
- val cpuUsage = record["cpuUsage"] as Double
-
- val fragment = SimTraceWorkload.Fragment(
- duration,
- cpuUsage,
- cores
- )
-
- fragments.getOrPut(id) { mutableListOf() }.add(fragment)
- }
-
- fragments
- } finally {
- reader.close()
- }
- }
-
- /**
- * Read the metadata into a workload.
- */
- private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
- @Suppress("DEPRECATION")
- val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
- .disableCompatibility()
- .build()
-
- var counter = 0
- val entries = mutableListOf<TraceEntry<SimWorkload>>()
-
- return try {
- while (true) {
- val record = metaReader.read() ?: break
-
- val id = record["id"].toString()
- if (!fragments.containsKey(id)) {
- continue
- }
-
- val submissionTime = record["submissionTime"] as Long
- val endTime = record["endTime"] as Long
- val maxCores = record["maxCores"] as Int
- val requiredMemory = record["requiredMemory"] as Long
- val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
-
- val vmFragments = fragments.getValue(id).asSequence()
- val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
- val workload = SimTraceWorkload(vmFragments)
- entries.add(
- TraceEntry(
- uid, id, submissionTime, workload,
- mapOf(
- "submit-time" to submissionTime,
- "end-time" to endTime,
- "total-load" to totalLoad,
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
- )
- )
- }
-
- entries
- } catch (e: Exception) {
- e.printStackTrace()
- throw e
- } finally {
- metaReader.close()
- }
- }
-
- /**
- * The entries in the trace.
- */
- private val entries: List<TraceEntry<SimWorkload>>
-
- init {
- val fragments = parseFragments(path)
- entries = parseMeta(path, fragments)
- }
-
- /**
- * Read the entries in the trace.
- */
- public fun read(): List<TraceEntry<SimWorkload>> = entries
-
- /**
- * Create a [TraceReader] instance.
- */
- public fun createReader(): TraceReader<SimWorkload> {
- return object : TraceReader<SimWorkload>, Iterator<TraceEntry<SimWorkload>> by entries.iterator() {
- override fun close() {}
- }
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
deleted file mode 100644
index c5294b55..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.filter2.compat.FilterCompat
-import org.apache.parquet.filter2.predicate.FilterApi
-import org.apache.parquet.filter2.predicate.Statistics
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate
-import org.apache.parquet.io.api.Binary
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.io.File
-import java.io.Serializable
-import java.util.SortedSet
-import java.util.TreeSet
-import java.util.UUID
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
-import kotlin.random.Random
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
- *
- * @param traceFile The directory of the traces.
- * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
- */
-@OptIn(ExperimentalStdlibApi::class)
-public class Sc20StreamingParquetTraceReader(
- traceFile: File,
- performanceInterferenceModel: PerformanceInterferenceModel? = null,
- selectedVms: List<String> = emptyList(),
- random: Random
-) : TraceReader<SimWorkload> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>>
-
- /**
- * The intermediate buffer to store the read records in.
- */
- private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024)
-
- /**
- * An optional filter for filtering the selected VMs
- */
- private val filter =
- if (selectedVms.isEmpty())
- null
- else
- FilterCompat.get(
- FilterApi.userDefined(
- FilterApi.binaryColumn("id"),
- SelectedVmFilter(
- TreeSet(selectedVms)
- )
- )
- )
-
- /**
- * A poisonous fragment.
- */
- private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0))
-
- /**
- * The thread to read the records in.
- */
- private val readerThread = thread(start = true, name = "sc20-reader") {
- @Suppress("DEPRECATION")
- val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
- .disableCompatibility()
- .run { if (filter != null) withFilter(filter) else this }
- .build()
-
- try {
- while (true) {
- val record = reader.read()
-
- if (record == null) {
- queue.put(poison)
- break
- }
-
- val id = record["id"].toString()
- val duration = record["duration"] as Long
- val cores = record["cores"] as Int
- val cpuUsage = record["cpuUsage"] as Double
-
- val fragment = SimTraceWorkload.Fragment(
- duration,
- cpuUsage,
- cores
- )
-
- queue.put(id to fragment)
- }
- } catch (e: InterruptedException) {
- // Do not rethrow this
- } finally {
- reader.close()
- }
- }
-
- /**
- * Fill the buffers with the VMs
- */
- private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) {
- if (!hasNext) {
- return
- }
-
- val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>()
- queue.drainTo(fragments)
-
- for ((id, fragment) in fragments) {
- if (id == poison.first) {
- hasNext = false
- return
- }
- buffers[id]?.forEach { it.add(fragment) }
- }
- }
-
- /**
- * A flag to indicate whether the reader has more entries.
- */
- private var hasNext: Boolean = true
-
- /**
- * Initialize the reader.
- */
- init {
- val takenIds = mutableSetOf<UUID>()
- val entries = mutableMapOf<String, GenericData.Record>()
- val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
-
- @Suppress("DEPRECATION")
- val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
- .disableCompatibility()
- .run { if (filter != null) withFilter(filter) else this }
- .build()
-
- while (true) {
- val record = metaReader.read() ?: break
- val id = record["id"].toString()
- entries[id] = record
- }
-
- metaReader.close()
-
- val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms
-
- // Create the entry iterator
- iterator = selection.asSequence()
- .mapNotNull { entries[it] }
- .mapIndexed { index, record ->
- val id = record["id"].toString()
- val submissionTime = record["submissionTime"] as Long
- val endTime = record["endTime"] as Long
- val maxCores = record["maxCores"] as Int
- val requiredMemory = record["requiredMemory"] as Long
- val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
-
- assert(uid !in takenIds)
- takenIds += uid
-
- logger.info("Processing VM $id")
-
- val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
- val fragments = sequence {
- var time = submissionTime
- repeat@ while (true) {
- if (externalBuffer.isEmpty()) {
- if (hasNext) {
- pull(buffers)
- continue
- } else {
- break
- }
- }
-
- internalBuffer.addAll(externalBuffer)
- externalBuffer.clear()
-
- for (fragment in internalBuffer) {
- yield(fragment)
-
- time += fragment.duration
- if (time >= endTime) {
- break@repeat
- }
- }
-
- internalBuffer.clear()
- }
-
- buffers.remove(id)
- }
- val relevantPerformanceInterferenceModelItems =
- if (performanceInterferenceModel != null)
- PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(),
- Random(random.nextInt())
- )
- else
- null
- val workload = SimTraceWorkload(fragments)
- val meta = mapOf(
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
-
- TraceEntry(
- uid, id, submissionTime, workload,
- if (performanceInterferenceModel != null)
- meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems as Any)
- else
- meta
- )
- }
- .sortedBy { it.start }
- .toList()
- .iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {
- readerThread.interrupt()
- }
-
- private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
- override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
-
- override fun canDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
- }
-
- override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
- }
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
deleted file mode 100644
index 7713c06f..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
+++ /dev/null
@@ -1,621 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import com.github.ajalt.clikt.core.CliktCommand
-import com.github.ajalt.clikt.parameters.arguments.argument
-import com.github.ajalt.clikt.parameters.groups.OptionGroup
-import com.github.ajalt.clikt.parameters.groups.groupChoice
-import com.github.ajalt.clikt.parameters.options.convert
-import com.github.ajalt.clikt.parameters.options.default
-import com.github.ajalt.clikt.parameters.options.defaultLazy
-import com.github.ajalt.clikt.parameters.options.option
-import com.github.ajalt.clikt.parameters.options.required
-import com.github.ajalt.clikt.parameters.options.split
-import com.github.ajalt.clikt.parameters.types.file
-import com.github.ajalt.clikt.parameters.types.long
-import me.tongfei.progressbar.ProgressBar
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.format.trace.sc20.Sc20VmPlacementReader
-import java.io.BufferedReader
-import java.io.File
-import java.io.FileReader
-import java.util.Random
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * Represents the command for converting traces
- */
-public class TraceConverterCli : CliktCommand(name = "trace-converter") {
- /**
- * The directory where the trace should be stored.
- */
- private val outputPath by option("-O", "--output", help = "path to store the trace")
- .file(canBeFile = false, mustExist = false)
- .defaultLazy { File("output") }
-
- /**
- * The directory where the input trace is located.
- */
- private val inputPath by argument("input", help = "path to the input trace")
- .file(canBeFile = false)
-
- /**
- * The input type of the trace.
- */
- private val type by option("-t", "--type", help = "input type of trace").groupChoice(
- "solvinity" to SolvinityConversion(),
- "bitbrains" to BitbrainsConversion(),
- "azure" to AzureConversion()
- )
-
- override fun run() {
- val metaSchema = SchemaBuilder
- .record("meta")
- .namespace("org.opendc.format.sc20")
- .fields()
- .name("id").type().stringType().noDefault()
- .name("submissionTime").type().longType().noDefault()
- .name("endTime").type().longType().noDefault()
- .name("maxCores").type().intType().noDefault()
- .name("requiredMemory").type().longType().noDefault()
- .endRecord()
- val schema = SchemaBuilder
- .record("trace")
- .namespace("org.opendc.format.sc20")
- .fields()
- .name("id").type().stringType().noDefault()
- .name("time").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("cores").type().intType().noDefault()
- .name("cpuUsage").type().doubleType().noDefault()
- .name("flops").type().longType().noDefault()
- .endRecord()
-
- val metaParquet = File(outputPath, "meta.parquet")
- val traceParquet = File(outputPath, "trace.parquet")
-
- if (metaParquet.exists()) {
- metaParquet.delete()
- }
- if (traceParquet.exists()) {
- traceParquet.delete()
- }
-
- val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI()))
- .withSchema(metaSchema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
-
- val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI()))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
-
- try {
- val type = type ?: throw IllegalArgumentException("Invalid trace conversion")
- val allFragments = type.read(inputPath, metaSchema, metaWriter)
- allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
-
- for (fragment in allFragments) {
- val record = GenericData.Record(schema)
- record.put("id", fragment.id)
- record.put("time", fragment.tick)
- record.put("duration", fragment.duration)
- record.put("cores", fragment.cores)
- record.put("cpuUsage", fragment.usage)
- record.put("flops", fragment.flops)
-
- writer.write(record)
- }
- } finally {
- writer.close()
- metaWriter.close()
- }
- }
-}
-
-/**
- * The supported trace conversions.
- */
-public sealed class TraceConversion(name: String) : OptionGroup(name) {
- /**
- * Read the fragments of the trace.
- */
- public abstract fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment>
-}
-
-public class SolvinityConversion : TraceConversion("Solvinity") {
- private val clusters by option()
- .split(",")
-
- private val vmPlacements by option("--vm-placements", help = "file containing the VM placements")
- .file(canBeDir = false)
- .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } }
- .required()
-
- override fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment> {
- val clusters = clusters?.toSet() ?: emptySet()
- val timestampCol = 0
- val cpuUsageCol = 1
- val coreCol = 12
- val provisionedMemoryCol = 20
- val traceInterval = 5 * 60 * 1000L
-
- // Identify start time of the entire trace
- var minTimestamp = Long.MAX_VALUE
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach file@{ vmFile ->
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val vmId = vmFile.name
-
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null || !clusters.contains(clusterName)) {
- continue
- }
-
- val values = line.split("\t")
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
-
- if (timestamp < minTimestamp) {
- minTimestamp = timestamp
- }
- return@file
- }
- }
- }
- }
-
- println("Start of trace at $minTimestamp")
-
- val allFragments = mutableListOf<Fragment>()
-
- val begin = 15 * 24 * 60 * 60 * 1000L
- val end = 45 * 24 * 60 * 60 * 1000L
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach { vmFile ->
- println(vmFile)
-
- var vmId = ""
- var maxCores = -1
- var requiredMemory = -1L
- var cores: Int
- var minTime = Long.MAX_VALUE
-
- val flopsFragments = sequence {
- var last: Fragment? = null
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split("\t")
-
- vmId = vmFile.name
-
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null || !clusters.contains(clusterName)) {
- continue
- }
-
- val timestamp =
- (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
- if (begin > timestamp || timestamp > end) {
- continue
- }
-
- cores = values[coreCol].trim().toInt()
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
- minTime = min(minTime, timestamp)
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- last = if (last != null && last!!.flops == 0L && flops == 0L) {
- val oldFragment = last!!
- Fragment(
- vmId,
- oldFragment.tick,
- oldFragment.flops + flops,
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- cores
- )
- if (last != null) {
- yield(last!!)
- }
- fragment
- }
- }
- }
- }
-
- if (last != null) {
- yield(last!!)
- }
- }
-
- var maxTime = Long.MIN_VALUE
- flopsFragments.filter { it.tick in begin until end }.forEach { fragment ->
- allFragments.add(fragment)
- maxTime = max(maxTime, fragment.tick)
- }
-
- if (minTime in begin until end) {
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", vmId)
- metaRecord.put("submissionTime", minTime)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
- metaWriter.write(metaRecord)
- }
- }
-
- return allFragments
- }
-}
-
-/**
- * Conversion of the Bitbrains public trace.
- */
-public class BitbrainsConversion : TraceConversion("Bitbrains") {
- override fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment> {
- val timestampCol = 0
- val cpuUsageCol = 3
- val coreCol = 1
- val provisionedMemoryCol = 5
- val traceInterval = 5 * 60 * 1000L
-
- val allFragments = mutableListOf<Fragment>()
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach { vmFile ->
- println(vmFile)
-
- var vmId = ""
- var maxCores = -1
- var requiredMemory = -1L
- var cores: Int
- var minTime = Long.MAX_VALUE
-
- val flopsFragments = sequence {
- var last: Fragment? = null
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .drop(1)
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split(";\t")
-
- vmId = vmFile.name
-
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
-
- cores = values[coreCol].trim().toInt()
- val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB
- requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong())
- maxCores = max(maxCores, cores)
- minTime = min(minTime, timestamp)
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- last = if (last != null && last!!.flops == 0L && flops == 0L) {
- val oldFragment = last!!
- Fragment(
- vmId,
- oldFragment.tick,
- oldFragment.flops + flops,
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- cores
- )
- if (last != null) {
- yield(last!!)
- }
- fragment
- }
- }
- }
- }
-
- if (last != null) {
- yield(last!!)
- }
- }
-
- var maxTime = Long.MIN_VALUE
- flopsFragments.forEach { fragment ->
- allFragments.add(fragment)
- maxTime = max(maxTime, fragment.tick)
- }
-
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", vmId)
- metaRecord.put("submissionTime", minTime)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
- metaWriter.write(metaRecord)
- }
-
- return allFragments
- }
-}
-
-/**
- * Conversion of the Azure public VM trace.
- */
-public class AzureConversion : TraceConversion("Azure") {
- private val seed by option(help = "seed for trace sampling")
- .long()
- .default(0)
-
- override fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment> {
- val random = Random(seed)
- val fraction = 0.01
-
- // Read VM table
- val vmIdTableCol = 0
- val coreTableCol = 9
- val provisionedMemoryTableCol = 10
-
- var vmId: String
- var cores: Int
- var requiredMemory: Long
-
- val vmIds = mutableSetOf<String>()
- val vmIdToMetadata = mutableMapOf<String, VmInfo>()
-
- BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
- reader.lineSequence()
- .chunked(1024)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
- // Sample only a fraction of the VMs
- if (random.nextDouble() > fraction) {
- continue
- }
-
- val values = line.split(",")
-
- // Exclude VMs with a large number of cores (not specified exactly)
- if (values[coreTableCol].contains(">")) {
- continue
- }
-
- vmId = values[vmIdTableCol].trim()
- cores = values[coreTableCol].trim().toInt()
- requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
-
- vmIds.add(vmId)
- vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
- }
- }
- }
-
- // Read VM metric reading files
- val timestampCol = 0
- val vmIdCol = 1
- val cpuUsageCol = 4
- val traceInterval = 5 * 60 * 1000L
-
- val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
- val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
- val allFragments = mutableListOf<Fragment>()
-
- for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) {
- val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv")
- var timestamp: Long
- var cpuUsage: Double
-
- BufferedReader(FileReader(readingsFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split(",")
- vmId = values[vmIdCol].trim()
-
- // Ignore readings for VMs not in the sample
- if (!vmIds.contains(vmId)) {
- continue
- }
-
- timestamp = values[timestampCol].trim().toLong() * 1000L
- vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
- cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz
- vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
- val lastFragment = vmIdToLastFragment[vmId]
-
- vmIdToLastFragment[vmId] =
- if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
- Fragment(
- vmId,
- lastFragment.tick,
- lastFragment.flops + flops,
- lastFragment.duration + traceInterval,
- cpuUsage,
- vmIdToMetadata[vmId]!!.cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- vmIdToMetadata[vmId]!!.cores
- )
- if (lastFragment != null) {
- if (vmIdToFragments[vmId] == null) {
- vmIdToFragments[vmId] = mutableListOf()
- }
- vmIdToFragments[vmId]!!.add(lastFragment)
- allFragments.add(lastFragment)
- }
- fragment
- }
- }
- }
- }
- }
-
- for (entry in vmIdToLastFragment) {
- if (entry.value != null) {
- if (vmIdToFragments[entry.key] == null) {
- vmIdToFragments[entry.key] = mutableListOf()
- }
- vmIdToFragments[entry.key]!!.add(entry.value!!)
- }
- }
-
- println("Read ${vmIdToLastFragment.size} VMs")
-
- for (entry in vmIdToMetadata) {
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", entry.key)
- metaRecord.put("submissionTime", entry.value.minTime)
- metaRecord.put("endTime", entry.value.maxTime)
- println("${entry.value.minTime} - ${entry.value.maxTime}")
- metaRecord.put("maxCores", entry.value.cores)
- metaRecord.put("requiredMemory", entry.value.requiredMemory)
- metaWriter.write(metaRecord)
- }
-
- return allFragments
- }
-}
-
-public data class Fragment(
- public val id: String,
- public val tick: Long,
- public val flops: Long,
- public val duration: Long,
- public val usage: Double,
- public val cores: Int
-)
-
-public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long)
-
-/**
- * A script to convert a trace in text format into a Parquet trace.
- */
-public fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
deleted file mode 100644
index 5c8727ea..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.SamplingStrategy
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.format.trace.TraceEntry
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.util.*
-import kotlin.random.Random
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * Sample the workload for the specified [run].
- */
-public fun sampleWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- return when {
- workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
- workload.samplingStrategy == SamplingStrategy.HPC ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false)
- workload.samplingStrategy == SamplingStrategy.HPC_LOAD ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true)
- else ->
- sampleRegularWorkload(trace, workload, workload, seed)
- }
-}
-
-/**
- * Sample a regular (non-HPC) workload.
- */
-public fun sampleRegularWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- val fraction = subWorkload.fraction
-
- val shuffled = trace.shuffled(Random(seed))
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- shuffled.sumByDouble { it.meta.getValue("total-load") as Double }
- }
- var currentLoad = 0.0
-
- for (entry in shuffled) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- currentLoad += entryLoad
- res += entry
- }
-
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a HPC workload.
- */
-public fun sampleHpcWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- seed: Int,
- sampleOnLoad: Boolean
-): List<TraceEntry<SimWorkload>> {
- val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
- val random = Random(seed)
-
- val fraction = workload.fraction
- val (hpc, nonHpc) = trace.partition { entry ->
- val name = entry.name
- name.matches(pattern)
- }
-
- val hpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- hpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- val nonHpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- nonHpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
-
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- trace.sumByDouble { it.meta.getValue("total-load") as Double }
- }
-
- logger.debug { "Total trace load: $totalLoad" }
- var hpcCount = 0
- var hpcLoad = 0.0
- var nonHpcCount = 0
- var nonHpcLoad = 0.0
-
- val res = mutableListOf<TraceEntry<SimWorkload>>()
-
- if (sampleOnLoad) {
- var currentLoad = 0.0
- for (entry in hpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- hpcLoad += entryLoad
- hpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
-
- for (entry in nonHpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > 1) {
- break
- }
-
- nonHpcLoad += entryLoad
- nonHpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
- } else {
- hpcSequence
- .take((fraction * trace.size).toInt())
- .forEach { entry ->
- hpcLoad += entry.meta.getValue("total-load") as Double
- hpcCount += 1
- res.add(entry)
- }
-
- nonHpcSequence
- .take(((1 - fraction) * trace.size).toInt())
- .forEach { entry ->
- nonHpcLoad += entry.meta.getValue("total-load") as Double
- nonHpcCount += 1
- res.add(entry)
- }
- }
-
- logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
- logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a random trace entry.
- */
-private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> {
- val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
- return entry.copy(uid = uid)
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt
index c29e116e..67de2777 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,14 +20,28 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry
+package org.opendc.experiments.capelin.util
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.module.kotlin.readValue
+import java.io.InputStream
/**
- * An event that occurs within the system.
+ * A parser for the JSON VM placement data files used for the TPDS article on Capelin.
*/
-public abstract class Event(public val name: String) {
+class VmPlacementReader {
+ /**
+ * The [ObjectMapper] to parse the placement.
+ */
+ private val mapper = jacksonObjectMapper()
+
/**
- * The time of occurrence of this event.
+ * Read the VM placements from the input.
*/
- public abstract val timestamp: Long
+ fun read(input: InputStream): Map<String, String> {
+ return mapper.readValue<Map<String, String>>(input)
+ .mapKeys { "vm__workload__${it.key}.txt" }
+ .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00
+ }
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
index d1c01b8e..d46b50c3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
@@ -36,7 +36,7 @@
<Logger name="org.opendc.experiments.capelin" level="info" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
- <Logger name="org.opendc.experiments.capelin.trace" level="debug" additivity="false">
+ <Logger name="org.opendc.experiments.vm.trace" level="debug" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.apache.hadoop" level="warn" additivity="false">
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 2d5cc68c..e34c5bdc 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,185 +22,276 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
-import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.workload.*
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.compute.workload.topology.apply
+import org.opendc.compute.workload.util.PerformanceInterferenceReader
+import org.opendc.experiments.capelin.topology.clusterTopology
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
+import java.time.Duration
+import java.util.*
/**
- * An integration test suite for the SC20 experiments.
+ * An integration test suite for the Capelin experiments.
*/
class CapelinIntegrationTest {
/**
* The monitor used to keep track of the metrics.
*/
- private lateinit var monitor: TestExperimentReporter
+ private lateinit var exporter: TestComputeMetricExporter
+
+ /**
+ * The [FilterScheduler] to use for all experiments.
+ */
+ private lateinit var computeScheduler: FilterScheduler
+
+ /**
+ * The [ComputeWorkloadLoader] responsible for loading the traces.
+ */
+ private lateinit var workloadLoader: ComputeWorkloadLoader
/**
* Setup the experimental environment.
*/
@BeforeEach
fun setUp() {
- monitor = TestExperimentReporter()
+ exporter = TestComputeMetricExporter()
+ computeScheduler = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace"))
}
+ /**
+ * Test a large simulation setup.
+ */
@Test
fun testLarge() = runBlockingSimulation {
- val failures = false
- val seed = 0
- val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
+ val workload = createTestWorkload(1.0)
+ val runner = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler
)
- val traceReader = createTestTraceReader()
- val environmentReader = createTestEnvironmentReader()
- lateinit var monitorResults: ComputeMetrics
-
- val meterProvider = createMeterProvider(clock)
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val failureDomain = if (failures) {
- println("ENABLING failures")
- createFailureDomain(
- this,
- clock,
- seed,
- 24.0 * 7,
- scheduler,
- chan
- )
- } else {
- null
- }
-
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
- }
-
- failureDomain?.cancel()
+ val topology = createTopology()
+ val metricReader = CoroutineMetricReader(this, runner.producers, exporter)
+
+ try {
+ runner.apply(topology)
+ runner.run(workload, 0)
+ } finally {
+ runner.close()
+ metricReader.close()
}
- monitorResults = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}")
+ val serviceMetrics = collectServiceMetrics(runner.producers[0])
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
- { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
- { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
- { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(207389912923, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(207122087280, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
- { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
+ { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") },
+ { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
+ { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
+ { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
+ { assertEquals(223325655, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
+ { assertEquals(67006560, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
+ { assertEquals(3159377, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.840212485920686E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
+ /**
+ * Test a small simulation setup.
+ */
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
+ val workload = createTestWorkload(0.25, seed)
+
+ val simulator = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler
)
- val traceReader = createTestTraceReader(0.5, seed)
- val environmentReader = createTestEnvironmentReader("single")
-
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
- }
+ val topology = createTopology("single")
+ val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
+
+ try {
+ simulator.apply(topology)
+ simulator.run(workload, seed.toLong())
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(simulator.producers[0])
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(96350072517, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(96330335057, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
+ { assertEquals(10997726, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9740289, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } },
+ { assertEquals(7.0099453912813E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
)
}
/**
- * Obtain the trace reader for the test.
+ * Test a small simulation setup with interference.
*/
- private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
- return Sc20ParquetTraceReader(
- listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
- emptyMap(),
- Workload("test", fraction),
- seed
+ @Test
+ fun testInterference() = runBlockingSimulation {
+ val seed = 0
+ val workload = createTestWorkload(1.0, seed)
+ val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
+ val performanceInterferenceModel =
+ PerformanceInterferenceReader()
+ .read(perfInterferenceInput)
+ .let { VmInterferenceModel(it, Random(seed.toLong())) }
+
+ val simulator = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ interferenceModel = performanceInterferenceModel
+ )
+ val topology = createTopology("single")
+ val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
+
+ try {
+ simulator.apply(topology)
+ simulator.run(workload, seed.toLong())
+ } finally {
+ simulator.close()
+ metricReader.close()
+ }
+
+ val serviceMetrics = collectServiceMetrics(simulator.producers[0])
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(481251, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
/**
- * Obtain the environment reader for the test.
+ * Test a small simulation setup with failures.
*/
- private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
- val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt")
- return Sc20ClusterEnvironmentReader(stream)
- }
+ @Test
+ fun testFailures() = runBlockingSimulation {
+ val seed = 1
+ val simulator = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ grid5000(Duration.ofDays(7))
+ )
+ val topology = createTopology("single")
+ val workload = createTestWorkload(0.25, seed)
+ val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
- class TestExperimentReporter : ExperimentMonitor {
- var totalRequestedBurst = 0L
- var totalGrantedBurst = 0L
- var totalOvercommissionedBurst = 0L
- var totalInterferedBurst = 0L
-
- override fun reportHostSlice(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- numberOfDeployedImages: Int,
- host: Host,
- ) {
- totalRequestedBurst += requestedBurst
- totalGrantedBurst += grantedBurst
- totalOvercommissionedBurst += overcommissionedBurst
- totalInterferedBurst += interferedBurst
+ try {
+ simulator.apply(topology)
+ simulator.run(workload, seed.toLong())
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- override fun close() {}
+ val serviceMetrics = collectServiceMetrics(simulator.producers[0])
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(10865478, exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9606177, exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
+ { assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } }
+ )
+ }
+
+ /**
+ * Obtain the trace reader for the test.
+ */
+ private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> {
+ val source = trace("bitbrains-small").sampleByLoad(fraction)
+ return source.resolve(workloadLoader, Random(seed.toLong()))
+ }
+
+ /**
+ * Obtain the topology factory for the test.
+ */
+ private fun createTopology(name: String = "topology"): Topology {
+ val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name.txt"))
+ return stream.use { clusterTopology(stream) }
+ }
+
+ class TestComputeMetricExporter : ComputeMetricExporter() {
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ var lostTime = 0L
+ var energyUsage = 0.0
+ var uptime = 0L
+
+ override fun record(data: HostData) {
+ idleTime += data.cpuIdleTime
+ activeTime += data.cpuActiveTime
+ stealTime += data.cpuStealTime
+ lostTime += data.cpuLostTime
+ energyUsage += data.powerTotal
+ uptime += data.uptime
+ }
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json
new file mode 100644
index 00000000..51fc6366
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json
@@ -0,0 +1,21 @@
+[
+ {
+ "vms": [
+ "141",
+ "379",
+ "851",
+ "116"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.8830158730158756
+ },
+ {
+ "vms": [
+ "205",
+ "116",
+ "463"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.7133055555552751
+ }
+]
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
index 53b3c2d7..5642003d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
@@ -1,3 +1,3 @@
ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
-A01;A01;8;3.2;64;1;64;8
+A01;A01;8;3.2;128;1;128;8
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
new file mode 100644
index 00000000..da6e5330
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
new file mode 100644
index 00000000..fe0a254c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
deleted file mode 100644
index ce7a812c..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
+++ /dev/null
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
deleted file mode 100644
index 1d7ce882..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
+++ /dev/null
Binary files differ