From 481706adc89d502840c967d94a165c55d8ac0e1a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 23 Feb 2021 21:30:51 +0100 Subject: exp: Add support for running Capelin experiments in IntelliJ --- .../opendc-experiments-capelin/.gitignore | 2 + .../opendc-experiments-capelin/build.gradle.kts | 50 ++ .../capelin/CompositeWorkloadPortfolio.kt | 79 +++ .../experiments/capelin/ExperimentHelpers.kt | 276 +++++++++ .../opendc/experiments/capelin/HorVerPortfolio.kt | 60 ++ .../opendc/experiments/capelin/MoreHpcPortfolio.kt | 59 ++ .../experiments/capelin/MoreVelocityPortfolio.kt | 56 ++ .../capelin/OperationalPhenomenaPortfolio.kt | 61 ++ .../org/opendc/experiments/capelin/Portfolio.kt | 221 ++++++++ .../opendc/experiments/capelin/ReplayPortfolio.kt | 50 ++ .../opendc/experiments/capelin/TestPortfolio.kt | 47 ++ .../capelin/model/OperationalPhenomena.kt | 31 + .../opendc/experiments/capelin/model/Topology.kt | 28 + .../opendc/experiments/capelin/model/Workload.kt | 44 ++ .../capelin/monitor/ExperimentMonitor.kt | 75 +++ .../capelin/monitor/ParquetExperimentMonitor.kt | 202 +++++++ .../opendc/experiments/capelin/telemetry/Event.kt | 35 ++ .../experiments/capelin/telemetry/HostEvent.kt | 43 ++ .../capelin/telemetry/ProvisionerEvent.kt | 39 ++ .../experiments/capelin/telemetry/RunEvent.kt | 34 ++ .../experiments/capelin/telemetry/VmEvent.kt | 41 ++ .../telemetry/parquet/ParquetEventWriter.kt | 126 +++++ .../telemetry/parquet/ParquetHostEventWriter.kt | 81 +++ .../parquet/ParquetProvisionerEventWriter.kt | 65 +++ .../telemetry/parquet/ParquetRunEventWriter.kt | 72 +++ .../capelin/trace/Sc20ParquetTraceReader.kt | 94 ++++ .../capelin/trace/Sc20RawParquetTraceReader.kt | 170 ++++++ .../trace/Sc20StreamingParquetTraceReader.kt | 305 ++++++++++ .../capelin/trace/Sc20TraceConverter.kt | 621 +++++++++++++++++++++ .../experiments/capelin/trace/WorkloadSampler.kt | 210 +++++++ .../src/main/resources/log4j2.xml | 49 ++ .../experiments/capelin/CapelinIntegrationTest.kt | 256 +++++++++ .../src/test/resources/env/single.txt | 3 + .../src/test/resources/env/topology.txt | 5 + .../src/test/resources/trace/meta.parquet | Bin 0 -> 2148 bytes .../src/test/resources/trace/trace.parquet | Bin 0 -> 1672463 bytes .../opendc-experiments-sc18/.gitignore | 2 + .../sc18/UnderspecificationExperiment.kt | 4 +- .../opendc-experiments-sc20/build.gradle.kts | 55 -- .../sc20/experiment/CompositeWorkloadPortfolio.kt | 79 --- .../sc20/experiment/ExperimentHelpers.kt | 276 --------- .../experiments/sc20/experiment/HorVerPortfolio.kt | 60 -- .../sc20/experiment/MoreHpcPortfolio.kt | 56 -- .../sc20/experiment/MoreVelocityPortfolio.kt | 56 -- .../experiment/OperationalPhenomenaPortfolio.kt | 61 -- .../experiments/sc20/experiment/Portfolio.kt | 217 ------- .../experiments/sc20/experiment/ReplayPortfolio.kt | 50 -- .../experiments/sc20/experiment/TestPortfolio.kt | 47 -- .../sc20/experiment/model/OperationalPhenomena.kt | 33 -- .../experiments/sc20/experiment/model/Topology.kt | 30 - .../experiments/sc20/experiment/model/Workload.kt | 44 -- .../sc20/experiment/monitor/ExperimentMonitor.kt | 75 --- .../experiment/monitor/ParquetExperimentMonitor.kt | 202 ------- .../org/opendc/experiments/sc20/telemetry/Event.kt | 35 -- .../opendc/experiments/sc20/telemetry/HostEvent.kt | 43 -- .../experiments/sc20/telemetry/ProvisionerEvent.kt | 39 -- .../opendc/experiments/sc20/telemetry/RunEvent.kt | 34 -- .../opendc/experiments/sc20/telemetry/VmEvent.kt | 41 -- .../sc20/telemetry/parquet/ParquetEventWriter.kt | 126 ----- 
.../telemetry/parquet/ParquetHostEventWriter.kt | 81 --- .../parquet/ParquetProvisionerEventWriter.kt | 65 --- .../telemetry/parquet/ParquetRunEventWriter.kt | 72 --- .../sc20/trace/Sc20ParquetTraceReader.kt | 94 ---- .../sc20/trace/Sc20RawParquetTraceReader.kt | 170 ------ .../sc20/trace/Sc20StreamingParquetTraceReader.kt | 305 ---------- .../experiments/sc20/trace/Sc20TraceConverter.kt | 621 --------------------- .../experiments/sc20/trace/WorkloadSampler.kt | 210 ------- .../src/main/resources/log4j2.xml | 49 -- .../opendc/experiments/sc20/Sc20IntegrationTest.kt | 256 --------- .../src/test/resources/env/single.txt | 3 - .../src/test/resources/env/topology.txt | 5 - .../src/test/resources/trace/meta.parquet | Bin 2148 -> 0 bytes .../src/test/resources/trace/trace.parquet | Bin 1672463 -> 0 bytes simulator/opendc-runner-web/build.gradle.kts | 2 +- .../src/main/kotlin/org/opendc/runner/web/Main.kt | 14 +- .../org/opendc/runner/web/WebExperimentMonitor.kt | 4 +- 76 files changed, 3604 insertions(+), 3602 deletions(-) create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/.gitignore create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt create mode 100644 
simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet create mode 100644 simulator/opendc-experiments/opendc-experiments-sc18/.gitignore delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/CompositeWorkloadPortfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/HorVerPortfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreHpcPortfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreVelocityPortfolio.kt delete mode 100644 
simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/OperationalPhenomenaPortfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Portfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ReplayPortfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/TestPortfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Topology.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Workload.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/Event.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/HostEvent.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/RunEvent.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/VmEvent.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20TraceConverter.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml delete mode 100644 
simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/single.txt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/topology.txt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/meta.parquet delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/trace.parquet diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/.gitignore b/simulator/opendc-experiments/opendc-experiments-capelin/.gitignore new file mode 100644 index 00000000..ba64707c --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/.gitignore @@ -0,0 +1,2 @@ +input/ +output/ diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts new file mode 100644 index 00000000..041cf36a --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2019 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +description = "Experiments for the Capelin work" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `experiment-conventions` + `testing-conventions` +} + +dependencies { + api(project(":opendc-core")) + api(project(":opendc-harness")) + implementation(project(":opendc-format")) + implementation(project(":opendc-simulator:opendc-simulator-core")) + implementation(project(":opendc-simulator:opendc-simulator-compute")) + implementation(project(":opendc-simulator:opendc-simulator-failures")) + implementation(project(":opendc-compute:opendc-compute-simulator")) + + implementation("io.github.microutils:kotlin-logging") + implementation("me.tongfei:progressbar:${versions["progressbar"]}") + implementation("com.github.ajalt.clikt:clikt:${versions["clikt"]}") + + implementation("org.apache.parquet:parquet-avro:${versions["parquet-avro"]}") + implementation("org.apache.hadoop:hadoop-client:${versions["hadoop-client"]}") { + exclude(group = "org.slf4j", module = "slf4j-log4j12") + exclude(group = "log4j") + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt new file mode 100644 index 00000000..faabe5cb --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.CompositeWorkload +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] that explores the effect of a composite workload. 
+ */ +public class CompositeWorkloadPortfolio : Portfolio("composite-workload") { + private val totalSampleLoad = 1.3301733005049648E12 + + override val topology: Topology by anyOf( + Topology("base"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-ver-hom"), + Topology("exp-vel-ver-hom") + ) + + override val workload: Workload by anyOf( + CompositeWorkload( + "all-azure", + listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)), + totalSampleLoad + ), + CompositeWorkload( + "solvinity-25-azure-75", + listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)), + totalSampleLoad + ), + CompositeWorkload( + "solvinity-50-azure-50", + listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)), + totalSampleLoad + ), + CompositeWorkload( + "solvinity-75-azure-25", + listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)), + totalSampleLoad + ), + CompositeWorkload( + "all-solvinity", + listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)), + totalSampleLoad + ) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false) + ) + + override val allocationPolicy: String by anyOf( + "active-servers" + ) +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt new file mode 100644 index 00000000..8f3e686a --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -0,0 +1,276 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.experiments.capelin.experiment + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.takeWhile +import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.compute.core.Flavor +import org.opendc.compute.core.ServerEvent +import org.opendc.compute.core.metal.NODE_CLUSTER +import org.opendc.compute.core.metal.driver.BareMetalDriver +import org.opendc.compute.core.metal.service.ProvisioningService +import org.opendc.compute.core.virt.HypervisorEvent +import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.simulator.SimBareMetalDriver +import org.opendc.compute.simulator.SimVirtDriver +import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.simulator.allocation.AllocationPolicy +import org.opendc.experiments.capelin.monitor.ExperimentMonitor +import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader +import org.opendc.format.environment.EnvironmentReader +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.failures.CorrelatedFaultInjector +import org.opendc.simulator.failures.FailureDomain +import org.opendc.simulator.failures.FaultInjector +import org.opendc.trace.core.EventTracer +import java.io.File +import java.time.Clock +import kotlin.math.ln +import kotlin.math.max +import kotlin.random.Random + +/** + * The logger for this experiment. + */ +private val logger = KotlinLogging.logger {} + +/** + * Construct the failure domain for the experiments. + */ +public suspend fun createFailureDomain( + coroutineScope: CoroutineScope, + clock: Clock, + seed: Int, + failureInterval: Double, + bareMetalProvisioner: ProvisioningService, + chan: Channel +): CoroutineScope { + val job = coroutineScope.launch { + chan.receive() + val random = Random(seed) + val injectors = mutableMapOf() + for (node in bareMetalProvisioner.nodes()) { + val cluster = node.metadata[NODE_CLUSTER] as String + val injector = + injectors.getOrPut(cluster) { + createFaultInjector( + this, + clock, + random, + failureInterval + ) + } + injector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + return CoroutineScope(coroutineScope.coroutineContext + job) +} + +/** + * Obtain the [FaultInjector] to use for the experiments. + */ +public fun createFaultInjector( + coroutineScope: CoroutineScope, + clock: Clock, + random: Random, + failureInterval: Double +): FaultInjector { + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return CorrelatedFaultInjector( + coroutineScope, + clock, + iatScale = ln(failureInterval), iatShape = 1.03, // Hours + sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 + dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes + random = random + ) +} + +/** + * Create the trace reader from which the VM workloads are read. 
+ */ +public fun createTraceReader( + path: File, + performanceInterferenceModel: PerformanceInterferenceModel, + vms: List, + seed: Int +): Sc20StreamingParquetTraceReader { + return Sc20StreamingParquetTraceReader( + path, + performanceInterferenceModel, + vms, + Random(seed) + ) +} + +/** + * Construct the environment for a VM provisioner and return the provisioner instance. + */ +public suspend fun createProvisioner( + coroutineScope: CoroutineScope, + clock: Clock, + environmentReader: EnvironmentReader, + allocationPolicy: AllocationPolicy, + eventTracer: EventTracer +): Pair { + val environment = environmentReader.use { it.construct(coroutineScope, clock) } + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] + + // Wait for the bare metal nodes to be spawned + delay(10) + + val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider()) + + // Wait for the hypervisors to be spawned + delay(10) + + return bareMetalProvisioner to scheduler +} + +/** + * Attach the specified monitor to the VM provisioner. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public suspend fun attachMonitor( + coroutineScope: CoroutineScope, + clock: Clock, + scheduler: SimVirtProvisioningService, + monitor: ExperimentMonitor +) { + + val hypervisors = scheduler.drivers() + + // Monitor hypervisor events + for (hypervisor in hypervisors) { + // TODO Do not expose VirtDriver directly but use Hypervisor class. + val server = (hypervisor as SimVirtDriver).server + monitor.reportHostStateChange(clock.millis(), hypervisor, server) + server.events + .onEach { event -> + val time = clock.millis() + when (event) { + is ServerEvent.StateChanged -> { + monitor.reportHostStateChange(time, hypervisor, event.server) + } + } + } + .launchIn(coroutineScope) + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( + clock.millis(), + event.requestedBurst, + event.grantedBurst, + event.overcommissionedBurst, + event.interferedBurst, + event.cpuUsage, + event.cpuDemand, + event.numberOfDeployedImages, + event.hostServer + ) + } + } + .launchIn(coroutineScope) + + val driver = hypervisor.server.services[BareMetalDriver.Key] as SimBareMetalDriver + driver.powerDraw + .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } + .launchIn(coroutineScope) + } + + scheduler.events + .onEach { event -> + when (event) { + is VirtProvisioningEvent.MetricsAvailable -> + monitor.reportProvisionerMetrics(clock.millis(), event) + } + } + .launchIn(coroutineScope) +} + +/** + * Process the trace. 
+ */ +public suspend fun processTrace( + coroutineScope: CoroutineScope, + clock: Clock, + reader: TraceReader, + scheduler: SimVirtProvisioningService, + chan: Channel, + monitor: ExperimentMonitor +) { + try { + var submitted = 0 + + while (reader.hasNext()) { + val (time, workload) = reader.next() + + submitted++ + delay(max(0, time - clock.millis())) + coroutineScope.launch { + chan.send(Unit) + val server = scheduler.deploy( + workload.image.name, + workload.image, + Flavor( + workload.image.tags["cores"] as Int, + workload.image.tags["required-memory"] as Long + ) + ) + // Monitor server events + server.events + .onEach { + if (it is ServerEvent.StateChanged) { + monitor.reportVmStateChange(clock.millis(), it.server) + } + } + .collect() + } + } + + scheduler.events + .takeWhile { + when (it) { + is VirtProvisioningEvent.MetricsAvailable -> + it.inactiveVmCount + it.failedVmCount != submitted + } + } + .collect() + delay(1) + } finally { + reader.close() + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt new file mode 100644 index 00000000..e1cf8517 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] that explores the difference between horizontal and vertical scaling. 
+ */ +public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") { + override val topology: Topology by anyOf( + Topology("base"), + Topology("rep-vol-hor-hom"), + Topology("rep-vol-hor-het"), + Topology("rep-vol-ver-hom"), + Topology("rep-vol-ver-het"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-hor-het"), + Topology("exp-vol-ver-hom"), + Topology("exp-vol-ver-het") + ) + + override val workload: Workload by anyOf( + Workload("solvinity", 0.1), + Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicy: String by anyOf( + "active-servers" + ) +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt new file mode 100644 index 00000000..a995e467 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.SamplingStrategy +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] to explore the effect of HPC workloads. 
+ */ +public class MoreHpcPortfolio : Portfolio("more_hpc") { + override val topology: Topology by anyOf( + Topology("base"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-ver-hom"), + Topology("exp-vel-ver-hom") + ) + + override val workload: Workload by anyOf( + Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC), + Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC), + Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC), + Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC), + Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD), + Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD), + Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicy: String by anyOf( + "active-servers" + ) +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt new file mode 100644 index 00000000..49559e0e --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] that explores the effect of adding more velocity to a cluster (e.g., faster machines). 
+ */ +public class MoreVelocityPortfolio : Portfolio("more_velocity") { + override val topology: Topology by anyOf( + Topology("base"), + Topology("rep-vel-ver-hom"), + Topology("rep-vel-ver-het"), + Topology("exp-vel-ver-hom"), + Topology("exp-vel-ver-het") + ) + + override val workload: Workload by anyOf( + Workload("solvinity", 0.1), + Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicy: String by anyOf( + "active-servers" + ) +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt new file mode 100644 index 00000000..1aac4f9e --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] that explores the effect of operational phenomena on metrics. 
+ */ +public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena") { + override val topology: Topology by anyOf( + Topology("base") + ) + + override val workload: Workload by anyOf( + Workload("solvinity", 0.1), + Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), + OperationalPhenomena(failureFrequency = 0.0, hasInterference = true), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false), + OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) + ) + + override val allocationPolicy: String by anyOf( + "mem", + "mem-inv", + "core-mem", + "core-mem-inv", + "active-servers", + "active-servers-inv", + "random" + ) +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt new file mode 100644 index 00000000..75b0d735 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -0,0 +1,221 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.experiments.capelin + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope +import mu.KotlinLogging +import org.opendc.compute.simulator.allocation.* +import org.opendc.experiments.capelin.experiment.attachMonitor +import org.opendc.experiments.capelin.experiment.createFailureDomain +import org.opendc.experiments.capelin.experiment.createProvisioner +import org.opendc.experiments.capelin.experiment.processTrace +import org.opendc.experiments.capelin.model.CompositeWorkload +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor +import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader +import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader +import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import org.opendc.format.trace.PerformanceInterferenceModelReader +import org.opendc.harness.dsl.Experiment +import org.opendc.harness.dsl.anyOf +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer +import java.io.File +import java.util.concurrent.ConcurrentHashMap +import kotlin.random.Random + +/** + * A portfolio represents a collection of scenarios that are tested for the work. + * + * @param name The name of the portfolio. + */ +public abstract class Portfolio(name: String) : Experiment(name) { + /** + * The logger for this portfolio instance. + */ + private val logger = KotlinLogging.logger {} + + /** + * The path to where the environments are located. + */ + private val environmentPath by anyOf(File("input/environments/")) + + /** + * The path to where the traces are located. + */ + private val tracePath by anyOf(File("input/traces/")) + + /** + * The path to where the output results should be written. + */ + private val outputPath by anyOf(File("output/")) + + /** + * The path to the original VM placements file. + */ + private val vmPlacements by anyOf(emptyMap()) + + /** + * The path to the performance interference model. + */ + private val performanceInterferenceModel by anyOf(null) + + /** + * The topology to test. + */ + public abstract val topology: Topology + + /** + * The workload to test. + */ + public abstract val workload: Workload + + /** + * The operational phenomena to consider. + */ + public abstract val operationalPhenomena: OperationalPhenomena + + /** + * The allocation policies to consider. + */ + public abstract val allocationPolicy: String + + /** + * A map of trace readers. + */ + private val traceReaders = ConcurrentHashMap() + + /** + * Perform a single trial for this portfolio.
+ */ + @OptIn(ExperimentalCoroutinesApi::class) + override fun doRun(repeat: Int) { + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) + val tracer = EventTracer(clock) + val seeder = Random(repeat) + val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) + + val chan = Channel(Channel.CONFLATED) + val allocationPolicy = createAllocationPolicy(seeder) + + val workload = workload + val workloadNames = if (workload is CompositeWorkload) { + workload.workloads.map { it.name } + } else { + listOf(workload.name) + } + + val rawReaders = workloadNames.map { workloadName -> + traceReaders.computeIfAbsent(workloadName) { + logger.info { "Loading trace $workloadName" } + Sc20RawParquetTraceReader(File(tracePath, workloadName)) + } + } + + val performanceInterferenceModel = performanceInterferenceModel + ?.takeIf { operationalPhenomena.hasInterference } + ?.construct(seeder) ?: emptyMap() + val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt()) + + val monitor = ParquetExperimentMonitor( + outputPath, + "portfolio_id=$name/scenario_id=$id/run_id=$repeat", + 4096 + ) + + testScope.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner( + this, + clock, + environment, + allocationPolicy, + tracer + ) + + val failureDomain = if (operationalPhenomena.failureFrequency > 0) { + logger.debug("ENABLING failures") + createFailureDomain( + this, + clock, + seeder.nextInt(), + operationalPhenomena.failureFrequency, + bareMetalProvisioner, + chan + ) + } else { + null + } + + attachMonitor(this, clock, scheduler, monitor) + processTrace( + this, + clock, + trace, + scheduler, + chan, + monitor + ) + + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + } + + try { + testScope.advanceUntilIdle() + } finally { + monitor.close() + } + } + + /** + * Create the [AllocationPolicy] instance to use for the trial. 
+ */ + private fun createAllocationPolicy(seeder: Random): AllocationPolicy { + return when (allocationPolicy) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) + "replay" -> ReplayAllocationPolicy(vmPlacements) + else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") + } + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt new file mode 100644 index 00000000..b6d3b30c --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] that compares the original VM placements against our policies. 
+ */ +public class ReplayPortfolio : Portfolio("replay") { + override val topology: Topology by anyOf( + Topology("base") + ) + + override val workload: Workload by anyOf( + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) + ) + + override val allocationPolicy: String by anyOf( + "replay", + "active-servers" + ) +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt new file mode 100644 index 00000000..90840db8 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] to perform a simple test run. 
+ */ +public class TestPortfolio : Portfolio("test") { + override val topology: Topology by anyOf( + Topology("base") + ) + + override val workload: Workload by anyOf( + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicy: String by anyOf("active-servers") +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt new file mode 100644 index 00000000..b53b3617 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.model + +/** + * Operation phenomena during experiments. + * + * @param failureFrequency The average time between failures in hours. + * @param hasInterference A flag to enable performance interference between VMs. + */ +public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt new file mode 100644 index 00000000..fe16a294 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
* + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.model + +/** + * The topology on which we test the workload. + */ +public data class Topology(val name: String) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt new file mode 100644 index 00000000..c4ddd158 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.model + +public enum class SamplingStrategy { + REGULAR, + HPC, + HPC_LOAD +} + +/** + * A workload that is considered for a scenario. + */ +public open class Workload( + public open val name: String, + public val fraction: Double, + public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR +) + +/** + * A workload that is composed of multiple workloads.
+
+/**
+ * A workload that is composed of multiple workloads.
+ */
+public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) :
+    Workload(name, -1.0)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
new file mode 100644
index 00000000..3c6637bf
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.monitor
+
+import org.opendc.compute.core.Server
+import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import java.io.Closeable
+
+/**
+ * A monitor watches the events of an experiment.
+ */
+public interface ExperimentMonitor : Closeable {
+    /**
+     * This method is invoked when the state of a VM changes.
+     */
+    public fun reportVmStateChange(time: Long, server: Server) {}
+
+    /**
+     * This method is invoked when the state of a host changes.
+     */
+    public fun reportHostStateChange(
+        time: Long,
+        driver: VirtDriver,
+        server: Server
+    ) {
+    }
+
+    /**
+     * Report the power consumption of a host.
+     */
+    public fun reportPowerConsumption(host: Server, draw: Double) {}
+
+    /**
+     * This method is invoked for a host for each slice that finishes.
+     */
+    public fun reportHostSlice(
+        time: Long,
+        requestedBurst: Long,
+        grantedBurst: Long,
+        overcommissionedBurst: Long,
+        interferedBurst: Long,
+        cpuUsage: Double,
+        cpuDemand: Double,
+        numberOfDeployedImages: Int,
+        hostServer: Server,
+        duration: Long = 5 * 60 * 1000L
+    ) {
+    }
+
+    /**
+     * This method is invoked for a provisioner event.
+ */ + public fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {} +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt new file mode 100644 index 00000000..a0d57656 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -0,0 +1,202 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.monitor + +import mu.KotlinLogging +import org.opendc.compute.core.Server +import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.experiments.capelin.telemetry.HostEvent +import org.opendc.experiments.capelin.telemetry.ProvisionerEvent +import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter +import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter +import java.io.File + +/** + * The logger instance to use. + */ +private val logger = KotlinLogging.logger {} + +/** + * An [ExperimentMonitor] that logs the events to a Parquet file. 
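Before the Parquet-backed monitor that follows, a minimal in-memory implementation of the ExperimentMonitor interface above might look like this. A sketch for smoke testing only; the class name and counters are ours, not part of the patch:

import org.opendc.compute.core.Server
import org.opendc.experiments.capelin.monitor.ExperimentMonitor

// Counts callbacks instead of persisting them; all other callbacks keep their defaults.
class CountingExperimentMonitor : ExperimentMonitor {
    var vmStateChanges: Long = 0
        private set
    var hostSlices: Long = 0
        private set

    override fun reportVmStateChange(time: Long, server: Server) {
        vmStateChanges++
    }

    override fun reportHostSlice(
        time: Long,
        requestedBurst: Long,
        grantedBurst: Long,
        overcommissionedBurst: Long,
        interferedBurst: Long,
        cpuUsage: Double,
        cpuDemand: Double,
        numberOfDeployedImages: Int,
        hostServer: Server,
        duration: Long
    ) {
        hostSlices++
    }

    override fun close() {}
}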
+ */ +public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor { + private val hostWriter = ParquetHostEventWriter( + File(base, "host-metrics/$partition/data.parquet"), + bufferSize + ) + private val provisionerWriter = ParquetProvisionerEventWriter( + File(base, "provisioner-metrics/$partition/data.parquet"), + bufferSize + ) + private val currentHostEvent = mutableMapOf() + private var startTime = -1L + + override fun reportVmStateChange(time: Long, server: Server) { + if (startTime < 0) { + startTime = time + + // Update timestamp of initial event + currentHostEvent.replaceAll { _, v -> v.copy(timestamp = startTime) } + } + } + + override fun reportHostStateChange( + time: Long, + driver: VirtDriver, + server: Server + ) { + logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } + + val previousEvent = currentHostEvent[server] + + val roundedTime = previousEvent?.let { + val duration = time - it.timestamp + val k = 5 * 60 * 1000L // 5 min in ms + val rem = duration % k + + if (rem == 0L) { + time + } else { + it.timestamp + duration + k - rem + } + } ?: time + + reportHostSlice( + roundedTime, + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server + ) + } + + private val lastPowerConsumption = mutableMapOf() + + override fun reportPowerConsumption(host: Server, draw: Double) { + lastPowerConsumption[host] = draw + } + + override fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + duration: Long + ) { + val previousEvent = currentHostEvent[hostServer] + when { + previousEvent == null -> { + val event = HostEvent( + time, + 5 * 60 * 1000L, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0, + hostServer.flavor.cpuCount + ) + + currentHostEvent[hostServer] = event + } + previousEvent.timestamp == time -> { + val event = HostEvent( + time, + previousEvent.duration, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0, + hostServer.flavor.cpuCount + ) + + currentHostEvent[hostServer] = event + } + else -> { + hostWriter.write(previousEvent) + + val event = HostEvent( + time, + time - previousEvent.timestamp, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0, + hostServer.flavor.cpuCount + ) + + currentHostEvent[hostServer] = event + } + } + } + + override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + provisionerWriter.write( + ProvisionerEvent( + time, + event.totalHostCount, + event.availableHostCount, + event.totalVmCount, + event.activeVmCount, + event.inactiveVmCount, + event.waitingVmCount, + event.failedVmCount + ) + ) + } + + override fun close() { + // Flush remaining events + for ((_, event) in currentHostEvent) { + hostWriter.write(event) + } + currentHostEvent.clear() + + hostWriter.close() + provisionerWriter.close() + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt 
b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt new file mode 100644 index 00000000..c29e116e --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry + +/** + * An event that occurs within the system. + */ +public abstract class Event(public val name: String) { + /** + * The time of occurrence of this event. + */ + public abstract val timestamp: Long +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt new file mode 100644 index 00000000..e5e9d520 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry + +import org.opendc.compute.core.Server + +/** + * A periodic report of the host machine metrics. 
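The 5-minute slice rounding performed by ParquetExperimentMonitor.reportHostStateChange above can be read as a small helper. A sketch; the function name is ours, but the arithmetic mirrors the monitor:

// Round a state-change timestamp up to the end of the 5-minute slice that
// started at `previous`, as done in reportHostStateChange.
fun roundToSliceEnd(previous: Long, time: Long, sliceLength: Long = 5 * 60 * 1000L): Long {
    val duration = time - previous
    val rem = duration % sliceLength
    return if (rem == 0L) time else previous + duration + sliceLength - rem
}

// Example: with previous = 0 and time = 7 minutes, the event is attributed to
// the 10-minute boundary: roundToSliceEnd(0, 7 * 60 * 1000L) == 10 * 60 * 1000L.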
+ */ +public data class HostEvent( + override val timestamp: Long, + public val duration: Long, + public val host: Server, + public val vmCount: Int, + public val requestedBurst: Long, + public val grantedBurst: Long, + public val overcommissionedBurst: Long, + public val interferedBurst: Long, + public val cpuUsage: Double, + public val cpuDemand: Double, + public val powerDraw: Double, + public val cores: Int +) : Event("host-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt new file mode 100644 index 00000000..539c9bc9 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry + +/** + * A periodic report of the provisioner's metrics. 
+ */
+public data class ProvisionerEvent(
+    override val timestamp: Long,
+    public val totalHostCount: Int,
+    public val availableHostCount: Int,
+    public val totalVmCount: Int,
+    public val activeVmCount: Int,
+    public val inactiveVmCount: Int,
+    public val waitingVmCount: Int,
+    public val failedVmCount: Int
+) : Event("provisioner-metrics")
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
new file mode 100644
index 00000000..6c8fc941
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.telemetry
+
+/**
+ * An event marking a single repeat (run) of a portfolio.
+ */
+public data class RunEvent(
+    val portfolio: Portfolio,
+    val repeat: Int,
+    override val timestamp: Long
+) : Event("run")
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
new file mode 100644
index 00000000..427c453a
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry + +import org.opendc.compute.core.Server + +/** + * A periodic report of a virtual machine's metrics. + */ +public data class VmEvent( + override val timestamp: Long, + public val duration: Long, + public val vm: Server, + public val host: Server, + public val requestedBurst: Long, + public val grantedBurst: Long, + public val overcommissionedBurst: Long, + public val interferedBurst: Long, + public val cpuUsage: Double, + public val cpuDemand: Double +) : Event("vm-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt new file mode 100644 index 00000000..38930ee5 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry.parquet + +import mu.KotlinLogging +import org.apache.avro.Schema +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.experiments.capelin.telemetry.Event +import java.io.Closeable +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread + +/** + * The logging instance to use. + */ +private val logger = KotlinLogging.logger {} + +/** + * A writer that writes events in Parquet format. 
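The VmEvent defined above has no dedicated writer in this patch. One could follow the same pattern as the concrete writers further below, assuming the generic writer defined next is parameterized by its event type; the schema fields here are chosen by us to mirror the data class and are not part of the patch:

import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.opendc.experiments.capelin.telemetry.VmEvent
import org.opendc.experiments.capelin.telemetry.parquet.ParquetEventWriter
import java.io.File

// A sketch of a Parquet event writer for [VmEvent]s, mirroring the host/provisioner writers.
class ParquetVmEventWriter(path: File, bufferSize: Int) :
    ParquetEventWriter<VmEvent>(path, schema, convert, bufferSize) {

    override fun toString(): String = "vm-writer"

    companion object {
        private val convert: (VmEvent, GenericData.Record) -> Unit = { event, record ->
            record.put("timestamp", event.timestamp)
            record.put("duration", event.duration)
            record.put("vm_id", event.vm.name)
            record.put("host_id", event.host.name)
            record.put("cpu_usage", event.cpuUsage)
            record.put("cpu_demand", event.cpuDemand)
        }

        private val schema: Schema = SchemaBuilder
            .record("vm_metrics")
            .namespace("org.opendc.experiments.capelin")
            .fields()
            .name("timestamp").type().longType().noDefault()
            .name("duration").type().longType().noDefault()
            .name("vm_id").type().stringType().noDefault()
            .name("host_id").type().stringType().noDefault()
            .name("cpu_usage").type().doubleType().noDefault()
            .name("cpu_demand").type().doubleType().noDefault()
            .endRecord()
    }
}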
+ */ +public open class ParquetEventWriter( + private val path: File, + private val schema: Schema, + private val converter: (T, GenericData.Record) -> Unit, + private val bufferSize: Int = 4096 +) : Runnable, Closeable { + /** + * The writer to write the Parquet file. + */ + private val writer = AvroParquetWriter.builder(Path(path.absolutePath)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + /** + * The queue of commands to process. + */ + private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) + + /** + * The thread that is responsible for writing the Parquet records. + */ + private val writerThread = thread(start = false, name = "parquet-writer") { run() } + + /** + * Write the specified metrics to the database. + */ + public fun write(event: T) { + queue.put(Action.Write(event)) + } + + /** + * Signal the writer to stop. + */ + public override fun close() { + queue.put(Action.Stop) + writerThread.join() + } + + init { + writerThread.start() + } + + /** + * Start the writer thread. + */ + override fun run() { + try { + loop@ while (true) { + val action = queue.take() + when (action) { + is Action.Stop -> break@loop + is Action.Write<*> -> { + val record = GenericData.Record(schema) + @Suppress("UNCHECKED_CAST") + converter(action.event as T, record) + writer.write(record) + } + } + } + } catch (e: Throwable) { + logger.error("Writer failed", e) + } finally { + writer.close() + } + } + + public sealed class Action { + /** + * A poison pill that will stop the writer thread. + */ + public object Stop : Action() + + /** + * Write the specified metrics to the database. + */ + public data class Write(val event: T) : Action() + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt new file mode 100644 index 00000000..4a3e7963 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.experiments.capelin.telemetry.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.opendc.experiments.capelin.telemetry.HostEvent +import java.io.File + +/** + * A Parquet event writer for [HostEvent]s. + */ +public class ParquetHostEventWriter(path: File, bufferSize: Int) : + ParquetEventWriter(path, schema, convert, bufferSize) { + + override fun toString(): String = "host-writer" + + public companion object { + private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record -> + // record.put("portfolio_id", event.run.parent.parent.id) + // record.put("scenario_id", event.run.parent.id) + // record.put("run_id", event.run.id) + record.put("host_id", event.host.name) + record.put("state", event.host.state.name) + record.put("timestamp", event.timestamp) + record.put("duration", event.duration) + record.put("vm_count", event.vmCount) + record.put("requested_burst", event.requestedBurst) + record.put("granted_burst", event.grantedBurst) + record.put("overcommissioned_burst", event.overcommissionedBurst) + record.put("interfered_burst", event.interferedBurst) + record.put("cpu_usage", event.cpuUsage) + record.put("cpu_demand", event.cpuDemand) + record.put("power_draw", event.powerDraw * (1.0 / 12)) + record.put("cores", event.cores) + } + + private val schema: Schema = SchemaBuilder + .record("host_metrics") + .namespace("org.opendc.experiments.sc20") + .fields() + // .name("portfolio_id").type().intType().noDefault() + // .name("scenario_id").type().intType().noDefault() + // .name("run_id").type().intType().noDefault() + .name("timestamp").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("host_id").type().stringType().noDefault() + .name("state").type().stringType().noDefault() + .name("vm_count").type().intType().noDefault() + .name("requested_burst").type().longType().noDefault() + .name("granted_burst").type().longType().noDefault() + .name("overcommissioned_burst").type().longType().noDefault() + .name("interfered_burst").type().longType().noDefault() + .name("cpu_usage").type().doubleType().noDefault() + .name("cpu_demand").type().doubleType().noDefault() + .name("power_draw").type().doubleType().noDefault() + .name("cores").type().intType().noDefault() + .endRecord() + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt new file mode 100644 index 00000000..8feff8d9 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or 
substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.opendc.experiments.capelin.telemetry.ProvisionerEvent +import java.io.File + +/** + * A Parquet event writer for [ProvisionerEvent]s. + */ +public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) : + ParquetEventWriter(path, schema, convert, bufferSize) { + + override fun toString(): String = "provisioner-writer" + + public companion object { + private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record -> + record.put("timestamp", event.timestamp) + record.put("host_total_count", event.totalHostCount) + record.put("host_available_count", event.availableHostCount) + record.put("vm_total_count", event.totalVmCount) + record.put("vm_active_count", event.activeVmCount) + record.put("vm_inactive_count", event.inactiveVmCount) + record.put("vm_waiting_count", event.waitingVmCount) + record.put("vm_failed_count", event.failedVmCount) + } + + private val schema: Schema = SchemaBuilder + .record("provisioner_metrics") + .namespace("org.opendc.experiments.sc20") + .fields() + .name("timestamp").type().longType().noDefault() + .name("host_total_count").type().intType().noDefault() + .name("host_available_count").type().intType().noDefault() + .name("vm_total_count").type().intType().noDefault() + .name("vm_active_count").type().intType().noDefault() + .name("vm_inactive_count").type().intType().noDefault() + .name("vm_waiting_count").type().intType().noDefault() + .name("vm_failed_count").type().intType().noDefault() + .endRecord() + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt new file mode 100644 index 00000000..946410eb --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
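The host and provisioner writers above produce ordinary Parquet files, so results can be read back with the same Avro tooling used elsewhere in this patch. A sketch of a post-processing helper; the function name and aggregation are ours, while the path layout follows ParquetExperimentMonitor ("host-metrics/<partition>/data.parquet"):

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import java.io.File

// Sum the granted burst over all host-metric records written for one partition.
fun sumGrantedBurst(base: File, partition: String): Long {
    val file = File(base, "host-metrics/$partition/data.parquet")
    val reader = AvroParquetReader.builder<GenericRecord>(Path(file.absolutePath)).build()
    var total = 0L
    reader.use {
        while (true) {
            val record = it.read() ?: break
            total += record["granted_burst"] as Long
        }
    }
    return total
}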
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.opendc.experiments.capelin.telemetry.RunEvent +import java.io.File + +/** + * A Parquet event writer for [RunEvent]s. + */ +public class ParquetRunEventWriter(path: File, bufferSize: Int) : + ParquetEventWriter(path, schema, convert, bufferSize) { + + override fun toString(): String = "run-writer" + + public companion object { + private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record -> + val portfolio = event.portfolio + record.put("portfolio_name", portfolio.name) + record.put("scenario_id", portfolio.id) + record.put("run_id", event.repeat) + record.put("topology", portfolio.topology.name) + record.put("workload_name", portfolio.workload.name) + record.put("workload_fraction", portfolio.workload.fraction) + record.put("workload_sampler", portfolio.workload.samplingStrategy) + record.put("allocation_policy", portfolio.allocationPolicy) + record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency) + record.put("interference", portfolio.operationalPhenomena.hasInterference) + record.put("seed", event.repeat) + } + + private val schema: Schema = SchemaBuilder + .record("runs") + .namespace("org.opendc.experiments.sc20") + .fields() + .name("portfolio_name").type().stringType().noDefault() + .name("scenario_id").type().intType().noDefault() + .name("run_id").type().intType().noDefault() + .name("topology").type().stringType().noDefault() + .name("workload_name").type().stringType().noDefault() + .name("workload_fraction").type().doubleType().noDefault() + .name("workload_sampler").type().stringType().noDefault() + .name("allocation_policy").type().stringType().noDefault() + .name("failure_frequency").type().doubleType().noDefault() + .name("interference").type().booleanType().noDefault() + .name("seed").type().intType().noDefault() + .endRecord() + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt new file mode 100644 index 00000000..6cfdae40 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice 
and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.experiments.capelin.model.CompositeWorkload +import org.opendc.experiments.capelin.model.Workload +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import java.util.TreeSet + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param reader The internal trace reader to use. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + * @param run The run to which this reader belongs. + */ +@OptIn(ExperimentalStdlibApi::class) +public class Sc20ParquetTraceReader( + rawReaders: List, + performanceInterferenceModel: Map, + workload: Workload, + seed: Int +) : TraceReader { + /** + * The iterator over the actual trace. + */ + private val iterator: Iterator> = + rawReaders + .map { it.read() } + .run { + if (workload is CompositeWorkload) { + this.zip(workload.workloads) + } else { + this.zip(listOf(workload)) + } + } + .map { sampleWorkload(it.first, workload, it.second, seed) } + .flatten() + .run { + // Apply performance interference model + if (performanceInterferenceModel.isEmpty()) + this + else { + map { entry -> + val image = entry.workload.image + val id = image.name + val relevantPerformanceInterferenceModelItems = + performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) + + val newImage = + SimWorkloadImage( + image.uid, + image.name, + image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + (image as SimWorkloadImage).workload + ) + val newWorkload = entry.workload.copy(image = newImage) + Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) + } + } + } + .iterator() + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt new file mode 100644 index 00000000..d2560d62 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without 
limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import mu.KotlinLogging +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader +import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.core.User +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimTraceWorkload +import java.io.File +import java.util.UUID + +private val logger = KotlinLogging.logger {} + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param path The directory of the traces. + */ +@OptIn(ExperimentalStdlibApi::class) +public class Sc20RawParquetTraceReader(private val path: File) { + /** + * Read the fragments into memory. + */ + private fun parseFragments(path: File): Map> { + val reader = AvroParquetReader.builder(Path(path.absolutePath, "trace.parquet")) + .disableCompatibility() + .build() + + val fragments = mutableMapOf>() + + return try { + while (true) { + val record = reader.read() ?: break + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = SimTraceWorkload.Fragment( + duration, + cpuUsage, + cores + ) + + fragments.getOrPut(id) { mutableListOf() }.add(fragment) + } + + fragments + } finally { + reader.close() + } + } + + /** + * Read the metadata into a workload. 
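Each fragment produced by parseFragments above spans one 5-minute reporting interval, so a fragment with an average demand of 2,000 MHz accounts for roughly 2000 MHz x 300 s = 600,000 MFLOPs; parseMeta below sums exactly this quantity over a VM's fragments to derive the total-load metadata attached to its workload image.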
+ */ + private fun parseMeta(path: File, fragments: Map>): List { + val metaReader = AvroParquetReader.builder(Path(path.absolutePath, "meta.parquet")) + .disableCompatibility() + .build() + + var counter = 0 + val entries = mutableListOf() + + return try { + while (true) { + val record = metaReader.read() ?: break + + val id = record["id"].toString() + if (!fragments.containsKey(id)) { + continue + } + + val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) + + val vmFragments = fragments.getValue(id).asSequence() + val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs + val vmWorkload = VmWorkload( + uid, + id, + UnnamedUser, + SimWorkloadImage( + uid, + id, + mapOf( + "submit-time" to submissionTime, + "end-time" to endTime, + "total-load" to totalLoad, + "cores" to maxCores, + "required-memory" to requiredMemory + ), + SimTraceWorkload(vmFragments) + ) + ) + entries.add(TraceEntryImpl(submissionTime, vmWorkload)) + } + + entries + } catch (e: Exception) { + e.printStackTrace() + throw e + } finally { + metaReader.close() + } + } + + /** + * The entries in the trace. + */ + private val entries: List + + init { + val fragments = parseFragments(path) + entries = parseMeta(path, fragments) + } + + /** + * Read the entries in the trace. + */ + public fun read(): List> = entries + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + internal data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt new file mode 100644 index 00000000..12705c80 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt @@ -0,0 +1,305 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.experiments.capelin.trace + +import mu.KotlinLogging +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.filter2.predicate.Statistics +import org.apache.parquet.filter2.predicate.UserDefinedPredicate +import org.apache.parquet.io.api.Binary +import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.core.User +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.workload.SimTraceWorkload +import java.io.File +import java.io.Serializable +import java.util.SortedSet +import java.util.TreeSet +import java.util.UUID +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread +import kotlin.random.Random + +private val logger = KotlinLogging.logger {} + +/** + * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. + * + * @param traceFile The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +@OptIn(ExperimentalStdlibApi::class) +public class Sc20StreamingParquetTraceReader( + traceFile: File, + performanceInterferenceModel: PerformanceInterferenceModel, + selectedVms: List, + random: Random +) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * The intermediate buffer to store the read records in. + */ + private val queue = ArrayBlockingQueue>(1024) + + /** + * An optional filter for filtering the selected VMs + */ + private val filter = + if (selectedVms.isEmpty()) + null + else + FilterCompat.get( + FilterApi.userDefined( + FilterApi.binaryColumn("id"), + SelectedVmFilter( + TreeSet(selectedVms) + ) + ) + ) + + /** + * A poisonous fragment. + */ + private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0)) + + /** + * The thread to read the records in. 
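The reader thread defined next pushes fragments into the bounded queue and terminates the stream with the poison element declared above. In isolation, the hand-off looks roughly like this; a self-contained sketch with plain strings rather than the reader's actual types:

import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Bounded producer/consumer hand-off with a poison element to signal end-of-stream.
fun main() {
    val poison = ""
    val queue = ArrayBlockingQueue<String>(1024)

    val producer = thread(name = "reader") {
        listOf("vm-a", "vm-b", "vm-c").forEach { queue.put(it) }
        queue.put(poison) // end-of-stream marker
    }

    while (true) {
        val item = queue.take()
        if (item === poison) break
        println("consumed $item")
    }
    producer.join()
}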
+ */ + private val readerThread = thread(start = true, name = "sc20-reader") { + val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + try { + while (true) { + val record = reader.read() + + if (record == null) { + queue.put(poison) + break + } + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = SimTraceWorkload.Fragment( + duration, + cpuUsage, + cores + ) + + queue.put(id to fragment) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + reader.close() + } + } + + /** + * Fill the buffers with the VMs + */ + private fun pull(buffers: Map>>) { + if (!hasNext) { + return + } + + val fragments = mutableListOf>() + queue.drainTo(fragments) + + for ((id, fragment) in fragments) { + if (id == poison.first) { + hasNext = false + return + } + buffers[id]?.forEach { it.add(fragment) } + } + } + + /** + * A flag to indicate whether the reader has more entries. + */ + private var hasNext: Boolean = true + + /** + * Initialize the reader. + */ + init { + val takenIds = mutableSetOf() + val entries = mutableMapOf() + val buffers = mutableMapOf>>() + + val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + while (true) { + val record = metaReader.read() ?: break + val id = record["id"].toString() + entries[id] = record + } + + metaReader.close() + + val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms + + // Create the entry iterator + iterator = selection.asSequence() + .mapNotNull { entries[it] } + .mapIndexed { index, record -> + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) + + assert(uid !in takenIds) + takenIds += uid + + logger.info("Processing VM $id") + + val internalBuffer = mutableListOf() + val externalBuffer = mutableListOf() + buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) + val fragments = sequence { + var time = submissionTime + repeat@ while (true) { + if (externalBuffer.isEmpty()) { + if (hasNext) { + pull(buffers) + continue + } else { + break + } + } + + internalBuffer.addAll(externalBuffer) + externalBuffer.clear() + + for (fragment in internalBuffer) { + yield(fragment) + + time += fragment.duration + if (time >= endTime) { + break@repeat + } + } + + internalBuffer.clear() + } + + buffers.remove(id) + } + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), + Random(random.nextInt()) + ) + val vmWorkload = VmWorkload( + uid, + "VM Workload $id", + UnnamedUser, + SimWorkloadImage( + uid, + id, + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to maxCores, + "required-memory" to requiredMemory + ), + SimTraceWorkload(fragments), + ) + ) + + TraceEntryImpl( + submissionTime, + vmWorkload + ) + } + .sortedBy { it.submissionTime } + .toList() + 
.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() { + readerThread.interrupt() + } + + private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { + override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) + + override fun canDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() + } + + override fun inverseCanDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() + } + } + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt new file mode 100644 index 00000000..7713c06f --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt @@ -0,0 +1,621 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.experiments.capelin.trace + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.arguments.argument +import com.github.ajalt.clikt.parameters.groups.OptionGroup +import com.github.ajalt.clikt.parameters.groups.groupChoice +import com.github.ajalt.clikt.parameters.options.convert +import com.github.ajalt.clikt.parameters.options.default +import com.github.ajalt.clikt.parameters.options.defaultLazy +import com.github.ajalt.clikt.parameters.options.option +import com.github.ajalt.clikt.parameters.options.required +import com.github.ajalt.clikt.parameters.options.split +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.long +import me.tongfei.progressbar.ProgressBar +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.format.trace.sc20.Sc20VmPlacementReader +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.Random +import kotlin.math.max +import kotlin.math.min + +/** + * Represents the command for converting traces + */ +public class TraceConverterCli : CliktCommand(name = "trace-converter") { + /** + * The directory where the trace should be stored. + */ + private val outputPath by option("-O", "--output", help = "path to store the trace") + .file(canBeFile = false, mustExist = false) + .defaultLazy { File("output") } + + /** + * The directory where the input trace is located. + */ + private val inputPath by argument("input", help = "path to the input trace") + .file(canBeFile = false) + + /** + * The input type of the trace. 
+ */ + private val type by option("-t", "--type", help = "input type of trace").groupChoice( + "solvinity" to SolvinityConversion(), + "bitbrains" to BitbrainsConversion(), + "azure" to AzureConversion() + ) + + override fun run() { + val metaSchema = SchemaBuilder + .record("meta") + .namespace("org.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("submissionTime").type().longType().noDefault() + .name("endTime").type().longType().noDefault() + .name("maxCores").type().intType().noDefault() + .name("requiredMemory").type().longType().noDefault() + .endRecord() + val schema = SchemaBuilder + .record("trace") + .namespace("org.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("cores").type().intType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("flops").type().longType().noDefault() + .endRecord() + + val metaParquet = File(outputPath, "meta.parquet") + val traceParquet = File(outputPath, "trace.parquet") + + if (metaParquet.exists()) { + metaParquet.delete() + } + if (traceParquet.exists()) { + traceParquet.delete() + } + + val metaWriter = AvroParquetWriter.builder(Path(metaParquet.toURI())) + .withSchema(metaSchema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + val writer = AvroParquetWriter.builder(Path(traceParquet.toURI())) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + try { + val type = type ?: throw IllegalArgumentException("Invalid trace conversion") + val allFragments = type.read(inputPath, metaSchema, metaWriter) + allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) + + for (fragment in allFragments) { + val record = GenericData.Record(schema) + record.put("id", fragment.id) + record.put("time", fragment.tick) + record.put("duration", fragment.duration) + record.put("cores", fragment.cores) + record.put("cpuUsage", fragment.usage) + record.put("flops", fragment.flops) + + writer.write(record) + } + } finally { + writer.close() + metaWriter.close() + } + } +} + +/** + * The supported trace conversions. + */ +public sealed class TraceConversion(name: String) : OptionGroup(name) { + /** + * Read the fragments of the trace. 
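Since TraceConverterCli above is a Clikt command, it can also be driven programmatically from a main function. A hypothetical invocation; all paths and cluster names below are placeholders, not values from this patch:

// Convert a (hypothetical) raw Solvinity trace into the Parquet format read by the experiments.
fun main() {
    TraceConverterCli().main(
        listOf(
            "--output", "traces/solvinity-parquet",
            "--type", "solvinity",
            "--clusters", "clusterA,clusterB",
            "--vm-placements", "traces/vm-placements.txt",
            "traces/solvinity-raw"
        )
    )
}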
+     */
+    public abstract fun read(
+        traceDirectory: File,
+        metaSchema: Schema,
+        metaWriter: ParquetWriter<GenericData.Record>
+    ): MutableList<Fragment>
+}
+
+public class SolvinityConversion : TraceConversion("Solvinity") {
+    private val clusters by option()
+        .split(",")
+
+    private val vmPlacements by option("--vm-placements", help = "file containing the VM placements")
+        .file(canBeDir = false)
+        .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } }
+        .required()
+
+    override fun read(
+        traceDirectory: File,
+        metaSchema: Schema,
+        metaWriter: ParquetWriter<GenericData.Record>
+    ): MutableList<Fragment> {
+        val clusters = clusters?.toSet() ?: emptySet()
+        val timestampCol = 0
+        val cpuUsageCol = 1
+        val coreCol = 12
+        val provisionedMemoryCol = 20
+        val traceInterval = 5 * 60 * 1000L
+
+        // Identify start time of the entire trace
+        var minTimestamp = Long.MAX_VALUE
+        traceDirectory.walk()
+            .filterNot { it.isDirectory }
+            .filter { it.extension == "csv" || it.extension == "txt" }
+            .toList()
+            .forEach file@{ vmFile ->
+                BufferedReader(FileReader(vmFile)).use { reader ->
+                    reader.lineSequence()
+                        .chunked(128)
+                        .forEach { lines ->
+                            for (line in lines) {
+                                // Ignore comments in the trace
+                                if (line.startsWith("#") || line.isBlank()) {
+                                    continue
+                                }
+
+                                val vmId = vmFile.name
+
+                                // Check if VM in topology
+                                val clusterName = vmPlacements[vmId]
+                                if (clusterName == null || !clusters.contains(clusterName)) {
+                                    continue
+                                }
+
+                                val values = line.split("\t")
+                                val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+
+                                if (timestamp < minTimestamp) {
+                                    minTimestamp = timestamp
+                                }
+                                return@file
+                            }
+                        }
+                }
+            }
+
+        println("Start of trace at $minTimestamp")
+
+        val allFragments = mutableListOf<Fragment>()
+
+        val begin = 15 * 24 * 60 * 60 * 1000L
+        val end = 45 * 24 * 60 * 60 * 1000L
+
+        traceDirectory.walk()
+            .filterNot { it.isDirectory }
+            .filter { it.extension == "csv" || it.extension == "txt" }
+            .toList()
+            .forEach { vmFile ->
+                println(vmFile)
+
+                var vmId = ""
+                var maxCores = -1
+                var requiredMemory = -1L
+                var cores: Int
+                var minTime = Long.MAX_VALUE
+
+                val flopsFragments = sequence {
+                    var last: Fragment? = null
+
+                    BufferedReader(FileReader(vmFile)).use { reader ->
+                        reader.lineSequence()
+                            .chunked(128)
+                            .forEach { lines ->
+                                for (line in lines) {
+                                    // Ignore comments in the trace
+                                    if (line.startsWith("#") || line.isBlank()) {
+                                        continue
+                                    }
+
+                                    val values = line.split("\t")
+
+                                    vmId = vmFile.name
+
+                                    // Check if VM in topology
+                                    val clusterName = vmPlacements[vmId]
+                                    if (clusterName == null || !clusters.contains(clusterName)) {
+                                        continue
+                                    }
+
+                                    val timestamp =
+                                        (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
+                                    if (begin > timestamp || timestamp > end) {
+                                        continue
+                                    }
+
+                                    cores = values[coreCol].trim().toInt()
+                                    requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+                                    maxCores = max(maxCores, cores)
+                                    minTime = min(minTime, timestamp)
+                                    val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+                                    requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+                                    maxCores = max(maxCores, cores)
+
+                                    val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+                                    last = if (last != null && last!!.flops == 0L && flops == 0L) {
+                                        val oldFragment = last!!
+                                        Fragment(
+                                            vmId,
+                                            oldFragment.tick,
+                                            oldFragment.flops + flops,
+                                            oldFragment.duration + traceInterval,
+                                            cpuUsage,
+                                            cores
+                                        )
+                                    } else {
+                                        val fragment =
+                                            Fragment(
+                                                vmId,
+                                                timestamp,
+                                                flops,
+                                                traceInterval,
+                                                cpuUsage,
+                                                cores
+                                            )
+                                        if (last != null) {
+                                            yield(last!!)
+ } + fragment + } + } + } + } + + if (last != null) { + yield(last!!) + } + } + + var maxTime = Long.MIN_VALUE + flopsFragments.filter { it.tick in begin until end }.forEach { fragment -> + allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) + } + + if (minTime in begin until end) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + } + + return allFragments + } +} + +/** + * Conversion of the Bitbrains public trace. + */ +public class BitbrainsConversion : TraceConversion("Bitbrains") { + override fun read( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter + ): MutableList { + val timestampCol = 0 + val cpuUsageCol = 3 + val coreCol = 1 + val provisionedMemoryCol = 5 + val traceInterval = 5 * 60 * 1000L + + val allFragments = mutableListOf() + + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + .forEach { vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var cores: Int + var minTime = Long.MAX_VALUE + + val flopsFragments = sequence { + var last: Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .drop(1) + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(";\t") + + vmId = vmFile.name + + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + + cores = values[coreCol].trim().toInt() + val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB + requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! + Fragment( + vmId, + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + } else { + val fragment = + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + cores + ) + if (last != null) { + yield(last!!) + } + fragment + } + } + } + } + + if (last != null) { + yield(last!!) + } + } + + var maxTime = Long.MIN_VALUE + flopsFragments.forEach { fragment -> + allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) + } + + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + + return allFragments + } +} + +/** + * Conversion of the Azure public VM trace. 
+ */
+public class AzureConversion : TraceConversion("Azure") {
+    private val seed by option(help = "seed for trace sampling")
+        .long()
+        .default(0)
+
+    override fun read(
+        traceDirectory: File,
+        metaSchema: Schema,
+        metaWriter: ParquetWriter<GenericData.Record>
+    ): MutableList<Fragment> {
+        val random = Random(seed)
+        val fraction = 0.01
+
+        // Read VM table
+        val vmIdTableCol = 0
+        val coreTableCol = 9
+        val provisionedMemoryTableCol = 10
+
+        var vmId: String
+        var cores: Int
+        var requiredMemory: Long
+
+        val vmIds = mutableSetOf<String>()
+        val vmIdToMetadata = mutableMapOf<String, VmInfo>()
+
+        BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
+            reader.lineSequence()
+                .chunked(1024)
+                .forEach { lines ->
+                    for (line in lines) {
+                        // Ignore comments in the trace
+                        if (line.startsWith("#") || line.isBlank()) {
+                            continue
+                        }
+                        // Sample only a fraction of the VMs
+                        if (random.nextDouble() > fraction) {
+                            continue
+                        }
+
+                        val values = line.split(",")
+
+                        // Exclude VMs with a large number of cores (not specified exactly)
+                        if (values[coreTableCol].contains(">")) {
+                            continue
+                        }
+
+                        vmId = values[vmIdTableCol].trim()
+                        cores = values[coreTableCol].trim().toInt()
+                        requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
+
+                        vmIds.add(vmId)
+                        vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
+                    }
+                }
+        }
+
+        // Read VM metric reading files
+        val timestampCol = 0
+        val vmIdCol = 1
+        val cpuUsageCol = 4
+        val traceInterval = 5 * 60 * 1000L
+
+        val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
+        val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
+        val allFragments = mutableListOf<Fragment>()
+
+        for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) {
+            val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv")
+            var timestamp: Long
+            var cpuUsage: Double
+
+            BufferedReader(FileReader(readingsFile)).use { reader ->
+                reader.lineSequence()
+                    .chunked(128)
+                    .forEach { lines ->
+                        for (line in lines) {
+                            // Ignore comments in the trace
+                            if (line.startsWith("#") || line.isBlank()) {
+                                continue
+                            }
+
+                            val values = line.split(",")
+                            vmId = values[vmIdCol].trim()
+
+                            // Ignore readings for VMs not in the sample
+                            if (!vmIds.contains(vmId)) {
+                                continue
+                            }
+
+                            timestamp = values[timestampCol].trim().toLong() * 1000L
+                            vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
+                            cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz
+                            vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
+
+                            val flops: Long = (cpuUsage * 5 * 60).toLong()
+                            val lastFragment = vmIdToLastFragment[vmId]
+
+                            vmIdToLastFragment[vmId] =
+                                if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
+                                    Fragment(
+                                        vmId,
+                                        lastFragment.tick,
+                                        lastFragment.flops + flops,
+                                        lastFragment.duration + traceInterval,
+                                        cpuUsage,
+                                        vmIdToMetadata[vmId]!!.cores
+                                    )
+                                } else {
+                                    val fragment =
+                                        Fragment(
+                                            vmId,
+                                            timestamp,
+                                            flops,
+                                            traceInterval,
+                                            cpuUsage,
+                                            vmIdToMetadata[vmId]!!.cores
+                                        )
+                                    if (lastFragment != null) {
+                                        if (vmIdToFragments[vmId] == null) {
+                                            vmIdToFragments[vmId] = mutableListOf()
+                                        }
+                                        vmIdToFragments[vmId]!!.add(lastFragment)
+                                        allFragments.add(lastFragment)
+                                    }
+                                    fragment
+                                }
+                        }
+                    }
+            }
+        }
+
+        for (entry in vmIdToLastFragment) {
+            if (entry.value != null) {
+                if (vmIdToFragments[entry.key] == null) {
+                    vmIdToFragments[entry.key] = mutableListOf()
+                }
+                vmIdToFragments[entry.key]!!.add(entry.value!!)
+ } + } + + println("Read ${vmIdToLastFragment.size} VMs") + + for (entry in vmIdToMetadata) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", entry.key) + metaRecord.put("submissionTime", entry.value.minTime) + metaRecord.put("endTime", entry.value.maxTime) + println("${entry.value.minTime} - ${entry.value.maxTime}") + metaRecord.put("maxCores", entry.value.cores) + metaRecord.put("requiredMemory", entry.value.requiredMemory) + metaWriter.write(metaRecord) + } + + return allFragments + } +} + +public data class Fragment( + public val id: String, + public val tick: Long, + public val flops: Long, + public val duration: Long, + public val usage: Double, + public val cores: Int +) + +public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long) + +/** + * A script to convert a trace in text format into a Parquet trace. + */ +public fun main(args: Array): Unit = TraceConverterCli().main(args) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt new file mode 100644 index 00000000..4d9b9df1 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import mu.KotlinLogging +import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.experiments.capelin.model.CompositeWorkload +import org.opendc.experiments.capelin.model.SamplingStrategy +import org.opendc.experiments.capelin.model.Workload +import org.opendc.format.trace.TraceEntry +import java.util.* +import kotlin.random.Random + +private val logger = KotlinLogging.logger {} + +/** + * Sample the workload for the specified [run]. 
+ */
+public fun sampleWorkload(
+    trace: List<TraceEntry<VmWorkload>>,
+    workload: Workload,
+    subWorkload: Workload,
+    seed: Int
+): List<TraceEntry<VmWorkload>> {
+    return when {
+        workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
+        workload.samplingStrategy == SamplingStrategy.HPC ->
+            sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false)
+        workload.samplingStrategy == SamplingStrategy.HPC_LOAD ->
+            sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true)
+        else ->
+            sampleRegularWorkload(trace, workload, workload, seed)
+    }
+}
+
+/**
+ * Sample a regular (non-HPC) workload.
+ */
+public fun sampleRegularWorkload(
+    trace: List<TraceEntry<VmWorkload>>,
+    workload: Workload,
+    subWorkload: Workload,
+    seed: Int
+): List<TraceEntry<VmWorkload>> {
+    val fraction = subWorkload.fraction
+
+    val shuffled = trace.shuffled(Random(seed))
+    val res = mutableListOf<TraceEntry<VmWorkload>>()
+    val totalLoad = if (workload is CompositeWorkload) {
+        workload.totalLoad
+    } else {
+        shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+    }
+    var currentLoad = 0.0
+
+    for (entry in shuffled) {
+        val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+        if ((currentLoad + entryLoad) / totalLoad > fraction) {
+            break
+        }
+
+        currentLoad += entryLoad
+        res += entry
+    }
+
+    logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+    return res
+}
+
+/**
+ * Sample a HPC workload.
+ */
+public fun sampleHpcWorkload(
+    trace: List<TraceEntry<VmWorkload>>,
+    workload: Workload,
+    seed: Int,
+    sampleOnLoad: Boolean
+): List<TraceEntry<VmWorkload>> {
+    val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
+    val random = Random(seed)
+
+    val fraction = workload.fraction
+    val (hpc, nonHpc) = trace.partition { entry ->
+        val name = entry.workload.image.name
+        name.matches(pattern)
+    }
+
+    val hpcSequence = generateSequence(0) { it + 1 }
+        .map { index ->
+            val res = mutableListOf<TraceEntry<VmWorkload>>()
+            hpc.mapTo(res) { sample(it, index) }
+            res.shuffle(random)
+            res
+        }
+        .flatten()
+
+    val nonHpcSequence = generateSequence(0) { it + 1 }
+        .map { index ->
+            val res = mutableListOf<TraceEntry<VmWorkload>>()
+            nonHpc.mapTo(res) { sample(it, index) }
+            res.shuffle(random)
+            res
+        }
+        .flatten()
+
+    logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
+
+    val totalLoad = if (workload is CompositeWorkload) {
+        workload.totalLoad
+    } else {
+        trace.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+    }
+
+    logger.debug { "Total trace load: $totalLoad" }
+    var hpcCount = 0
+    var hpcLoad = 0.0
+    var nonHpcCount = 0
+    var nonHpcLoad = 0.0
+
+    val res = mutableListOf<TraceEntry<VmWorkload>>()
+
+    if (sampleOnLoad) {
+        var currentLoad = 0.0
+        for (entry in hpcSequence) {
+            val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+            if ((currentLoad + entryLoad) / totalLoad > fraction) {
+                break
+            }
+
+            hpcLoad += entryLoad
+            hpcCount += 1
+            currentLoad += entryLoad
+            res += entry
+        }
+
+        for (entry in nonHpcSequence) {
+            val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+            if ((currentLoad + entryLoad) / totalLoad > 1) {
+                break
+            }
+
+            nonHpcLoad += entryLoad
+            nonHpcCount += 1
+            currentLoad += entryLoad
+            res += entry
+        }
+    } else {
+        hpcSequence
+            .take((fraction * trace.size).toInt())
+            .forEach { entry ->
+                hpcLoad += entry.workload.image.tags.getValue("total-load") as Double
+                hpcCount += 1
+                res.add(entry)
+            }
+
+        nonHpcSequence
+            .take(((1 - fraction) * trace.size).toInt())
+            .forEach { entry ->
+                nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double
+                nonHpcCount += 1
res.add(entry) + } + } + + logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } + logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } + logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res +} + +/** + * Sample a random trace entry. + */ +private fun sample(entry: TraceEntry, i: Int): TraceEntry { + val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) + val image = SimWorkloadImage( + id, + entry.workload.image.name, + entry.workload.image.tags, + (entry.workload.image as SimWorkloadImage).workload + ) + val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) + return VmTraceEntry(vmWorkload, entry.submissionTime) +} + +private class VmTraceEntry(override val workload: VmWorkload, override val submissionTime: Long) : + TraceEntry diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml new file mode 100644 index 00000000..d1c01b8e --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml @@ -0,0 +1,49 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt new file mode 100644 index 00000000..6a0796f6 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -0,0 +1,256 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.experiments.capelin + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.core.Server +import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy +import org.opendc.experiments.capelin.experiment.attachMonitor +import org.opendc.experiments.capelin.experiment.createFailureDomain +import org.opendc.experiments.capelin.experiment.createProvisioner +import org.opendc.experiments.capelin.experiment.processTrace +import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.capelin.monitor.ExperimentMonitor +import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader +import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader +import org.opendc.format.environment.EnvironmentReader +import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer +import java.io.File +import java.time.Clock + +/** + * An integration test suite for the SC20 experiments. + */ +@OptIn(ExperimentalCoroutinesApi::class) +class CapelinIntegrationTest { + /** + * The [TestCoroutineScope] to use. + */ + private lateinit var testScope: TestCoroutineScope + + /** + * The simulation clock to use. + */ + private lateinit var clock: Clock + + /** + * The monitor used to keep track of the metrics. + */ + private lateinit var monitor: TestExperimentReporter + + /** + * Setup the experimental environment. + */ + @BeforeEach + fun setUp() { + testScope = TestCoroutineScope() + clock = DelayControllerClockAdapter(testScope) + + monitor = TestExperimentReporter() + } + + /** + * Tear down the experimental environment. 
+ */ + @AfterEach + fun tearDown() = testScope.cleanupTestCoroutines() + + @Test + fun testLarge() { + val failures = false + val seed = 0 + val chan = Channel(Channel.CONFLATED) + val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val traceReader = createTestTraceReader() + val environmentReader = createTestEnvironmentReader() + lateinit var scheduler: SimVirtProvisioningService + val tracer = EventTracer(clock) + + testScope.launch { + val res = createProvisioner( + this, + clock, + environmentReader, + allocationPolicy, + tracer + ) + val bareMetalProvisioner = res.first + scheduler = res.second + + val failureDomain = if (failures) { + println("ENABLING failures") + createFailureDomain( + this, + clock, + seed, + 24.0 * 7, + bareMetalProvisioner, + chan + ) + } else { + null + } + + attachMonitor(this, clock, scheduler, monitor) + processTrace( + this, + clock, + traceReader, + scheduler, + chan, + monitor + ) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + monitor.close() + } + + runSimulation() + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, + { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, + { assertEquals(1684849230562, monitor.totalRequestedBurst) }, + { assertEquals(447612683996, monitor.totalGrantedBurst) }, + { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) }, + { assertEquals(0, monitor.totalInterferedBurst) } + ) + } + + @Test + fun testSmall() { + val seed = 1 + val chan = Channel(Channel.CONFLATED) + val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val traceReader = createTestTraceReader(0.5, seed) + val environmentReader = createTestEnvironmentReader("single") + lateinit var scheduler: SimVirtProvisioningService + val tracer = EventTracer(clock) + + testScope.launch { + val res = createProvisioner( + this, + clock, + environmentReader, + allocationPolicy, + tracer + ) + scheduler = res.second + + attachMonitor(this, clock, scheduler, monitor) + processTrace( + this, + clock, + traceReader, + scheduler, + chan, + monitor + ) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + scheduler.terminate() + monitor.close() + } + + runSimulation() + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(705128393966, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } + ) + } + + /** + * Run the simulation. + */ + private fun runSimulation() = testScope.advanceUntilIdle() + + /** + * Obtain the trace reader for the test. + */ + private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader { + return Sc20ParquetTraceReader( + listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), + emptyMap(), + Workload("test", fraction), + seed + ) + } + + /** + * Obtain the environment reader for the test. 
+ */ + private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { + val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt") + return Sc20ClusterEnvironmentReader(stream) + } + + class TestExperimentReporter : ExperimentMonitor { + var totalRequestedBurst = 0L + var totalGrantedBurst = 0L + var totalOvercommissionedBurst = 0L + var totalInterferedBurst = 0L + + override fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + duration: Long + ) { + totalRequestedBurst += requestedBurst + totalGrantedBurst += grantedBurst + totalOvercommissionedBurst += overcommissionedBurst + totalInterferedBurst += interferedBurst + } + + override fun close() {} + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt new file mode 100644 index 00000000..53b3c2d7 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt @@ -0,0 +1,3 @@ +ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost +A01;A01;8;3.2;64;1;64;8 + diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt new file mode 100644 index 00000000..6b347bff --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt @@ -0,0 +1,5 @@ +ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost +A01;A01;32;3.2;2048;1;256;32 +B01;B01;48;2.93;1256;6;64;8 +C01;C01;32;3.2;2048;2;128;16 + diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet new file mode 100644 index 00000000..ce7a812c Binary files /dev/null and b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet differ diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet new file mode 100644 index 00000000..1d7ce882 Binary files /dev/null and b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet differ diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore b/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore new file mode 100644 index 00000000..ba64707c --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore @@ -0,0 +1,2 @@ +input/ +output/ diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index 3843f198..fc979363 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ 
b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -55,12 +55,12 @@ public class UnderspecificationExperiment : Experiment("underspecification") { /** * The workflow traces to test. */ - private val trace: String by anyOf("traces/chronos_exp_noscaler_ca.gwf") + private val trace: String by anyOf("input/traces/chronos_exp_noscaler_ca.gwf") /** * The datacenter environments to test. */ - private val environment: String by anyOf("environments/base.json") + private val environment: String by anyOf("input/environments/base.json") @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int) { diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts deleted file mode 100644 index 8127d9eb..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2019 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -description = "Experiments for the SC20 paper" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` - `testing-conventions` - application -} - -application { - mainClass.set("org.opendc.harness.runner.console.ConsoleRunnerKt") - applicationDefaultJvmArgs = listOf("-Xms2500M") -} - -dependencies { - api(project(":opendc-core")) - api(project(":opendc-harness")) - implementation(project(":opendc-format")) - implementation(project(":opendc-simulator:opendc-simulator-core")) - implementation(project(":opendc-simulator:opendc-simulator-compute")) - implementation(project(":opendc-simulator:opendc-simulator-failures")) - implementation(project(":opendc-compute:opendc-compute-simulator")) - - implementation("io.github.microutils:kotlin-logging") - implementation("me.tongfei:progressbar:${versions["progressbar"]}") - implementation("com.github.ajalt.clikt:clikt:${versions["clikt"]}") - - implementation("org.apache.parquet:parquet-avro:${versions["parquet-avro"]}") - implementation("org.apache.hadoop:hadoop-client:${versions["hadoop-client"]}") { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - exclude(group = "log4j") - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/CompositeWorkloadPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/CompositeWorkloadPortfolio.kt deleted file mode 100644 index f4242456..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/CompositeWorkloadPortfolio.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.CompositeWorkload -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that explores the effect of a composite workload. 
- */ -public class CompositeWorkloadPortfolio : Portfolio("composite-workload") { - private val totalSampleLoad = 1.3301733005049648E12 - - override val topology: Topology by anyOf( - Topology("base"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-ver-hom"), - Topology("exp-vel-ver-hom") - ) - - override val workload: Workload by anyOf( - CompositeWorkload( - "all-azure", - listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)), - totalSampleLoad - ), - CompositeWorkload( - "solvinity-25-azure-75", - listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)), - totalSampleLoad - ), - CompositeWorkload( - "solvinity-50-azure-50", - listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)), - totalSampleLoad - ), - CompositeWorkload( - "solvinity-75-azure-25", - listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)), - totalSampleLoad - ), - CompositeWorkload( - "all-solvinity", - listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)), - totalSampleLoad - ) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false) - ) - - override val allocationPolicy: String by anyOf( - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt deleted file mode 100644 index 1e01e892..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.experiments.sc20.experiment - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.takeWhile -import kotlinx.coroutines.launch -import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.metal.NODE_CLUSTER -import org.opendc.compute.core.metal.driver.BareMetalDriver -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.compute.core.virt.service.VirtProvisioningEvent -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimBareMetalDriver -import org.opendc.compute.simulator.SimVirtDriver -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AllocationPolicy -import org.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor -import org.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.failures.CorrelatedFaultInjector -import org.opendc.simulator.failures.FailureDomain -import org.opendc.simulator.failures.FaultInjector -import org.opendc.trace.core.EventTracer -import java.io.File -import java.time.Clock -import kotlin.math.ln -import kotlin.math.max -import kotlin.random.Random - -/** - * The logger for this experiment. - */ -private val logger = KotlinLogging.logger {} - -/** - * Construct the failure domain for the experiments. - */ -public suspend fun createFailureDomain( - coroutineScope: CoroutineScope, - clock: Clock, - seed: Int, - failureInterval: Double, - bareMetalProvisioner: ProvisioningService, - chan: Channel -): CoroutineScope { - val job = coroutineScope.launch { - chan.receive() - val random = Random(seed) - val injectors = mutableMapOf() - for (node in bareMetalProvisioner.nodes()) { - val cluster = node.metadata[NODE_CLUSTER] as String - val injector = - injectors.getOrPut(cluster) { - createFaultInjector( - this, - clock, - random, - failureInterval - ) - } - injector.enqueue(node.metadata["driver"] as FailureDomain) - } - } - return CoroutineScope(coroutineScope.coroutineContext + job) -} - -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -public fun createFaultInjector( - coroutineScope: CoroutineScope, - clock: Clock, - random: Random, - failureInterval: Double -): FaultInjector { - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return CorrelatedFaultInjector( - coroutineScope, - clock, - iatScale = ln(failureInterval), iatShape = 1.03, // Hours - sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 - dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes - random = random - ) -} - -/** - * Create the trace reader from which the VM workloads are read. 
- */ -public fun createTraceReader( - path: File, - performanceInterferenceModel: PerformanceInterferenceModel, - vms: List, - seed: Int -): Sc20StreamingParquetTraceReader { - return Sc20StreamingParquetTraceReader( - path, - performanceInterferenceModel, - vms, - Random(seed) - ) -} - -/** - * Construct the environment for a VM provisioner and return the provisioner instance. - */ -public suspend fun createProvisioner( - coroutineScope: CoroutineScope, - clock: Clock, - environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy, - eventTracer: EventTracer -): Pair { - val environment = environmentReader.use { it.construct(coroutineScope, clock) } - val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] - - // Wait for the bare metal nodes to be spawned - delay(10) - - val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider()) - - // Wait for the hypervisors to be spawned - delay(10) - - return bareMetalProvisioner to scheduler -} - -/** - * Attach the specified monitor to the VM provisioner. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public suspend fun attachMonitor( - coroutineScope: CoroutineScope, - clock: Clock, - scheduler: SimVirtProvisioningService, - monitor: ExperimentMonitor -) { - - val hypervisors = scheduler.drivers() - - // Monitor hypervisor events - for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - val server = (hypervisor as SimVirtDriver).server - monitor.reportHostStateChange(clock.millis(), hypervisor, server) - server.events - .onEach { event -> - val time = clock.millis() - when (event) { - is ServerEvent.StateChanged -> { - monitor.reportHostStateChange(time, hypervisor, event.server) - } - } - } - .launchIn(coroutineScope) - hypervisor.events - .onEach { event -> - when (event) { - is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( - clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.hostServer - ) - } - } - .launchIn(coroutineScope) - - val driver = hypervisor.server.services[BareMetalDriver.Key] as SimBareMetalDriver - driver.powerDraw - .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } - .launchIn(coroutineScope) - } - - scheduler.events - .onEach { event -> - when (event) { - is VirtProvisioningEvent.MetricsAvailable -> - monitor.reportProvisionerMetrics(clock.millis(), event) - } - } - .launchIn(coroutineScope) -} - -/** - * Process the trace. 
- */ -public suspend fun processTrace( - coroutineScope: CoroutineScope, - clock: Clock, - reader: TraceReader, - scheduler: SimVirtProvisioningService, - chan: Channel, - monitor: ExperimentMonitor -) { - try { - var submitted = 0 - - while (reader.hasNext()) { - val (time, workload) = reader.next() - - submitted++ - delay(max(0, time - clock.millis())) - coroutineScope.launch { - chan.send(Unit) - val server = scheduler.deploy( - workload.image.name, - workload.image, - Flavor( - workload.image.tags["cores"] as Int, - workload.image.tags["required-memory"] as Long - ) - ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - monitor.reportVmStateChange(clock.millis(), it.server) - } - } - .collect() - } - } - - scheduler.events - .takeWhile { - when (it) { - is VirtProvisioningEvent.MetricsAvailable -> - it.inactiveVmCount + it.failedVmCount != submitted - } - } - .collect() - delay(1) - } finally { - reader.close() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/HorVerPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/HorVerPortfolio.kt deleted file mode 100644 index aa97b808..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/HorVerPortfolio.kt +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that explores the difference between horizontal and vertical scaling. 
- */ -public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") { - override val topology: Topology by anyOf( - Topology("base"), - Topology("rep-vol-hor-hom"), - Topology("rep-vol-hor-het"), - Topology("rep-vol-ver-hom"), - Topology("rep-vol-ver-het"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-hor-het"), - Topology("exp-vol-ver-hom"), - Topology("exp-vol-ver-het") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) - ) - - override val allocationPolicy: String by anyOf( - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreHpcPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreHpcPortfolio.kt deleted file mode 100644 index bdb33f59..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreHpcPortfolio.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.* -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] to explore the effect of HPC workloads. 
- */ -public class MoreHpcPortfolio : Portfolio("more_hpc") { - override val topology: Topology by anyOf( - Topology("base"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-ver-hom"), - Topology("exp-vel-ver-hom") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD), - Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD), - Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) - ) - - override val allocationPolicy: String by anyOf( - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreVelocityPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreVelocityPortfolio.kt deleted file mode 100644 index 733dabf6..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreVelocityPortfolio.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that explores the effect of adding more velocity to a cluster (e.g., faster machines). 
- */ -public class MoreVelocityPortfolio : Portfolio("more_velocity") { - override val topology: Topology by anyOf( - Topology("base"), - Topology("rep-vel-ver-hom"), - Topology("rep-vel-ver-het"), - Topology("exp-vel-ver-hom"), - Topology("exp-vel-ver-het") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) - ) - - override val allocationPolicy: String by anyOf( - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/OperationalPhenomenaPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/OperationalPhenomenaPortfolio.kt deleted file mode 100644 index 66b94faf..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/OperationalPhenomenaPortfolio.kt +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that explores the effect of operational phenomena on metrics. 
- */ -public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena") { - override val topology: Topology by anyOf( - Topology("base") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), - OperationalPhenomena(failureFrequency = 0.0, hasInterference = true), - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false), - OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) - ) - - override val allocationPolicy: String by anyOf( - "mem", - "mem-inv", - "core-mem", - "core-mem-inv", - "active-servers", - "active-servers-inv", - "random" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Portfolio.kt deleted file mode 100644 index 4a82ad56..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Portfolio.kt +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.experiments.sc20.experiment - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import mu.KotlinLogging -import org.opendc.compute.simulator.allocation.* -import org.opendc.experiments.sc20.experiment.model.CompositeWorkload -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor -import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader -import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import org.opendc.format.trace.PerformanceInterferenceModelReader -import org.opendc.harness.dsl.Experiment -import org.opendc.harness.dsl.anyOf -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer -import java.io.File -import java.util.concurrent.ConcurrentHashMap -import kotlin.random.Random - -/** - * A portfolio represents a collection of scenarios are tested for the work. - * - * @param name The name of the portfolio. - */ -public abstract class Portfolio(name: String) : Experiment(name) { - /** - * The logger for this portfolio instance. - */ - private val logger = KotlinLogging.logger {} - - /** - * The path to where the environments are located. - */ - private val environmentPath by anyOf(File("environments/")) - - /** - * The path to where the traces are located. - */ - private val tracePath by anyOf(File("traces/")) - - /** - * The path to where the output results should be written. - */ - private val outputPath by anyOf(File("results/")) - - /** - * The path to the original VM placements file. - */ - private val vmPlacements by anyOf(emptyMap()) - - /** - * The path to the performance interference model. - */ - private val performanceInterferenceModel by anyOf(null) - - /** - * The topology to test. - */ - public abstract val topology: Topology - - /** - * The workload to test. - */ - public abstract val workload: Workload - - /** - * The operational phenomenas to consider. - */ - public abstract val operationalPhenomena: OperationalPhenomena - - /** - * The allocation policies to consider. - */ - public abstract val allocationPolicy: String - - /** - * A map of trace readers. - */ - private val traceReaders = ConcurrentHashMap() - - /** - * Perform a single trial for this portfolio. 
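Each trial is reproducible: the doRun body that follows derives every source of randomness from the repeat index. A minimal sketch of that seeding idea (standalone Kotlin, not part of the patch; the helper name is illustrative):

    import kotlin.random.Random

    // One root Random per repeat; every stochastic component (failure injection, interference,
    // workload sampling) draws its own seed from it, so a given run can be replayed exactly.
    fun seedsFor(repeat: Int): List<Int> {
        val seeder = Random(repeat)
        return List(3) { seeder.nextInt() }
    }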
- */ - @OptIn(ExperimentalCoroutinesApi::class) - override fun doRun(repeat: Int) { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val tracer = EventTracer(clock) - val seeder = Random(repeat) - val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) - - val chan = Channel(Channel.CONFLATED) - val allocationPolicy = createAllocationPolicy(seeder) - - val workload = workload - val workloadNames = if (workload is CompositeWorkload) { - workload.workloads.map { it.name } - } else { - listOf(workload.name) - } - - val rawReaders = workloadNames.map { workloadName -> - traceReaders.computeIfAbsent(workloadName) { - logger.info { "Loading trace $workloadName" } - Sc20RawParquetTraceReader(File(tracePath, workloadName)) - } - } - - val performanceInterferenceModel = performanceInterferenceModel - ?.takeIf { operationalPhenomena.hasInterference } - ?.construct(seeder) ?: emptyMap() - val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt()) - - val monitor = ParquetExperimentMonitor( - outputPath, - "portfolio_id=$name/scenario_id=$id/run_id=$repeat", - 4096 - ) - - testScope.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner( - this, - clock, - environment, - allocationPolicy, - tracer - ) - - val failureDomain = if (operationalPhenomena.failureFrequency > 0) { - logger.debug("ENABLING failures") - createFailureDomain( - this, - clock, - seeder.nextInt(), - operationalPhenomena.failureFrequency, - bareMetalProvisioner, - chan - ) - } else { - null - } - - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - trace, - scheduler, - chan, - monitor - ) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - } - - try { - testScope.advanceUntilIdle() - } finally { - monitor.close() - } - } - - /** - * Create the [AllocationPolicy] instance to use for the trial. 
- */ - private fun createAllocationPolicy(seeder: Random): AllocationPolicy { - return when (allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) - "replay" -> ReplayAllocationPolicy(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") - } - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ReplayPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ReplayPortfolio.kt deleted file mode 100644 index 8a42a3b4..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ReplayPortfolio.kt +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that compares the original VM placements against our policies. 
- */ -public class ReplayPortfolio : Portfolio("replay") { - override val topology: Topology by anyOf( - Topology("base") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) - ) - - override val allocationPolicy: String by anyOf( - "replay", - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/TestPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/TestPortfolio.kt deleted file mode 100644 index 2210fc97..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/TestPortfolio.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] to perform a simple test run. 
- */ -public class TestPortfolio : Portfolio("test") { - override val topology: Topology by anyOf( - Topology("base") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) - ) - - override val allocationPolicy: String by anyOf("active-servers") -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt deleted file mode 100644 index b22f4c9e..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment.model - -/** - * Operation phenomena during experiments. - * - * @param failureFrequency The average time between failures in hours. - * @param hasInterference A flag to enable performance interference between VMs. 
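Taking the documentation above at face value (failureFrequency as the mean number of hours between failures), the expected number of failure events over a trace follows directly. A small illustrative helper, not part of the patch:

    // Assumes failureFrequency is the mean time between failures in hours, as documented above.
    fun expectedFailures(failureFrequency: Double, traceDurationHours: Double): Double =
        if (failureFrequency > 0.0) traceDurationHours / failureFrequency else 0.0

    // Example: the portfolios use failureFrequency = 24.0 * 7, i.e. one failure per week on
    // average, so a 30-day trace yields expectedFailures(168.0, 720.0), about 4.3 events.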
- */ -public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Topology.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Topology.kt deleted file mode 100644 index 95062fda..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Topology.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment.model - -/** - * The topology topology on which we test the workload. - */ -public data class Topology(val name: String) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Workload.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Workload.kt deleted file mode 100644 index ba95f544..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Workload.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.experiments.sc20.experiment.model - -public enum class SamplingStrategy { - REGULAR, - HPC, - HPC_LOAD -} - -/** - * A workload that is considered for a scenario. - */ -public open class Workload( - public open val name: String, - public val fraction: Double, - public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR -) - -/** - * A workload that is composed of multiple workloads. - */ -public class CompositeWorkload(override val name: String, public val workloads: List, public val totalLoad: Double) : - Workload(name, -1.0) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt deleted file mode 100644 index 18ba2c33..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment.monitor - -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent -import java.io.Closeable - -/** - * A monitor watches the events of an experiment. - */ -public interface ExperimentMonitor : Closeable { - /** - * This method is invoked when the state of a VM changes. - */ - public fun reportVmStateChange(time: Long, server: Server) {} - - /** - * This method is invoked when the state of a host changes. - */ - public fun reportHostStateChange( - time: Long, - driver: VirtDriver, - server: Server - ) { - } - - /** - * Report the power consumption of a host. - */ - public fun reportPowerConsumption(host: Server, draw: Double) {} - - /** - * This method is invoked for a host for each slice that is finishes. - */ - public fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - duration: Long = 5 * 60 * 1000L - ) { - } - - /** - * This method is invoked for a provisioner event. 
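Because every callback above has a default no-op body, a monitor only overrides the events it cares about. A minimal sketch (hypothetical class, not part of the patch) that just counts VM state changes:

    import org.opendc.compute.core.Server
    import org.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor

    class CountingMonitor : ExperimentMonitor {
        var vmStateChanges: Int = 0
            private set

        override fun reportVmStateChange(time: Long, server: Server) {
            vmStateChanges++
        }

        override fun close() {} // nothing to flush
    }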
- */ - public fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {} -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt deleted file mode 100644 index 3eb9362c..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment.monitor - -import mu.KotlinLogging -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent -import org.opendc.experiments.sc20.telemetry.HostEvent -import org.opendc.experiments.sc20.telemetry.ProvisionerEvent -import org.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter -import org.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter -import java.io.File - -/** - * The logger instance to use. - */ -private val logger = KotlinLogging.logger {} - -/** - * An [ExperimentMonitor] that logs the events to a Parquet file. 
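The reportHostStateChange implementation that follows aligns state-change events to the 5-minute slice grid of the previous event. A standalone sketch of that rounding rule (the same arithmetic, extracted for clarity; not part of the patch):

    // Round `time` up to the next 5-minute boundary measured from `reference`
    // (the timestamp of the previous event recorded for the same host).
    fun roundUpToSlice(reference: Long, time: Long, sliceMs: Long = 5 * 60 * 1000L): Long {
        val duration = time - reference
        val rem = duration % sliceMs
        return if (rem == 0L) time else reference + duration + sliceMs - rem
    }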
- */ -public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor { - private val hostWriter = ParquetHostEventWriter( - File(base, "host-metrics/$partition/data.parquet"), - bufferSize - ) - private val provisionerWriter = ParquetProvisionerEventWriter( - File(base, "provisioner-metrics/$partition/data.parquet"), - bufferSize - ) - private val currentHostEvent = mutableMapOf() - private var startTime = -1L - - override fun reportVmStateChange(time: Long, server: Server) { - if (startTime < 0) { - startTime = time - - // Update timestamp of initial event - currentHostEvent.replaceAll { _, v -> v.copy(timestamp = startTime) } - } - } - - override fun reportHostStateChange( - time: Long, - driver: VirtDriver, - server: Server - ) { - logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } - - val previousEvent = currentHostEvent[server] - - val roundedTime = previousEvent?.let { - val duration = time - it.timestamp - val k = 5 * 60 * 1000L // 5 min in ms - val rem = duration % k - - if (rem == 0L) { - time - } else { - it.timestamp + duration + k - rem - } - } ?: time - - reportHostSlice( - roundedTime, - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server - ) - } - - private val lastPowerConsumption = mutableMapOf() - - override fun reportPowerConsumption(host: Server, draw: Double) { - lastPowerConsumption[host] = draw - } - - override fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - duration: Long - ) { - val previousEvent = currentHostEvent[hostServer] - when { - previousEvent == null -> { - val event = HostEvent( - time, - 5 * 60 * 1000L, - hostServer, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount - ) - - currentHostEvent[hostServer] = event - } - previousEvent.timestamp == time -> { - val event = HostEvent( - time, - previousEvent.duration, - hostServer, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount - ) - - currentHostEvent[hostServer] = event - } - else -> { - hostWriter.write(previousEvent) - - val event = HostEvent( - time, - time - previousEvent.timestamp, - hostServer, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount - ) - - currentHostEvent[hostServer] = event - } - } - } - - override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { - provisionerWriter.write( - ProvisionerEvent( - time, - event.totalHostCount, - event.availableHostCount, - event.totalVmCount, - event.activeVmCount, - event.inactiveVmCount, - event.waitingVmCount, - event.failedVmCount - ) - ) - } - - override fun close() { - // Flush remaining events - for ((_, event) in currentHostEvent) { - hostWriter.write(event) - } - currentHostEvent.clear() - - hostWriter.close() - provisionerWriter.close() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/Event.kt 
b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/Event.kt deleted file mode 100644 index 38a4a227..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/Event.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry - -/** - * An event that occurs within the system. - */ -public abstract class Event(public val name: String) { - /** - * The time of occurrence of this event. - */ - public abstract val timestamp: Long -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/HostEvent.kt deleted file mode 100644 index 042d204e..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/HostEvent.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry - -import org.opendc.compute.core.Server - -/** - * A periodic report of the host machine metrics. 
- */ -public data class HostEvent( - override val timestamp: Long, - public val duration: Long, - public val host: Server, - public val vmCount: Int, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double, - public val powerDraw: Double, - public val cores: Int -) : Event("host-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt deleted file mode 100644 index d063bc54..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry - -/** - * A periodic report of the provisioner's metrics. 
- */ -public data class ProvisionerEvent( - override val timestamp: Long, - public val totalHostCount: Int, - public val availableHostCount: Int, - public val totalVmCount: Int, - public val activeVmCount: Int, - public val inactiveVmCount: Int, - public val waitingVmCount: Int, - public val failedVmCount: Int -) : Event("provisioner-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/RunEvent.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/RunEvent.kt deleted file mode 100644 index 4f4706f0..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/RunEvent.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry - -import org.opendc.experiments.sc20.experiment.Portfolio - -/** - * A periodic report of the host machine metrics. - */ -public data class RunEvent( - val portfolio: Portfolio, - val repeat: Int, - override val timestamp: Long -) : Event("run") diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/VmEvent.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/VmEvent.kt deleted file mode 100644 index c18069c9..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/VmEvent.kt +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry - -import org.opendc.compute.core.Server - -/** - * A periodic report of a virtual machine's metrics. - */ -public data class VmEvent( - override val timestamp: Long, - public val duration: Long, - public val vm: Server, - public val host: Server, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double -) : Event("vm-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt deleted file mode 100644 index 79bc23db..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry.parquet - -import mu.KotlinLogging -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.experiments.sc20.telemetry.Event -import java.io.Closeable -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import kotlin.concurrent.thread - -/** - * The logging instance to use. - */ -private val logger = KotlinLogging.logger {} - -/** - * A writer that writes events in Parquet format. 
- */ -public open class ParquetEventWriter( - private val path: File, - private val schema: Schema, - private val converter: (T, GenericData.Record) -> Unit, - private val bufferSize: Int = 4096 -) : Runnable, Closeable { - /** - * The writer to write the Parquet file. - */ - private val writer = AvroParquetWriter.builder(Path(path.absolutePath)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - /** - * The queue of commands to process. - */ - private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) - - /** - * The thread that is responsible for writing the Parquet records. - */ - private val writerThread = thread(start = false, name = "parquet-writer") { run() } - - /** - * Write the specified metrics to the database. - */ - public fun write(event: T) { - queue.put(Action.Write(event)) - } - - /** - * Signal the writer to stop. - */ - public override fun close() { - queue.put(Action.Stop) - writerThread.join() - } - - init { - writerThread.start() - } - - /** - * Start the writer thread. - */ - override fun run() { - try { - loop@ while (true) { - val action = queue.take() - when (action) { - is Action.Stop -> break@loop - is Action.Write<*> -> { - val record = GenericData.Record(schema) - @Suppress("UNCHECKED_CAST") - converter(action.event as T, record) - writer.write(record) - } - } - } - } catch (e: Throwable) { - logger.error("Writer failed", e) - } finally { - writer.close() - } - } - - public sealed class Action { - /** - * A poison pill that will stop the writer thread. - */ - public object Stop : Action() - - /** - * Write the specified metrics to the database. - */ - public data class Write(val event: T) : Action() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt deleted file mode 100644 index bd4eae37..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
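A short usage sketch of this writer infrastructure (using the ParquetHostEventWriter defined next; the output path is illustrative and not part of the patch). Writes are queued and flushed by a background thread, so close() must be called to emit the stop signal and join that thread before the results are read:

    import org.opendc.experiments.sc20.telemetry.HostEvent
    import org.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter
    import java.io.File

    fun writeHostEvents(events: Sequence<HostEvent>, outputDir: File) {
        val writer = ParquetHostEventWriter(File(outputDir, "host-metrics/data.parquet"), 4096)
        try {
            events.forEach(writer::write) // each call enqueues; blocks only when the 4096-entry buffer is full
        } finally {
            writer.close()                // poison pill + join, flushing any buffered records
        }
    }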
- */ - -package org.opendc.experiments.sc20.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.sc20.telemetry.HostEvent -import java.io.File - -/** - * A Parquet event writer for [HostEvent]s. - */ -public class ParquetHostEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter(path, schema, convert, bufferSize) { - - override fun toString(): String = "host-writer" - - public companion object { - private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record -> - // record.put("portfolio_id", event.run.parent.parent.id) - // record.put("scenario_id", event.run.parent.id) - // record.put("run_id", event.run.id) - record.put("host_id", event.host.name) - record.put("state", event.host.state.name) - record.put("timestamp", event.timestamp) - record.put("duration", event.duration) - record.put("vm_count", event.vmCount) - record.put("requested_burst", event.requestedBurst) - record.put("granted_burst", event.grantedBurst) - record.put("overcommissioned_burst", event.overcommissionedBurst) - record.put("interfered_burst", event.interferedBurst) - record.put("cpu_usage", event.cpuUsage) - record.put("cpu_demand", event.cpuDemand) - record.put("power_draw", event.powerDraw * (1.0 / 12)) - record.put("cores", event.cores) - } - - private val schema: Schema = SchemaBuilder - .record("host_metrics") - .namespace("org.opendc.experiments.sc20") - .fields() - // .name("portfolio_id").type().intType().noDefault() - // .name("scenario_id").type().intType().noDefault() - // .name("run_id").type().intType().noDefault() - .name("timestamp").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("host_id").type().stringType().noDefault() - .name("state").type().stringType().noDefault() - .name("vm_count").type().intType().noDefault() - .name("requested_burst").type().longType().noDefault() - .name("granted_burst").type().longType().noDefault() - .name("overcommissioned_burst").type().longType().noDefault() - .name("interfered_burst").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .name("cores").type().intType().noDefault() - .endRecord() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt deleted file mode 100644 index 5d53a7bb..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of 
the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.sc20.telemetry.ProvisionerEvent -import java.io.File - -/** - * A Parquet event writer for [ProvisionerEvent]s. - */ -public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter(path, schema, convert, bufferSize) { - - override fun toString(): String = "provisioner-writer" - - public companion object { - private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record -> - record.put("timestamp", event.timestamp) - record.put("host_total_count", event.totalHostCount) - record.put("host_available_count", event.availableHostCount) - record.put("vm_total_count", event.totalVmCount) - record.put("vm_active_count", event.activeVmCount) - record.put("vm_inactive_count", event.inactiveVmCount) - record.put("vm_waiting_count", event.waitingVmCount) - record.put("vm_failed_count", event.failedVmCount) - } - - private val schema: Schema = SchemaBuilder - .record("provisioner_metrics") - .namespace("org.opendc.experiments.sc20") - .fields() - .name("timestamp").type().longType().noDefault() - .name("host_total_count").type().intType().noDefault() - .name("host_available_count").type().intType().noDefault() - .name("vm_total_count").type().intType().noDefault() - .name("vm_active_count").type().intType().noDefault() - .name("vm_inactive_count").type().intType().noDefault() - .name("vm_waiting_count").type().intType().noDefault() - .name("vm_failed_count").type().intType().noDefault() - .endRecord() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt deleted file mode 100644 index b50a698c..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.sc20.telemetry.RunEvent -import java.io.File - -/** - * A Parquet event writer for [RunEvent]s. - */ -public class ParquetRunEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter(path, schema, convert, bufferSize) { - - override fun toString(): String = "run-writer" - - public companion object { - private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record -> - val portfolio = event.portfolio - record.put("portfolio_name", portfolio.name) - record.put("scenario_id", portfolio.id) - record.put("run_id", event.repeat) - record.put("topology", portfolio.topology.name) - record.put("workload_name", portfolio.workload.name) - record.put("workload_fraction", portfolio.workload.fraction) - record.put("workload_sampler", portfolio.workload.samplingStrategy) - record.put("allocation_policy", portfolio.allocationPolicy) - record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency) - record.put("interference", portfolio.operationalPhenomena.hasInterference) - record.put("seed", event.repeat) - } - - private val schema: Schema = SchemaBuilder - .record("runs") - .namespace("org.opendc.experiments.sc20") - .fields() - .name("portfolio_name").type().stringType().noDefault() - .name("scenario_id").type().intType().noDefault() - .name("run_id").type().intType().noDefault() - .name("topology").type().stringType().noDefault() - .name("workload_name").type().stringType().noDefault() - .name("workload_fraction").type().doubleType().noDefault() - .name("workload_sampler").type().stringType().noDefault() - .name("allocation_policy").type().stringType().noDefault() - .name("failure_frequency").type().doubleType().noDefault() - .name("interference").type().booleanType().noDefault() - .name("seed").type().intType().noDefault() - .endRecord() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt deleted file mode 100644 index d735ea4b..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission 
notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.trace - -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage -import org.opendc.experiments.sc20.experiment.model.CompositeWorkload -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import java.util.TreeSet - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param reader The internal trace reader to use. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - * @param run The run to which this reader belongs. - */ -@OptIn(ExperimentalStdlibApi::class) -public class Sc20ParquetTraceReader( - rawReaders: List, - performanceInterferenceModel: Map, - workload: Workload, - seed: Int -) : TraceReader { - /** - * The iterator over the actual trace. - */ - private val iterator: Iterator> = - rawReaders - .map { it.read() } - .run { - if (workload is CompositeWorkload) { - this.zip(workload.workloads) - } else { - this.zip(listOf(workload)) - } - } - .map { sampleWorkload(it.first, workload, it.second, seed) } - .flatten() - .run { - // Apply performance interference model - if (performanceInterferenceModel.isEmpty()) - this - else { - map { entry -> - val image = entry.workload.image - val id = image.name - val relevantPerformanceInterferenceModelItems = - performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) - - val newImage = - SimWorkloadImage( - image.uid, - image.name, - image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - (image as SimWorkloadImage).workload - ) - val newWorkload = entry.workload.copy(image = newImage) - Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) - } - } - } - .iterator() - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt deleted file mode 100644 index 4a318df4..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - 
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.trace - -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage -import org.opendc.core.User -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload -import java.io.File -import java.util.UUID - -private val logger = KotlinLogging.logger {} - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param path The directory of the traces. - */ -@OptIn(ExperimentalStdlibApi::class) -public class Sc20RawParquetTraceReader(private val path: File) { - /** - * Read the fragments into memory. - */ - private fun parseFragments(path: File): Map> { - val reader = AvroParquetReader.builder(Path(path.absolutePath, "trace.parquet")) - .disableCompatibility() - .build() - - val fragments = mutableMapOf>() - - return try { - while (true) { - val record = reader.read() ?: break - - val id = record["id"].toString() - val tick = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long - - val fragment = SimTraceWorkload.Fragment( - duration, - cpuUsage, - cores - ) - - fragments.getOrPut(id) { mutableListOf() }.add(fragment) - } - - fragments - } finally { - reader.close() - } - } - - /** - * Read the metadata into a workload. 
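The parseMeta body that follows derives each VM's total load from its fragments, assuming every fragment covers one 5-minute slice (that is what the * 5 * 60 factor implies). The arithmetic as a standalone sketch, not part of the patch:

    // Total load in MFLOPs: sum of average CPU usage per fragment (MHz) times the slice length (s).
    // Assumes fixed 5-minute fragments, matching the factor used in parseMeta below.
    fun totalLoadMflops(fragmentUsageMhz: List<Double>, sliceSeconds: Int = 5 * 60): Double =
        fragmentUsageMhz.sum() * sliceSeconds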
- */ - private fun parseMeta(path: File, fragments: Map>): List { - val metaReader = AvroParquetReader.builder(Path(path.absolutePath, "meta.parquet")) - .disableCompatibility() - .build() - - var counter = 0 - val entries = mutableListOf() - - return try { - while (true) { - val record = metaReader.read() ?: break - - val id = record["id"].toString() - if (!fragments.containsKey(id)) { - continue - } - - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) - - val vmFragments = fragments.getValue(id).asSequence() - val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs - val vmWorkload = VmWorkload( - uid, - id, - UnnamedUser, - SimWorkloadImage( - uid, - id, - mapOf( - "submit-time" to submissionTime, - "end-time" to endTime, - "total-load" to totalLoad, - "cores" to maxCores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(vmFragments) - ) - ) - entries.add(TraceEntryImpl(submissionTime, vmWorkload)) - } - - entries - } catch (e: Exception) { - e.printStackTrace() - throw e - } finally { - metaReader.close() - } - } - - /** - * The entries in the trace. - */ - private val entries: List - - init { - val fragments = parseFragments(path) - entries = parseMeta(path, fragments) - } - - /** - * Read the entries in the trace. - */ - public fun read(): List> = entries - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - internal data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt deleted file mode 100644 index ba22ae15..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.experiments.sc20.trace - -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.filter2.compat.FilterCompat -import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.filter2.predicate.Statistics -import org.apache.parquet.filter2.predicate.UserDefinedPredicate -import org.apache.parquet.io.api.Binary -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage -import org.opendc.core.User -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.workload.SimTraceWorkload -import java.io.File -import java.io.Serializable -import java.util.SortedSet -import java.util.TreeSet -import java.util.UUID -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. - * - * @param traceFile The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - */ -@OptIn(ExperimentalStdlibApi::class) -public class Sc20StreamingParquetTraceReader( - traceFile: File, - performanceInterferenceModel: PerformanceInterferenceModel, - selectedVms: List, - random: Random -) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * The intermediate buffer to store the read records in. - */ - private val queue = ArrayBlockingQueue>(1024) - - /** - * An optional filter for filtering the selected VMs - */ - private val filter = - if (selectedVms.isEmpty()) - null - else - FilterCompat.get( - FilterApi.userDefined( - FilterApi.binaryColumn("id"), - SelectedVmFilter( - TreeSet(selectedVms) - ) - ) - ) - - /** - * A poisonous fragment. - */ - private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0)) - - /** - * The thread to read the records in. 
- */ - private val readerThread = thread(start = true, name = "sc20-reader") { - val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) - .disableCompatibility() - .run { if (filter != null) withFilter(filter) else this } - .build() - - try { - while (true) { - val record = reader.read() - - if (record == null) { - queue.put(poison) - break - } - - val id = record["id"].toString() - val tick = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long - - val fragment = SimTraceWorkload.Fragment( - duration, - cpuUsage, - cores - ) - - queue.put(id to fragment) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - reader.close() - } - } - - /** - * Fill the buffers with the VMs - */ - private fun pull(buffers: Map>>) { - if (!hasNext) { - return - } - - val fragments = mutableListOf>() - queue.drainTo(fragments) - - for ((id, fragment) in fragments) { - if (id == poison.first) { - hasNext = false - return - } - buffers[id]?.forEach { it.add(fragment) } - } - } - - /** - * A flag to indicate whether the reader has more entries. - */ - private var hasNext: Boolean = true - - /** - * Initialize the reader. - */ - init { - val takenIds = mutableSetOf() - val entries = mutableMapOf() - val buffers = mutableMapOf>>() - - val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) - .disableCompatibility() - .run { if (filter != null) withFilter(filter) else this } - .build() - - while (true) { - val record = metaReader.read() ?: break - val id = record["id"].toString() - entries[id] = record - } - - metaReader.close() - - val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms - - // Create the entry iterator - iterator = selection.asSequence() - .mapNotNull { entries[it] } - .mapIndexed { index, record -> - val id = record["id"].toString() - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) - - assert(uid !in takenIds) - takenIds += uid - - logger.info("Processing VM $id") - - val internalBuffer = mutableListOf() - val externalBuffer = mutableListOf() - buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) - val fragments = sequence { - var time = submissionTime - repeat@ while (true) { - if (externalBuffer.isEmpty()) { - if (hasNext) { - pull(buffers) - continue - } else { - break - } - } - - internalBuffer.addAll(externalBuffer) - externalBuffer.clear() - - for (fragment in internalBuffer) { - yield(fragment) - - time += fragment.duration - if (time >= endTime) { - break@repeat - } - } - - internalBuffer.clear() - } - - buffers.remove(id) - } - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), - Random(random.nextInt()) - ) - val vmWorkload = VmWorkload( - uid, - "VM Workload $id", - UnnamedUser, - SimWorkloadImage( - uid, - id, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to maxCores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(fragments), - ) - ) - - TraceEntryImpl( - submissionTime, - vmWorkload - ) - } - .sortedBy { it.submissionTime } - .toList() - 
.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() { - readerThread.interrupt() - } - - private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { - override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) - - override fun canDrop(statistics: Statistics): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() - } - - override fun inverseCanDrop(statistics: Statistics): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() - } - } - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20TraceConverter.kt deleted file mode 100644 index 9a9598e7..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ /dev/null @@ -1,621 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.experiments.sc20.trace - -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.arguments.argument -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.groupChoice -import com.github.ajalt.clikt.parameters.options.convert -import com.github.ajalt.clikt.parameters.options.default -import com.github.ajalt.clikt.parameters.options.defaultLazy -import com.github.ajalt.clikt.parameters.options.option -import com.github.ajalt.clikt.parameters.options.required -import com.github.ajalt.clikt.parameters.options.split -import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.long -import me.tongfei.progressbar.ProgressBar -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.format.trace.sc20.Sc20VmPlacementReader -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.Random -import kotlin.math.max -import kotlin.math.min - -/** - * Represents the command for converting traces - */ -public class TraceConverterCli : CliktCommand(name = "trace-converter") { - /** - * The directory where the trace should be stored. - */ - private val outputPath by option("-O", "--output", help = "path to store the trace") - .file(canBeFile = false, mustExist = false) - .defaultLazy { File("output") } - - /** - * The directory where the input trace is located. - */ - private val inputPath by argument("input", help = "path to the input trace") - .file(canBeFile = false) - - /** - * The input type of the trace. 
- */ - private val type by option("-t", "--type", help = "input type of trace").groupChoice( - "solvinity" to SolvinityConversion(), - "bitbrains" to BitbrainsConversion(), - "azure" to AzureConversion() - ) - - override fun run() { - val metaSchema = SchemaBuilder - .record("meta") - .namespace("org.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("submissionTime").type().longType().noDefault() - .name("endTime").type().longType().noDefault() - .name("maxCores").type().intType().noDefault() - .name("requiredMemory").type().longType().noDefault() - .endRecord() - val schema = SchemaBuilder - .record("trace") - .namespace("org.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("cores").type().intType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("flops").type().longType().noDefault() - .endRecord() - - val metaParquet = File(outputPath, "meta.parquet") - val traceParquet = File(outputPath, "trace.parquet") - - if (metaParquet.exists()) { - metaParquet.delete() - } - if (traceParquet.exists()) { - traceParquet.delete() - } - - val metaWriter = AvroParquetWriter.builder(Path(metaParquet.toURI())) - .withSchema(metaSchema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - val writer = AvroParquetWriter.builder(Path(traceParquet.toURI())) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - try { - val type = type ?: throw IllegalArgumentException("Invalid trace conversion") - val allFragments = type.read(inputPath, metaSchema, metaWriter) - allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) - - for (fragment in allFragments) { - val record = GenericData.Record(schema) - record.put("id", fragment.id) - record.put("time", fragment.tick) - record.put("duration", fragment.duration) - record.put("cores", fragment.cores) - record.put("cpuUsage", fragment.usage) - record.put("flops", fragment.flops) - - writer.write(record) - } - } finally { - writer.close() - metaWriter.close() - } - } -} - -/** - * The supported trace conversions. - */ -public sealed class TraceConversion(name: String) : OptionGroup(name) { - /** - * Read the fragments of the trace. 
- */ - public abstract fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList -} - -public class SolvinityConversion : TraceConversion("Solvinity") { - private val clusters by option() - .split(",") - - private val vmPlacements by option("--vm-placements", help = "file containing the VM placements") - .file(canBeDir = false) - .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } } - .required() - - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList { - val clusters = clusters?.toSet() ?: emptySet() - val timestampCol = 0 - val cpuUsageCol = 1 - val coreCol = 12 - val provisionedMemoryCol = 20 - val traceInterval = 5 * 60 * 1000L - - // Identify start time of the entire trace - var minTimestamp = Long.MAX_VALUE - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach file@{ vmFile -> - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val vmId = vmFile.name - - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null || !clusters.contains(clusterName)) { - continue - } - - val values = line.split("\t") - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - - if (timestamp < minTimestamp) { - minTimestamp = timestamp - } - return@file - } - } - } - } - - println("Start of trace at $minTimestamp") - - val allFragments = mutableListOf() - - val begin = 15 * 24 * 60 * 60 * 1000L - val end = 45 * 24 * 60 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach { vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores: Int - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split("\t") - - vmId = vmFile.name - - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null || !clusters.contains(clusterName)) { - continue - } - - val timestamp = - (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp - if (begin > timestamp || timestamp > end) { - continue - } - - cores = values[coreCol].trim().toInt() - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - - val flops: Long = (cpuUsage * 5 * 60).toLong() - - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! - Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - cores - ) - if (last != null) { - yield(last!!) 
- } - fragment - } - } - } - } - - if (last != null) { - yield(last!!) - } - } - - var maxTime = Long.MIN_VALUE - flopsFragments.filter { it.tick in begin until end }.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) - } - - if (minTime in begin until end) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) - } - } - - return allFragments - } -} - -/** - * Conversion of the Bitbrains public trace. - */ -public class BitbrainsConversion : TraceConversion("Bitbrains") { - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList { - val timestampCol = 0 - val cpuUsageCol = 3 - val coreCol = 1 - val provisionedMemoryCol = 5 - val traceInterval = 5 * 60 * 1000L - - val allFragments = mutableListOf() - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach { vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores: Int - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .drop(1) - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(";\t") - - vmId = vmFile.name - - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - - cores = values[coreCol].trim().toInt() - val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB - requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - - val flops: Long = (cpuUsage * 5 * 60).toLong() - - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! - Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - cores - ) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - } - - if (last != null) { - yield(last!!) - } - } - - var maxTime = Long.MIN_VALUE - flopsFragments.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) - } - - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) - } - - return allFragments - } -} - -/** - * Conversion of the Azure public VM trace. 
- */ -public class AzureConversion : TraceConversion("Azure") { - private val seed by option(help = "seed for trace sampling") - .long() - .default(0) - - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList { - val random = Random(seed) - val fraction = 0.01 - - // Read VM table - val vmIdTableCol = 0 - val coreTableCol = 9 - val provisionedMemoryTableCol = 10 - - var vmId: String - var cores: Int - var requiredMemory: Long - - val vmIds = mutableSetOf() - val vmIdToMetadata = mutableMapOf() - - BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader -> - reader.lineSequence() - .chunked(1024) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - // Sample only a fraction of the VMs - if (random.nextDouble() > fraction) { - continue - } - - val values = line.split(",") - - // Exclude VMs with a large number of cores (not specified exactly) - if (values[coreTableCol].contains(">")) { - continue - } - - vmId = values[vmIdTableCol].trim() - cores = values[coreTableCol].trim().toInt() - requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB - - vmIds.add(vmId) - vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L) - } - } - } - - // Read VM metric reading files - val timestampCol = 0 - val vmIdCol = 1 - val cpuUsageCol = 4 - val traceInterval = 5 * 60 * 1000L - - val vmIdToFragments = mutableMapOf>() - val vmIdToLastFragment = mutableMapOf() - val allFragments = mutableListOf() - - for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) { - val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv") - var timestamp: Long - var cpuUsage: Double - - BufferedReader(FileReader(readingsFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(",") - vmId = values[vmIdCol].trim() - - // Ignore readings for VMs not in the sample - if (!vmIds.contains(vmId)) { - continue - } - - timestamp = values[timestampCol].trim().toLong() * 1000L - vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) - cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz - vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) - - val flops: Long = (cpuUsage * 5 * 60).toLong() - val lastFragment = vmIdToLastFragment[vmId] - - vmIdToLastFragment[vmId] = - if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) { - Fragment( - vmId, - lastFragment.tick, - lastFragment.flops + flops, - lastFragment.duration + traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - if (lastFragment != null) { - if (vmIdToFragments[vmId] == null) { - vmIdToFragments[vmId] = mutableListOf() - } - vmIdToFragments[vmId]!!.add(lastFragment) - allFragments.add(lastFragment) - } - fragment - } - } - } - } - } - - for (entry in vmIdToLastFragment) { - if (entry.value != null) { - if (vmIdToFragments[entry.key] == null) { - vmIdToFragments[entry.key] = mutableListOf() - } - vmIdToFragments[entry.key]!!.add(entry.value!!) 
- } - } - - println("Read ${vmIdToLastFragment.size} VMs") - - for (entry in vmIdToMetadata) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", entry.key) - metaRecord.put("submissionTime", entry.value.minTime) - metaRecord.put("endTime", entry.value.maxTime) - println("${entry.value.minTime} - ${entry.value.maxTime}") - metaRecord.put("maxCores", entry.value.cores) - metaRecord.put("requiredMemory", entry.value.requiredMemory) - metaWriter.write(metaRecord) - } - - return allFragments - } -} - -public data class Fragment( - public val id: String, - public val tick: Long, - public val flops: Long, - public val duration: Long, - public val usage: Double, - public val cores: Int -) - -public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long) - -/** - * A script to convert a trace in text format into a Parquet trace. - */ -public fun main(args: Array): Unit = TraceConverterCli().main(args) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt deleted file mode 100644 index a8b83aef..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.trace - -import mu.KotlinLogging -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage -import org.opendc.experiments.sc20.experiment.model.CompositeWorkload -import org.opendc.experiments.sc20.experiment.model.SamplingStrategy -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.format.trace.TraceEntry -import java.util.* -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * Sample the workload for the specified [run]. 
- */ -public fun sampleWorkload( - trace: List>, - workload: Workload, - subWorkload: Workload, - seed: Int -): List> { - return when { - workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed) - workload.samplingStrategy == SamplingStrategy.HPC -> - sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false) - workload.samplingStrategy == SamplingStrategy.HPC_LOAD -> - sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true) - else -> - sampleRegularWorkload(trace, workload, workload, seed) - } -} - -/** - * Sample a regular (non-HPC) workload. - */ -public fun sampleRegularWorkload( - trace: List>, - workload: Workload, - subWorkload: Workload, - seed: Int -): List> { - val fraction = subWorkload.fraction - - val shuffled = trace.shuffled(Random(seed)) - val res = mutableListOf>() - val totalLoad = if (workload is CompositeWorkload) { - workload.totalLoad - } else { - shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } - } - var currentLoad = 0.0 - - for (entry in shuffled) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - currentLoad += entryLoad - res += entry - } - - logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res -} - -/** - * Sample a HPC workload. - */ -public fun sampleHpcWorkload( - trace: List>, - workload: Workload, - seed: Int, - sampleOnLoad: Boolean -): List> { - val pattern = Regex("^vm__workload__(ComputeNode|cn).*") - val random = Random(seed) - - val fraction = workload.fraction - val (hpc, nonHpc) = trace.partition { entry -> - val name = entry.workload.image.name - name.matches(pattern) - } - - val hpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf>() - hpc.mapTo(res) { sample(it, index) } - res.shuffle(random) - res - } - .flatten() - - val nonHpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf>() - nonHpc.mapTo(res) { sample(it, index) } - res.shuffle(random) - res - } - .flatten() - - logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } - - val totalLoad = if (workload is CompositeWorkload) { - workload.totalLoad - } else { - trace.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } - } - - logger.debug { "Total trace load: $totalLoad" } - var hpcCount = 0 - var hpcLoad = 0.0 - var nonHpcCount = 0 - var nonHpcLoad = 0.0 - - val res = mutableListOf>() - - if (sampleOnLoad) { - var currentLoad = 0.0 - for (entry in hpcSequence) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - hpcLoad += entryLoad - hpcCount += 1 - currentLoad += entryLoad - res += entry - } - - for (entry in nonHpcSequence) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > 1) { - break - } - - nonHpcLoad += entryLoad - nonHpcCount += 1 - currentLoad += entryLoad - res += entry - } - } else { - hpcSequence - .take((fraction * trace.size).toInt()) - .forEach { entry -> - hpcLoad += entry.workload.image.tags.getValue("total-load") as Double - hpcCount += 1 - res.add(entry) - } - - nonHpcSequence - .take(((1 - fraction) * trace.size).toInt()) - .forEach { entry -> - nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double - nonHpcCount += 1 - 
res.add(entry) - } - } - - logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } - logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } - logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res -} - -/** - * Sample a random trace entry. - */ -private fun sample(entry: TraceEntry, i: Int): TraceEntry { - val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) - val image = SimWorkloadImage( - id, - entry.workload.image.name, - entry.workload.image.tags, - (entry.workload.image as SimWorkloadImage).workload - ) - val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) - return VmTraceEntry(vmWorkload, entry.submissionTime) -} - -private class VmTraceEntry(override val workload: VmWorkload, override val submissionTime: Long) : - TraceEntry diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml deleted file mode 100644 index 8029092e..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml +++ /dev/null @@ -1,49 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt deleted file mode 100644 index c5ad345d..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.experiments.sc20 - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.opendc.compute.core.Server -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy -import org.opendc.experiments.sc20.experiment.attachMonitor -import org.opendc.experiments.sc20.experiment.createFailureDomain -import org.opendc.experiments.sc20.experiment.createProvisioner -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor -import org.opendc.experiments.sc20.experiment.processTrace -import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader -import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer -import java.io.File -import java.time.Clock - -/** - * An integration test suite for the SC20 experiments. - */ -@OptIn(ExperimentalCoroutinesApi::class) -class Sc20IntegrationTest { - /** - * The [TestCoroutineScope] to use. - */ - private lateinit var testScope: TestCoroutineScope - - /** - * The simulation clock to use. - */ - private lateinit var clock: Clock - - /** - * The monitor used to keep track of the metrics. - */ - private lateinit var monitor: TestExperimentReporter - - /** - * Setup the experimental environment. - */ - @BeforeEach - fun setUp() { - testScope = TestCoroutineScope() - clock = DelayControllerClockAdapter(testScope) - - monitor = TestExperimentReporter() - } - - /** - * Tear down the experimental environment. 
- */ - @AfterEach - fun tearDown() = testScope.cleanupTestCoroutines() - - @Test - fun testLarge() { - val failures = false - val seed = 0 - val chan = Channel(Channel.CONFLATED) - val allocationPolicy = AvailableCoreMemoryAllocationPolicy() - val traceReader = createTestTraceReader() - val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: SimVirtProvisioningService - val tracer = EventTracer(clock) - - testScope.launch { - val res = createProvisioner( - this, - clock, - environmentReader, - allocationPolicy, - tracer - ) - val bareMetalProvisioner = res.first - scheduler = res.second - - val failureDomain = if (failures) { - println("ENABLING failures") - createFailureDomain( - this, - clock, - seed, - 24.0 * 7, - bareMetalProvisioner, - chan - ) - } else { - null - } - - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - traceReader, - scheduler, - chan, - monitor - ) - - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - monitor.close() - } - - runSimulation() - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1684849230562, monitor.totalRequestedBurst) }, - { assertEquals(447612683996, monitor.totalGrantedBurst) }, - { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) }, - { assertEquals(0, monitor.totalInterferedBurst) } - ) - } - - @Test - fun testSmall() { - val seed = 1 - val chan = Channel(Channel.CONFLATED) - val allocationPolicy = AvailableCoreMemoryAllocationPolicy() - val traceReader = createTestTraceReader(0.5, seed) - val environmentReader = createTestEnvironmentReader("single") - lateinit var scheduler: SimVirtProvisioningService - val tracer = EventTracer(clock) - - testScope.launch { - val res = createProvisioner( - this, - clock, - environmentReader, - allocationPolicy, - tracer - ) - scheduler = res.second - - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - traceReader, - scheduler, - chan, - monitor - ) - - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - - scheduler.terminate() - monitor.close() - } - - runSimulation() - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(705128393966, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } - ) - } - - /** - * Run the simulation. - */ - private fun runSimulation() = testScope.advanceUntilIdle() - - /** - * Obtain the trace reader for the test. - */ - private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader { - return Sc20ParquetTraceReader( - listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), - emptyMap(), - Workload("test", fraction), - seed - ) - } - - /** - * Obtain the environment reader for the test. 
- */ - private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { - val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt") - return Sc20ClusterEnvironmentReader(stream) - } - - class TestExperimentReporter : ExperimentMonitor { - var totalRequestedBurst = 0L - var totalGrantedBurst = 0L - var totalOvercommissionedBurst = 0L - var totalInterferedBurst = 0L - - override fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - duration: Long - ) { - totalRequestedBurst += requestedBurst - totalGrantedBurst += grantedBurst - totalOvercommissionedBurst += overcommissionedBurst - totalInterferedBurst += interferedBurst - } - - override fun close() {} - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/single.txt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/single.txt deleted file mode 100644 index 53b3c2d7..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/single.txt +++ /dev/null @@ -1,3 +0,0 @@ -ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost -A01;A01;8;3.2;64;1;64;8 - diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/topology.txt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/topology.txt deleted file mode 100644 index 6b347bff..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/topology.txt +++ /dev/null @@ -1,5 +0,0 @@ -ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost -A01;A01;32;3.2;2048;1;256;32 -B01;B01;48;2.93;1256;6;64;8 -C01;C01;32;3.2;2048;2;128;16 - diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/meta.parquet b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/meta.parquet deleted file mode 100644 index ce7a812c..00000000 Binary files a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/meta.parquet and /dev/null differ diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/trace.parquet b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/trace.parquet deleted file mode 100644 index 1d7ce882..00000000 Binary files a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/trace.parquet and /dev/null differ diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts index 85b24e19..a9c679f6 100644 --- a/simulator/opendc-runner-web/build.gradle.kts +++ b/simulator/opendc-runner-web/build.gradle.kts @@ -36,7 +36,7 @@ dependencies { api(project(":opendc-core")) implementation(project(":opendc-compute:opendc-compute-simulator")) implementation(project(":opendc-format")) - implementation(project(":opendc-experiments:opendc-experiments-sc20")) + implementation(project(":opendc-experiments:opendc-experiments-capelin")) implementation(project(":opendc-simulator:opendc-simulator-core")) implementation(project(":opendc-simulator:opendc-simulator-compute")) diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 7796019a..533bf321 
100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -41,13 +41,13 @@ import mu.KotlinLogging
 import org.bson.Document
 import org.bson.types.ObjectId
 import org.opendc.compute.simulator.allocation.*
-import org.opendc.experiments.sc20.experiment.attachMonitor
-import org.opendc.experiments.sc20.experiment.createFailureDomain
-import org.opendc.experiments.sc20.experiment.createProvisioner
-import org.opendc.experiments.sc20.experiment.model.Workload
-import org.opendc.experiments.sc20.experiment.processTrace
-import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
-import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
+import org.opendc.experiments.capelin.experiment.attachMonitor
+import org.opendc.experiments.capelin.experiment.createFailureDomain
+import org.opendc.experiments.capelin.experiment.createProvisioner
+import org.opendc.experiments.capelin.experiment.processTrace
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
+import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
 import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
 import org.opendc.simulator.utils.DelayControllerClockAdapter
 import org.opendc.trace.core.EventTracer
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
index 7ef25552..f16f9b90 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -27,8 +27,8 @@ import org.opendc.compute.core.Server
 import org.opendc.compute.core.ServerState
 import org.opendc.compute.core.virt.driver.VirtDriver
 import org.opendc.compute.core.virt.service.VirtProvisioningEvent
-import org.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
-import org.opendc.experiments.sc20.telemetry.HostEvent
+import org.opendc.experiments.capelin.monitor.ExperimentMonitor
+import org.opendc.experiments.capelin.telemetry.HostEvent
 import kotlin.math.max
 
 /**
-- 
cgit v1.2.3
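
For readers wiring up the renamed module, the sketch below shows roughly how the relocated reader classes fit together after this change. It mirrors the constructor calls used by the integration test above (a list of raw readers, an empty performance-interference map, a Workload with a sampling fraction, and a seed) and assumes the capelin packages keep the signatures of the removed sc20 sources; the trace directory path is a placeholder.

import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
import java.io.File

fun main() {
    // Directory containing meta.parquet and trace.parquet (placeholder path).
    val traceDir = File("src/test/resources/trace")

    // The raw reader parses the Parquet files once; it can be reused across runs.
    val rawReader = Sc20RawParquetTraceReader(traceDir)

    // Wrap it in the sampling/interference-aware reader, mirroring the
    // integration test: (rawReaders, performanceInterferenceModel, workload, seed).
    val reader = Sc20ParquetTraceReader(
        listOf(rawReader),
        emptyMap(),               // no performance interference model
        Workload("example", 1.0), // keep the full trace (fraction = 1.0)
        0                         // sampling seed
    )

    var count = 0
    while (reader.hasNext()) {
        reader.next()
        count += 1
    }
    reader.close()
    println("Trace contains $count VM workloads")
}

Splitting the raw reader from the sampling reader appears to be what lets a single parsed trace be shared by several runs, which is also how the test and the web runner use these classes.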
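
All three converters removed above (Solvinity, Bitbrains, Azure) apply the same compaction rule while building fragments: when the previous fragment and the current sample both carry zero FLOPs, the previous fragment is extended by another trace interval instead of a new fragment being emitted. The helper below is a hypothetical, self-contained restatement of that rule over the Fragment shape defined in the converter; mergeIdleSamples is not a name that exists in the codebase.

// Same shape as the converter's Fragment record.
data class Fragment(
    val id: String,
    val tick: Long,
    val flops: Long,
    val duration: Long,
    val usage: Double,
    val cores: Int
)

// Collapse consecutive idle (0-flop) samples into one longer fragment,
// mirroring the "last fragment" bookkeeping in the converters.
fun mergeIdleSamples(samples: List<Fragment>): List<Fragment> {
    val merged = mutableListOf<Fragment>()
    for (sample in samples) {
        val last = merged.lastOrNull()
        if (last != null && last.flops == 0L && sample.flops == 0L) {
            // Keep the original start tick, extend the duration, and carry the
            // latest usage/core count, as the converters do.
            merged[merged.lastIndex] = last.copy(
                flops = last.flops + sample.flops,
                duration = last.duration + sample.duration,
                usage = sample.usage,
                cores = sample.cores
            )
        } else {
            merged += sample
        }
    }
    return merged
}

The effect is that long idle periods do not inflate the converted trace with thousands of identical zero-load rows.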
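
On the output side, the converter reduces to two Avro schemas and a stream of GenericData.Record values pushed through AvroParquetWriter. The sketch below restates the meta.parquet half with the builder's record type written out explicitly; the output location is a placeholder, and it assumes the same parquet-avro and Hadoop dependencies the sc20 module declared.

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import java.io.File

fun main() {
    // The "meta" schema used by the converter: one row per VM.
    val metaSchema = SchemaBuilder
        .record("meta")
        .namespace("org.opendc.format.sc20")
        .fields()
        .name("id").type().stringType().noDefault()
        .name("submissionTime").type().longType().noDefault()
        .name("endTime").type().longType().noDefault()
        .name("maxCores").type().intType().noDefault()
        .name("requiredMemory").type().longType().noDefault()
        .endRecord()

    val outputDir = File("output").apply { mkdirs() } // placeholder output location
    val metaFile = File(outputDir, "meta.parquet")
    if (metaFile.exists()) {
        metaFile.delete()
    }

    val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaFile.toURI()))
        .withSchema(metaSchema)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withPageSize(4 * 1024 * 1024)      // for compression
        .withRowGroupSize(16 * 1024 * 1024) // for write buffering
        .build()

    try {
        // One example row; the converter emits one such record per VM.
        val record = GenericData.Record(metaSchema)
        record.put("id", "vm-0")
        record.put("submissionTime", 0L)
        record.put("endTime", 30L * 60 * 1000)
        record.put("maxCores", 2)
        record.put("requiredMemory", 4096L)
        metaWriter.write(record)
    } finally {
        metaWriter.close()
    }
}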
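
The regular (non-HPC) sampling strategy in the removed WorkloadSampler boils down to: shuffle the trace with a seeded Random, then keep adding VMs until their cumulative "total-load" exceeds the requested fraction of the overall load. The function below is a stripped-down sketch of just that selection loop over plain (load, item) pairs rather than TraceEntry objects; takeLoadFraction is a hypothetical name.

import kotlin.random.Random

// Keep a seeded, shuffled prefix of the items whose cumulative load stays
// within `fraction` of the total load, as sampleRegularWorkload does with
// the "total-load" image tag.
fun <T> takeLoadFraction(items: List<Pair<Double, T>>, fraction: Double, seed: Int): List<T> {
    val shuffled = items.shuffled(Random(seed))
    val totalLoad = shuffled.sumOf { it.first }
    val selected = mutableListOf<T>()
    var currentLoad = 0.0

    for ((load, item) in shuffled) {
        if ((currentLoad + load) / totalLoad > fraction) {
            break
        }
        currentLoad += load
        selected += item
    }
    return selected
}

fun main() {
    val vms = listOf(10.0 to "vm-a", 40.0 to "vm-b", 25.0 to "vm-c", 25.0 to "vm-d")
    println(takeLoadFraction(vms, fraction = 0.5, seed = 0))
}

Sampling by load rather than by VM count is what keeps a 50% sample at roughly 50% of the trace's work, even when individual VMs differ widely in how much load they contribute.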