summaryrefslogtreecommitdiff
path: root/opendc-experiments
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-16 16:52:00 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-19 14:02:51 +0200
commitb14df2a0924774c5aed15cedeb1027abf8ee5361 (patch)
treed93171ad63477b97e160d3a10e3216c3de1b5ff5 /opendc-experiments
parent29e52ae17bd567070b52e0bcd3c254ee7491189a (diff)
refactor(capelin): Make workload sampling model extensible
This change updates the workload sampling implementation to be more flexible in the way the workload is constructed. Users can now sample multiple workloads at the same time using multiple samplers and use them as a single workload to simulate.
Diffstat (limited to 'opendc-experiments')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt18
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt26
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt23
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt68
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt199
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt84
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet (renamed from opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet)bin2081 -> 2081 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet (renamed from opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet)bin1647189 -> 1647189 bytes
-rw-r--r--opendc-experiments/opendc-experiments-radice/build.gradle.kts47
15 files changed, 95 insertions, 434 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
index faabe5cb..31e8f961 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
@@ -22,7 +22,8 @@
package org.opendc.experiments.capelin
-import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.compute.workload.composite
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -42,30 +43,25 @@ public class CompositeWorkloadPortfolio : Portfolio("composite-workload") {
)
override val workload: Workload by anyOf(
- CompositeWorkload(
+ Workload(
"all-azure",
- listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.0, trace("azure") to 1.0)
),
- CompositeWorkload(
+ Workload(
"solvinity-25-azure-75",
- listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.25, trace("azure") to 0.75)
),
- CompositeWorkload(
+ Workload(
"solvinity-50-azure-50",
- listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.5, trace("azure") to 0.5)
),
- CompositeWorkload(
+ Workload(
"solvinity-75-azure-25",
- listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.75, trace("azure") to 0.25)
),
- CompositeWorkload(
+ Workload(
"all-solvinity",
- listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 1.0, trace("azure") to 0.0)
)
)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
index e1cf8517..cd093e6c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -44,10 +46,10 @@ public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
index a995e467..73e59a58 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
@@ -22,8 +22,10 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByHpc
+import org.opendc.compute.workload.sampleByHpcLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.SamplingStrategy
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.harness.dsl.anyOf
@@ -40,13 +42,13 @@ public class MoreHpcPortfolio : Portfolio("more_hpc") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD)
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.0)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(1.0)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
index 49559e0e..9d5717bb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -40,10 +42,10 @@ public class MoreVelocityPortfolio : Portfolio("more_velocity") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
index 1aac4f9e..7ab586b3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,10 +38,10 @@ public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena")
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 02811d83..630b76c4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -24,18 +24,16 @@ package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.ComputeWorkloadRunner
import org.opendc.compute.workload.export.parquet.ParquetExportMonitor
import org.opendc.compute.workload.grid5000
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.trace.RawParquetTraceReader
import org.opendc.compute.workload.util.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.topology.clusterTopology
-import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
@@ -47,7 +45,6 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
import java.util.*
-import java.util.concurrent.ConcurrentHashMap
import kotlin.math.roundToLong
/**
@@ -92,9 +89,9 @@ abstract class Portfolio(name: String) : Experiment(name) {
abstract val allocationPolicy: String
/**
- * A map of trace readers.
+ * A helper class to load workload traces.
*/
- private val traceReaders = ConcurrentHashMap<String, RawParquetTraceReader>()
+ private val workloadLoader = ComputeWorkloadLoader(File(config.getString("trace-path")))
/**
* Perform a single trial for this portfolio.
@@ -102,19 +99,6 @@ abstract class Portfolio(name: String) : Experiment(name) {
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
- val workload = workload
- val workloadNames = if (workload is CompositeWorkload) {
- workload.workloads.map { it.name }
- } else {
- listOf(workload.name)
- }
- val rawReaders = workloadNames.map { workloadName ->
- traceReaders.computeIfAbsent(workloadName) {
- logger.info { "Loading trace $workloadName" }
- RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
- }
- }
- val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
PerformanceInterferenceReader()
.read(File(config.getString("interference-model")))
@@ -125,7 +109,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements)
val failureModel =
if (operationalPhenomena.failureFrequency > 0)
- grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt())
+ grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()))
else
null
val runner = ComputeWorkloadRunner(
@@ -149,7 +133,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
runner.apply(topology)
// Run the workload trace
- runner.run(trace)
+ runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
} finally {
runner.close()
metricReader.close()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
index b6d3b30c..17ec48d4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class ReplayPortfolio : Portfolio("replay") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity"))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
index 90840db8..98eb989d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class TestPortfolio : Portfolio("test") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity"))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
index c4ddd158..a2e71243 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
@@ -22,23 +22,12 @@
package org.opendc.experiments.capelin.model
-public enum class SamplingStrategy {
- REGULAR,
- HPC,
- HPC_LOAD
-}
+import org.opendc.compute.workload.ComputeWorkload
/**
- * A workload that is considered for a scenario.
- */
-public open class Workload(
- public open val name: String,
- public val fraction: Double,
- public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR
-)
-
-/**
- * A workload that is composed of multiple workloads.
+ * A single workload originating from a trace.
+ *
+ * @param name the name of the workload.
+ * @param source The source of the workload data.
*/
-public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) :
- Workload(name, -1.0)
+data class Workload(val name: String, val source: ComputeWorkload)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
deleted file mode 100644
index 498636ba..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import org.opendc.compute.workload.trace.RawParquetTraceReader
-import org.opendc.compute.workload.trace.TraceEntry
-import org.opendc.compute.workload.trace.TraceReader
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.simulator.compute.workload.SimWorkload
-
-/**
- * A [TraceReader] for the internal VM workload trace format.
- *
- * @param rawReaders The internal raw trace readers to use.
- * @param workload The workload to read.
- * @param seed The seed to use for sampling.
- */
-public class ParquetTraceReader(
- rawReaders: List<RawParquetTraceReader>,
- workload: Workload,
- seed: Int
-) : TraceReader<SimWorkload> {
- /**
- * The iterator over the actual trace.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>> =
- rawReaders
- .map { it.read() }
- .run {
- if (workload is CompositeWorkload) {
- this.zip(workload.workloads)
- } else {
- this.zip(listOf(workload))
- }
- }
- .flatMap {
- sampleWorkload(it.first, workload, it.second, seed)
- .sortedBy(TraceEntry<SimWorkload>::start)
- }
- .iterator()
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
deleted file mode 100644
index b42951df..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.opendc.compute.workload.trace.TraceEntry
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.SamplingStrategy
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.util.*
-import kotlin.random.Random
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * Sample the workload for the specified [run].
- */
-public fun sampleWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- return when {
- workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
- workload.samplingStrategy == SamplingStrategy.HPC ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false)
- workload.samplingStrategy == SamplingStrategy.HPC_LOAD ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true)
- else ->
- sampleRegularWorkload(trace, workload, workload, seed)
- }
-}
-
-/**
- * Sample a regular (non-HPC) workload.
- */
-public fun sampleRegularWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- val fraction = subWorkload.fraction
-
- val shuffled = trace.shuffled(Random(seed))
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- shuffled.sumOf { it.meta.getValue("total-load") as Double }
- }
- var currentLoad = 0.0
-
- for (entry in shuffled) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- currentLoad += entryLoad
- res += entry
- }
-
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a HPC workload.
- */
-public fun sampleHpcWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- seed: Int,
- sampleOnLoad: Boolean
-): List<TraceEntry<SimWorkload>> {
- val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
- val random = Random(seed)
-
- val fraction = workload.fraction
- val (hpc, nonHpc) = trace.partition { entry ->
- val name = entry.name
- name.matches(pattern)
- }
-
- val hpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- hpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- val nonHpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- nonHpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
-
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- trace.sumOf { it.meta.getValue("total-load") as Double }
- }
-
- logger.debug { "Total trace load: $totalLoad" }
- var hpcCount = 0
- var hpcLoad = 0.0
- var nonHpcCount = 0
- var nonHpcLoad = 0.0
-
- val res = mutableListOf<TraceEntry<SimWorkload>>()
-
- if (sampleOnLoad) {
- var currentLoad = 0.0
- for (entry in hpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- hpcLoad += entryLoad
- hpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
-
- for (entry in nonHpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > 1) {
- break
- }
-
- nonHpcLoad += entryLoad
- nonHpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
- } else {
- hpcSequence
- .take((fraction * trace.size).toInt())
- .forEach { entry ->
- hpcLoad += entry.meta.getValue("total-load") as Double
- hpcCount += 1
- res.add(entry)
- }
-
- nonHpcSequence
- .take(((1 - fraction) * trace.size).toInt())
- .forEach { entry ->
- nonHpcLoad += entry.meta.getValue("total-load") as Double
- nonHpcCount += 1
- res.add(entry)
- }
- }
-
- logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
- logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a random trace entry.
- */
-private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> {
- val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
- return entry.copy(uid = uid)
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index c1386bfe..140a84db 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -31,18 +31,12 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.workload.ComputeWorkloadRunner
-import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.*
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.trace.RawParquetTraceReader
-import org.opendc.compute.workload.trace.TraceReader
import org.opendc.compute.workload.util.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.topology.clusterTopology
-import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
@@ -68,6 +62,11 @@ class CapelinIntegrationTest {
private lateinit var computeScheduler: FilterScheduler
/**
+ * The [ComputeWorkloadLoader] responsible for loading the traces.
+ */
+ private lateinit var workloadLoader: ComputeWorkloadLoader
+
+ /**
* Setup the experimental environment.
*/
@BeforeEach
@@ -77,6 +76,7 @@ class CapelinIntegrationTest {
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
)
+ workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace"))
}
/**
@@ -84,24 +84,24 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val traceReader = createTestTraceReader()
- val simulator = ComputeWorkloadRunner(
+ val workload = createTestWorkload(1.0)
+ val runner = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler
)
val topology = createTopology()
- val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+ val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor))
try {
- simulator.apply(topology)
- simulator.run(traceReader)
+ runner.apply(topology)
+ runner.run(workload, 0)
} finally {
- simulator.close()
+ runner.close()
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
+ val serviceMetrics = collectServiceMetrics(clock.instant(), runner.producers[0])
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -117,11 +117,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } },
- { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } },
- { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(221949826, monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(68421374, monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(947010, monitor.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } },
+ { assertEquals(5.783711298639437E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
@@ -131,7 +131,7 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val traceReader = createTestTraceReader(0.25, seed)
+ val workload = createTestWorkload(0.25, seed)
val simulator = ComputeWorkloadRunner(
coroutineContext,
@@ -143,7 +143,7 @@ class CapelinIntegrationTest {
try {
simulator.apply(topology)
- simulator.run(traceReader)
+ simulator.run(workload, seed.toLong())
} finally {
simulator.close()
metricReader.close()
@@ -161,9 +161,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(8545158, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(12195642, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(941038, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -173,9 +173,8 @@ class CapelinIntegrationTest {
*/
@Test
fun testInterference() = runBlockingSimulation {
- val seed = 1
- val traceReader = createTestTraceReader(0.25, seed)
-
+ val seed = 0
+ val workload = createTestWorkload(1.0, seed)
val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
val performanceInterferenceModel =
PerformanceInterferenceReader()
@@ -193,7 +192,7 @@ class CapelinIntegrationTest {
try {
simulator.apply(topology)
- simulator.run(traceReader)
+ simulator.run(workload, seed.toLong())
} finally {
simulator.close()
metricReader.close()
@@ -211,10 +210,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
- { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } }
+ { assertEquals(8545158, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(12195642, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(941038, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(3378, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -228,15 +227,15 @@ class CapelinIntegrationTest {
coroutineContext,
clock,
computeScheduler,
- grid5000(Duration.ofDays(7), seed)
+ grid5000(Duration.ofDays(7))
)
val topology = createTopology("single")
- val traceReader = createTestTraceReader(0.25, seed)
+ val workload = createTestWorkload(0.25, seed)
val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
try {
simulator.apply(topology)
- simulator.run(traceReader)
+ simulator.run(workload, seed.toLong())
} finally {
simulator.close()
metricReader.close()
@@ -254,23 +253,20 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(8640140, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(12100660, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(939456, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
- { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } }
+ { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } }
)
}
/**
* Obtain the trace reader for the test.
*/
- private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
- return ParquetTraceReader(
- listOf(RawParquetTraceReader(File("src/test/resources/trace"))),
- Workload("test", fraction),
- seed
- )
+ private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> {
+ val source = trace("bitbrains-small").sampleByLoad(fraction)
+ return source.resolve(workloadLoader, Random(seed.toLong()))
}
/**
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
index ee76d38f..ee76d38f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
index 9b1cde13..9b1cde13 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-radice/build.gradle.kts b/opendc-experiments/opendc-experiments-radice/build.gradle.kts
deleted file mode 100644
index 0c716183..00000000
--- a/opendc-experiments/opendc-experiments-radice/build.gradle.kts
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-description = "Experiments for the Risk Analysis work"
-
-/* Build configuration */
-plugins {
- `experiment-conventions`
- `testing-conventions`
-}
-
-dependencies {
- api(platform(projects.opendcPlatform))
- api(projects.opendcHarness.opendcHarnessApi)
- implementation(projects.opendcFormat)
- implementation(projects.opendcSimulator.opendcSimulatorCore)
- implementation(projects.opendcSimulator.opendcSimulatorCompute)
- implementation(projects.opendcCompute.opendcComputeSimulator)
- implementation(projects.opendcTelemetry.opendcTelemetrySdk)
-
- implementation(libs.kotlin.logging)
- implementation(libs.config)
- implementation(libs.progressbar)
- implementation(libs.clikt)
-
- implementation(libs.parquet)
- testImplementation(libs.log4j.slf4j)
-}