summaryrefslogtreecommitdiff
path: root/opendc-experiments
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt18
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt26
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt23
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt68
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt199
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt84
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet (renamed from opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet)bin2081 -> 2081 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet (renamed from opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet)bin1647189 -> 1647189 bytes
-rw-r--r--opendc-experiments/opendc-experiments-radice/build.gradle.kts47
15 files changed, 95 insertions, 434 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
index faabe5cb..31e8f961 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
@@ -22,7 +22,8 @@
package org.opendc.experiments.capelin
-import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.compute.workload.composite
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -42,30 +43,25 @@ public class CompositeWorkloadPortfolio : Portfolio("composite-workload") {
)
override val workload: Workload by anyOf(
- CompositeWorkload(
+ Workload(
"all-azure",
- listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.0, trace("azure") to 1.0)
),
- CompositeWorkload(
+ Workload(
"solvinity-25-azure-75",
- listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.25, trace("azure") to 0.75)
),
- CompositeWorkload(
+ Workload(
"solvinity-50-azure-50",
- listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.5, trace("azure") to 0.5)
),
- CompositeWorkload(
+ Workload(
"solvinity-75-azure-25",
- listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.75, trace("azure") to 0.25)
),
- CompositeWorkload(
+ Workload(
"all-solvinity",
- listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 1.0, trace("azure") to 0.0)
)
)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
index e1cf8517..cd093e6c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -44,10 +46,10 @@ public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
index a995e467..73e59a58 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
@@ -22,8 +22,10 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByHpc
+import org.opendc.compute.workload.sampleByHpcLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.SamplingStrategy
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.harness.dsl.anyOf
@@ -40,13 +42,13 @@ public class MoreHpcPortfolio : Portfolio("more_hpc") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD)
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.0)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(1.0)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
index 49559e0e..9d5717bb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -40,10 +42,10 @@ public class MoreVelocityPortfolio : Portfolio("more_velocity") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
index 1aac4f9e..7ab586b3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,10 +38,10 @@ public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena")
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 02811d83..630b76c4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -24,18 +24,16 @@ package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.ComputeWorkloadRunner
import org.opendc.compute.workload.export.parquet.ParquetExportMonitor
import org.opendc.compute.workload.grid5000
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.trace.RawParquetTraceReader
import org.opendc.compute.workload.util.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.topology.clusterTopology
-import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
@@ -47,7 +45,6 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
import java.util.*
-import java.util.concurrent.ConcurrentHashMap
import kotlin.math.roundToLong
/**
@@ -92,9 +89,9 @@ abstract class Portfolio(name: String) : Experiment(name) {
abstract val allocationPolicy: String
/**
- * A map of trace readers.
+ * A helper class to load workload traces.
*/
- private val traceReaders = ConcurrentHashMap<String, RawParquetTraceReader>()
+ private val workloadLoader = ComputeWorkloadLoader(File(config.getString("trace-path")))
/**
* Perform a single trial for this portfolio.
@@ -102,19 +99,6 @@ abstract class Portfolio(name: String) : Experiment(name) {
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
- val workload = workload
- val workloadNames = if (workload is CompositeWorkload) {
- workload.workloads.map { it.name }
- } else {
- listOf(workload.name)
- }
- val rawReaders = workloadNames.map { workloadName ->
- traceReaders.computeIfAbsent(workloadName) {
- logger.info { "Loading trace $workloadName" }
- RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
- }
- }
- val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
PerformanceInterferenceReader()
.read(File(config.getString("interference-model")))
@@ -125,7 +109,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements)
val failureModel =
if (operationalPhenomena.failureFrequency > 0)
- grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt())
+ grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()))
else
null
val runner = ComputeWorkloadRunner(
@@ -149,7 +133,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
runner.apply(topology)
// Run the workload trace
- runner.run(trace)
+ runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
} finally {
runner.close()
metricReader.close()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
index b6d3b30c..17ec48d4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class ReplayPortfolio : Portfolio("replay") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity"))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
index 90840db8..98eb989d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class TestPortfolio : Portfolio("test") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity"))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
index c4ddd158..a2e71243 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
@@ -22,23 +22,12 @@
package org.opendc.experiments.capelin.model
-public enum class SamplingStrategy {
- REGULAR,
- HPC,
- HPC_LOAD
-}
+import org.opendc.compute.workload.ComputeWorkload
/**
- * A workload that is considered for a scenario.
- */
-public open class Workload(
- public open val name: String,
- public val fraction: Double,
- public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR
-)
-
-/**
- * A workload that is composed of multiple workloads.
+ * A single workload originating from a trace.
+ *
+ * @param name the name of the workload.
+ * @param source The source of the workload data.
*/
-public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) :
- Workload(name, -1.0)
+data class Workload(val name: String, val source: ComputeWorkload)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
deleted file mode 100644
index 498636ba..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import org.opendc.compute.workload.trace.RawParquetTraceReader
-import org.opendc.compute.workload.trace.TraceEntry
-import org.opendc.compute.workload.trace.TraceReader
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.simulator.compute.workload.SimWorkload
-
-/**
- * A [TraceReader] for the internal VM workload trace format.
- *
- * @param rawReaders The internal raw trace readers to use.
- * @param workload The workload to read.
- * @param seed The seed to use for sampling.
- */
-public class ParquetTraceReader(
- rawReaders: List<RawParquetTraceReader>,
- workload: Workload,
- seed: Int
-) : TraceReader<SimWorkload> {
- /**
- * The iterator over the actual trace.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>> =
- rawReaders
- .map { it.read() }
- .run {
- if (workload is CompositeWorkload) {
- this.zip(workload.workloads)
- } else {
- this.zip(listOf(workload))
- }
- }
- .flatMap {
- sampleWorkload(it.first, workload, it.second, seed)
- .sortedBy(TraceEntry<SimWorkload>::start)
- }
- .iterator()
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
deleted file mode 100644
index b42951df..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.opendc.compute.workload.trace.TraceEntry
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.SamplingStrategy
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.util.*
-import kotlin.random.Random
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * Sample the workload for the specified [run].
- */
-public fun sampleWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- return when {
- workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
- workload.samplingStrategy == SamplingStrategy.HPC ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false)
- workload.samplingStrategy == SamplingStrategy.HPC_LOAD ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true)
- else ->
- sampleRegularWorkload(trace, workload, workload, seed)
- }
-}
-
-/**
- * Sample a regular (non-HPC) workload.
- */
-public fun sampleRegularWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- val fraction = subWorkload.fraction
-
- val shuffled = trace.shuffled(Random(seed))
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- shuffled.sumOf { it.meta.getValue("total-load") as Double }
- }
- var currentLoad = 0.0
-
- for (entry in shuffled) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- currentLoad += entryLoad
- res += entry
- }
-
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a HPC workload.
- */
-public fun sampleHpcWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- seed: Int,
- sampleOnLoad: Boolean
-): List<TraceEntry<SimWorkload>> {
- val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
- val random = Random(seed)
-
- val fraction = workload.fraction
- val (hpc, nonHpc) = trace.partition { entry ->
- val name = entry.name
- name.matches(pattern)
- }
-
- val hpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- hpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- val nonHpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- nonHpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
-
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- trace.sumOf { it.meta.getValue("total-load") as Double }
- }
-
- logger.debug { "Total trace load: $totalLoad" }
- var hpcCount = 0
- var hpcLoad = 0.0
- var nonHpcCount = 0
- var nonHpcLoad = 0.0
-
- val res = mutableListOf<TraceEntry<SimWorkload>>()
-
- if (sampleOnLoad) {
- var currentLoad = 0.0
- for (entry in hpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- hpcLoad += entryLoad
- hpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
-
- for (entry in nonHpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > 1) {
- break
- }
-
- nonHpcLoad += entryLoad
- nonHpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
- } else {
- hpcSequence
- .take((fraction * trace.size).toInt())
- .forEach { entry ->
- hpcLoad += entry.meta.getValue("total-load") as Double
- hpcCount += 1
- res.add(entry)
- }
-
- nonHpcSequence
- .take(((1 - fraction) * trace.size).toInt())
- .forEach { entry ->
- nonHpcLoad += entry.meta.getValue("total-load") as Double
- nonHpcCount += 1
- res.add(entry)
- }
- }
-
- logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
- logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a random trace entry.
- */
-private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> {
- val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
- return entry.copy(uid = uid)
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index c1386bfe..140a84db 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -31,18 +31,12 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.workload.ComputeWorkloadRunner
-import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.*
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.trace.RawParquetTraceReader
-import org.opendc.compute.workload.trace.TraceReader
import org.opendc.compute.workload.util.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.topology.clusterTopology
-import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
@@ -68,6 +62,11 @@ class CapelinIntegrationTest {
private lateinit var computeScheduler: FilterScheduler
/**
+ * The [ComputeWorkloadLoader] responsible for loading the traces.
+ */
+ private lateinit var workloadLoader: ComputeWorkloadLoader
+
+ /**
* Setup the experimental environment.
*/
@BeforeEach
@@ -77,6 +76,7 @@ class CapelinIntegrationTest {
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
)
+ workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace"))
}
/**
@@ -84,24 +84,24 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val traceReader = createTestTraceReader()
- val simulator = ComputeWorkloadRunner(
+ val workload = createTestWorkload(1.0)
+ val runner = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler
)
val topology = createTopology()
- val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+ val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor))
try {
- simulator.apply(topology)
- simulator.run(traceReader)
+ runner.apply(topology)
+ runner.run(workload, 0)
} finally {
- simulator.close()
+ runner.close()
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
+ val serviceMetrics = collectServiceMetrics(clock.instant(), runner.producers[0])
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -117,11 +117,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } },
- { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } },
- { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(221949826, monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(68421374, monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(947010, monitor.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } },
+ { assertEquals(5.783711298639437E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
@@ -131,7 +131,7 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val traceReader = createTestTraceReader(0.25, seed)
+ val workload = createTestWorkload(0.25, seed)
val simulator = ComputeWorkloadRunner(
coroutineContext,
@@ -143,7 +143,7 @@ class CapelinIntegrationTest {
try {
simulator.apply(topology)
- simulator.run(traceReader)
+ simulator.run(workload, seed.toLong())
} finally {
simulator.close()
metricReader.close()
@@ -161,9 +161,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(8545158, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(12195642, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(941038, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -173,9 +173,8 @@ class CapelinIntegrationTest {
*/
@Test
fun testInterference() = runBlockingSimulation {
- val seed = 1
- val traceReader = createTestTraceReader(0.25, seed)
-
+ val seed = 0
+ val workload = createTestWorkload(1.0, seed)
val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
val performanceInterferenceModel =
PerformanceInterferenceReader()
@@ -193,7 +192,7 @@ class CapelinIntegrationTest {
try {
simulator.apply(topology)
- simulator.run(traceReader)
+ simulator.run(workload, seed.toLong())
} finally {
simulator.close()
metricReader.close()
@@ -211,10 +210,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
- { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } }
+ { assertEquals(8545158, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(12195642, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(941038, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(3378, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -228,15 +227,15 @@ class CapelinIntegrationTest {
coroutineContext,
clock,
computeScheduler,
- grid5000(Duration.ofDays(7), seed)
+ grid5000(Duration.ofDays(7))
)
val topology = createTopology("single")
- val traceReader = createTestTraceReader(0.25, seed)
+ val workload = createTestWorkload(0.25, seed)
val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
try {
simulator.apply(topology)
- simulator.run(traceReader)
+ simulator.run(workload, seed.toLong())
} finally {
simulator.close()
metricReader.close()
@@ -254,23 +253,20 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(8640140, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(12100660, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(939456, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
- { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } }
+ { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } }
)
}
/**
* Obtain the trace reader for the test.
*/
- private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
- return ParquetTraceReader(
- listOf(RawParquetTraceReader(File("src/test/resources/trace"))),
- Workload("test", fraction),
- seed
- )
+ private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> {
+ val source = trace("bitbrains-small").sampleByLoad(fraction)
+ return source.resolve(workloadLoader, Random(seed.toLong()))
}
/**
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
index ee76d38f..ee76d38f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
index 9b1cde13..9b1cde13 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-radice/build.gradle.kts b/opendc-experiments/opendc-experiments-radice/build.gradle.kts
deleted file mode 100644
index 0c716183..00000000
--- a/opendc-experiments/opendc-experiments-radice/build.gradle.kts
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-description = "Experiments for the Risk Analysis work"
-
-/* Build configuration */
-plugins {
- `experiment-conventions`
- `testing-conventions`
-}
-
-dependencies {
- api(platform(projects.opendcPlatform))
- api(projects.opendcHarness.opendcHarnessApi)
- implementation(projects.opendcFormat)
- implementation(projects.opendcSimulator.opendcSimulatorCore)
- implementation(projects.opendcSimulator.opendcSimulatorCompute)
- implementation(projects.opendcCompute.opendcComputeSimulator)
- implementation(projects.opendcTelemetry.opendcTelemetrySdk)
-
- implementation(libs.kotlin.logging)
- implementation(libs.config)
- implementation(libs.progressbar)
- implementation(libs.clikt)
-
- implementation(libs.parquet)
- testImplementation(libs.log4j.slf4j)
-}