summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src/test
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-25 14:53:54 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-25 14:53:54 +0200
commitaa9b32f8cd1467e9718959f400f6777e5d71737d (patch)
treeb88bbede15108c6855d7f94ded4c7054df186a72 /opendc-experiments/opendc-experiments-capelin/src/test
parenteb0e0a3bc557c05a70eead388797ab850ea87366 (diff)
parentb7a71e5b4aa77b41ef41deec2ace42b67a5a13a7 (diff)
merge: Integrate v2.1 progress into public repository
This pull request integrates the changes planned for the v2.1 release of OpenDC into the public Github repository in order to sync the progress of both repositories.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/test')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt339
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json21
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquetbin0 -> 2099 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquetbin0 -> 1125930 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquetbin2148 -> 0 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquetbin1672463 -> 0 bytes
7 files changed, 237 insertions, 125 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 2d5cc68c..e34c5bdc 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,185 +22,276 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
-import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.workload.*
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.compute.workload.topology.apply
+import org.opendc.compute.workload.util.PerformanceInterferenceReader
+import org.opendc.experiments.capelin.topology.clusterTopology
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
+import java.time.Duration
+import java.util.*
/**
- * An integration test suite for the SC20 experiments.
+ * An integration test suite for the Capelin experiments.
*/
class CapelinIntegrationTest {
/**
* The monitor used to keep track of the metrics.
*/
- private lateinit var monitor: TestExperimentReporter
+ private lateinit var exporter: TestComputeMetricExporter
+
+ /**
+ * The [FilterScheduler] to use for all experiments.
+ */
+ private lateinit var computeScheduler: FilterScheduler
+
+ /**
+ * The [ComputeWorkloadLoader] responsible for loading the traces.
+ */
+ private lateinit var workloadLoader: ComputeWorkloadLoader
/**
* Setup the experimental environment.
*/
@BeforeEach
fun setUp() {
- monitor = TestExperimentReporter()
+ exporter = TestComputeMetricExporter()
+ computeScheduler = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace"))
}
+ /**
+ * Test a large simulation setup.
+ */
@Test
fun testLarge() = runBlockingSimulation {
- val failures = false
- val seed = 0
- val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
+ val workload = createTestWorkload(1.0)
+ val runner = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler
)
- val traceReader = createTestTraceReader()
- val environmentReader = createTestEnvironmentReader()
- lateinit var monitorResults: ComputeMetrics
-
- val meterProvider = createMeterProvider(clock)
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val failureDomain = if (failures) {
- println("ENABLING failures")
- createFailureDomain(
- this,
- clock,
- seed,
- 24.0 * 7,
- scheduler,
- chan
- )
- } else {
- null
- }
-
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
- }
-
- failureDomain?.cancel()
+ val topology = createTopology()
+ val metricReader = CoroutineMetricReader(this, runner.producers, exporter)
+
+ try {
+ runner.apply(topology)
+ runner.run(workload, 0)
+ } finally {
+ runner.close()
+ metricReader.close()
}
- monitorResults = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}")
+ val serviceMetrics = collectServiceMetrics(runner.producers[0])
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
- { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
- { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
- { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(207389912923, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(207122087280, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
- { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
+ { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") },
+ { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
+ { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
+ { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
+ { assertEquals(223325655, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
+ { assertEquals(67006560, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
+ { assertEquals(3159377, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.840212485920686E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
+ /**
+ * Test a small simulation setup.
+ */
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
+ val workload = createTestWorkload(0.25, seed)
+
+ val simulator = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler
)
- val traceReader = createTestTraceReader(0.5, seed)
- val environmentReader = createTestEnvironmentReader("single")
-
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
- }
+ val topology = createTopology("single")
+ val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
+
+ try {
+ simulator.apply(topology)
+ simulator.run(workload, seed.toLong())
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(simulator.producers[0])
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(96350072517, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(96330335057, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
+ { assertEquals(10997726, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9740289, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } },
+ { assertEquals(7.0099453912813E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
)
}
/**
- * Obtain the trace reader for the test.
+ * Test a small simulation setup with interference.
*/
- private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
- return Sc20ParquetTraceReader(
- listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
- emptyMap(),
- Workload("test", fraction),
- seed
+ @Test
+ fun testInterference() = runBlockingSimulation {
+ val seed = 0
+ val workload = createTestWorkload(1.0, seed)
+ val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
+ val performanceInterferenceModel =
+ PerformanceInterferenceReader()
+ .read(perfInterferenceInput)
+ .let { VmInterferenceModel(it, Random(seed.toLong())) }
+
+ val simulator = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ interferenceModel = performanceInterferenceModel
+ )
+ val topology = createTopology("single")
+ val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
+
+ try {
+ simulator.apply(topology)
+ simulator.run(workload, seed.toLong())
+ } finally {
+ simulator.close()
+ metricReader.close()
+ }
+
+ val serviceMetrics = collectServiceMetrics(simulator.producers[0])
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(481251, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
/**
- * Obtain the environment reader for the test.
+ * Test a small simulation setup with failures.
*/
- private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
- val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt")
- return Sc20ClusterEnvironmentReader(stream)
- }
+ @Test
+ fun testFailures() = runBlockingSimulation {
+ val seed = 1
+ val simulator = ComputeWorkloadRunner(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ grid5000(Duration.ofDays(7))
+ )
+ val topology = createTopology("single")
+ val workload = createTestWorkload(0.25, seed)
+ val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
- class TestExperimentReporter : ExperimentMonitor {
- var totalRequestedBurst = 0L
- var totalGrantedBurst = 0L
- var totalOvercommissionedBurst = 0L
- var totalInterferedBurst = 0L
-
- override fun reportHostSlice(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- numberOfDeployedImages: Int,
- host: Host,
- ) {
- totalRequestedBurst += requestedBurst
- totalGrantedBurst += grantedBurst
- totalOvercommissionedBurst += overcommissionedBurst
- totalInterferedBurst += interferedBurst
+ try {
+ simulator.apply(topology)
+ simulator.run(workload, seed.toLong())
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- override fun close() {}
+ val serviceMetrics = collectServiceMetrics(simulator.producers[0])
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(10865478, exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9606177, exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
+ { assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } }
+ )
+ }
+
+ /**
+ * Obtain the trace reader for the test.
+ */
+ private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> {
+ val source = trace("bitbrains-small").sampleByLoad(fraction)
+ return source.resolve(workloadLoader, Random(seed.toLong()))
+ }
+
+ /**
+ * Obtain the topology factory for the test.
+ */
+ private fun createTopology(name: String = "topology"): Topology {
+ val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name.txt"))
+ return stream.use { clusterTopology(stream) }
+ }
+
+ class TestComputeMetricExporter : ComputeMetricExporter() {
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ var lostTime = 0L
+ var energyUsage = 0.0
+ var uptime = 0L
+
+ override fun record(data: HostData) {
+ idleTime += data.cpuIdleTime
+ activeTime += data.cpuActiveTime
+ stealTime += data.cpuStealTime
+ lostTime += data.cpuLostTime
+ energyUsage += data.powerTotal
+ uptime += data.uptime
+ }
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json
new file mode 100644
index 00000000..51fc6366
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json
@@ -0,0 +1,21 @@
+[
+ {
+ "vms": [
+ "141",
+ "379",
+ "851",
+ "116"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.8830158730158756
+ },
+ {
+ "vms": [
+ "205",
+ "116",
+ "463"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.7133055555552751
+ }
+]
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
index 53b3c2d7..5642003d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
@@ -1,3 +1,3 @@
ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
-A01;A01;8;3.2;64;1;64;8
+A01;A01;8;3.2;128;1;128;8
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
new file mode 100644
index 00000000..da6e5330
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
new file mode 100644
index 00000000..fe0a254c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
deleted file mode 100644
index ce7a812c..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
+++ /dev/null
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
deleted file mode 100644
index 1d7ce882..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
+++ /dev/null
Binary files differ