summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/test')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt339
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json21
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquetbin0 -> 2099 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquetbin0 -> 1125930 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquetbin2148 -> 0 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquetbin1672463 -> 0 bytes
7 files changed, 237 insertions, 125 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 2d5cc68c..e34c5bdc 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,185 +22,276 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
-import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.workload.*
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.compute.workload.topology.apply
+import org.opendc.compute.workload.util.PerformanceInterferenceReader
+import org.opendc.experiments.capelin.topology.clusterTopology
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
+import java.time.Duration
+import java.util.*
/**
- * An integration test suite for the SC20 experiments.
+ * An integration test suite for the Capelin experiments.
*/
class CapelinIntegrationTest {
/**
* The monitor used to keep track of the metrics.
*/
- private lateinit var monitor: TestExperimentReporter
+ private lateinit var exporter: TestComputeMetricExporter
+
+ /**
+ * The [FilterScheduler] to use for all experiments.
+ */
+ private lateinit var computeScheduler: FilterScheduler
+
+ /**
+ * The [ComputeWorkloadLoader] responsible for loading the traces.
+ */
+ private lateinit var workloadLoader: ComputeWorkloadLoader
/**
* Setup the experimental environment.
*/
@BeforeEach
fun setUp() {
- monitor = TestExperimentReporter()
+ exporter = TestComputeMetricExporter()
+ computeScheduler = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace"))
}
+ /**
+ * Test a large simulation setup.
+ */
@Test
fun testLarge() = runBlockingSimulation {
- val failures = false
- val seed = 0
- val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
+ val workload = createTestWorkload(1.0)
+ val runner = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler
)
- val traceReader = createTestTraceReader()
- val environmentReader = createTestEnvironmentReader()
- lateinit var monitorResults: ComputeMetrics
-
- val meterProvider = createMeterProvider(clock)
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val failureDomain = if (failures) {
- println("ENABLING failures")
- createFailureDomain(
- this,
- clock,
- seed,
- 24.0 * 7,
- scheduler,
- chan
- )
- } else {
- null
- }
-
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
- }
-
- failureDomain?.cancel()
+ val topology = createTopology()
+ val metricReader = CoroutineMetricReader(this, runner.producers, exporter)
+
+ try {
+ runner.apply(topology)
+ runner.run(workload, 0)
+ } finally {
+ runner.close()
+ metricReader.close()
}
- monitorResults = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}")
+ val serviceMetrics = collectServiceMetrics(runner.producers[0])
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
- { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
- { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
- { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(207389912923, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(207122087280, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
- { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
+ { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") },
+ { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
+ { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
+ { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
+ { assertEquals(223325655, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
+ { assertEquals(67006560, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
+ { assertEquals(3159377, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.840212485920686E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
+ /**
+ * Test a small simulation setup.
+ */
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
+ val workload = createTestWorkload(0.25, seed)
+
+ val simulator = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler
)
- val traceReader = createTestTraceReader(0.5, seed)
- val environmentReader = createTestEnvironmentReader("single")
-
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
- }
+ val topology = createTopology("single")
+ val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
+
+ try {
+ simulator.apply(topology)
+ simulator.run(workload, seed.toLong())
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(simulator.producers[0])
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(96350072517, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(96330335057, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
+ { assertEquals(10997726, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9740289, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } },
+ { assertEquals(7.0099453912813E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
)
}
/**
- * Obtain the trace reader for the test.
+ * Test a small simulation setup with interference.
*/
- private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
- return Sc20ParquetTraceReader(
- listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
- emptyMap(),
- Workload("test", fraction),
- seed
+ @Test
+ fun testInterference() = runBlockingSimulation {
+ val seed = 0
+ val workload = createTestWorkload(1.0, seed)
+ val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
+ val performanceInterferenceModel =
+ PerformanceInterferenceReader()
+ .read(perfInterferenceInput)
+ .let { VmInterferenceModel(it, Random(seed.toLong())) }
+
+ val simulator = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ interferenceModel = performanceInterferenceModel
+ )
+ val topology = createTopology("single")
+ val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
+
+ try {
+ simulator.apply(topology)
+ simulator.run(workload, seed.toLong())
+ } finally {
+ simulator.close()
+ metricReader.close()
+ }
+
+ val serviceMetrics = collectServiceMetrics(simulator.producers[0])
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(481251, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
/**
- * Obtain the environment reader for the test.
+ * Test a small simulation setup with failures.
*/
- private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
- val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt")
- return Sc20ClusterEnvironmentReader(stream)
- }
+ @Test
+ fun testFailures() = runBlockingSimulation {
+ val seed = 1
+ val simulator = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ grid5000(Duration.ofDays(7))
+ )
+ val topology = createTopology("single")
+ val workload = createTestWorkload(0.25, seed)
+ val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
- class TestExperimentReporter : ExperimentMonitor {
- var totalRequestedBurst = 0L
- var totalGrantedBurst = 0L
- var totalOvercommissionedBurst = 0L
- var totalInterferedBurst = 0L
-
- override fun reportHostSlice(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- numberOfDeployedImages: Int,
- host: Host,
- ) {
- totalRequestedBurst += requestedBurst
- totalGrantedBurst += grantedBurst
- totalOvercommissionedBurst += overcommissionedBurst
- totalInterferedBurst += interferedBurst
+ try {
+ simulator.apply(topology)
+ simulator.run(workload, seed.toLong())
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- override fun close() {}
+ val serviceMetrics = collectServiceMetrics(simulator.producers[0])
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(10865478, exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9606177, exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
+ { assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } }
+ )
+ }
+
+ /**
+ * Obtain the trace reader for the test.
+ */
+ private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> {
+ val source = trace("bitbrains-small").sampleByLoad(fraction)
+ return source.resolve(workloadLoader, Random(seed.toLong()))
+ }
+
+ /**
+ * Obtain the topology factory for the test.
+ */
+ private fun createTopology(name: String = "topology"): Topology {
+ val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name.txt"))
+ return stream.use { clusterTopology(stream) }
+ }
+
+ class TestComputeMetricExporter : ComputeMetricExporter() {
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ var lostTime = 0L
+ var energyUsage = 0.0
+ var uptime = 0L
+
+ override fun record(data: HostData) {
+ idleTime += data.cpuIdleTime
+ activeTime += data.cpuActiveTime
+ stealTime += data.cpuStealTime
+ lostTime += data.cpuLostTime
+ energyUsage += data.powerTotal
+ uptime += data.uptime
+ }
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json
new file mode 100644
index 00000000..51fc6366
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json
@@ -0,0 +1,21 @@
+[
+ {
+ "vms": [
+ "141",
+ "379",
+ "851",
+ "116"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.8830158730158756
+ },
+ {
+ "vms": [
+ "205",
+ "116",
+ "463"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.7133055555552751
+ }
+]
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
index 53b3c2d7..5642003d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
@@ -1,3 +1,3 @@
ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
-A01;A01;8;3.2;64;1;64;8
+A01;A01;8;3.2;128;1;128;8
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
new file mode 100644
index 00000000..da6e5330
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
new file mode 100644
index 00000000..fe0a254c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
deleted file mode 100644
index ce7a812c..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
+++ /dev/null
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
deleted file mode 100644
index 1d7ce882..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
+++ /dev/null
Binary files differ