summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/test')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt262
1 files changed, 133 insertions, 129 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 44cf92a8..727530e3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,7 +22,6 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@@ -32,7 +31,6 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
@@ -40,14 +38,17 @@ import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
import org.opendc.experiments.capelin.trace.TraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.withMonitor
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
+import java.time.Duration
import java.util.*
/**
@@ -60,11 +61,20 @@ class CapelinIntegrationTest {
private lateinit var monitor: TestExperimentReporter
/**
+ * The [FilterScheduler] to use for all experiments.
+ */
+ private lateinit var computeScheduler: FilterScheduler
+
+ /**
* Setup the experimental environment.
*/
@BeforeEach
fun setUp() {
monitor = TestExperimentReporter()
+ computeScheduler = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
}
/**
@@ -72,45 +82,46 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- val meterProvider = createMeterProvider(clock)
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
- "Finish " +
- "SUBMIT=${serviceMetrics.instanceCount} " +
- "FAIL=${serviceMetrics.failedInstanceCount} " +
- "QUEUE=${serviceMetrics.queuedInstanceCount} " +
- "RUNNING=${serviceMetrics.runningInstanceCount}"
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") },
- { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") },
- { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") },
- { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") },
- { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } },
- { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } },
- { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
- { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } },
- { assertEquals(1.7671768767192196E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } },
+ { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") },
+ { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
+ { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
+ { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
+ { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
@@ -120,41 +131,41 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
- "Finish " +
- "SUBMIT=${serviceMetrics.instanceCount} " +
- "FAIL=${serviceMetrics.failedInstanceCount} " +
- "QUEUE=${serviceMetrics.queuedInstanceCount} " +
- "RUNNING=${serviceMetrics.runningInstanceCount}"
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -164,10 +175,6 @@ class CapelinIntegrationTest {
@Test
fun testInterference() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
@@ -177,34 +184,39 @@ class CapelinIntegrationTest {
.read(perfInterferenceInput)
.let { VmInterferenceModel(it, Random(seed.toLong())) }
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ interferenceModel = performanceInterferenceModel
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
- "Finish " +
- "SUBMIT=${serviceMetrics.instanceCount} " +
- "FAIL=${serviceMetrics.failedInstanceCount} " +
- "QUEUE=${serviceMetrics.queuedInstanceCount} " +
- "RUNNING=${serviceMetrics.runningInstanceCount}"
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -214,53 +226,43 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val faultInjector =
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seed,
- 24.0 * 7,
- )
-
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector.start()
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
-
- faultInjector.close()
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ grid5000(Duration.ofDays(7), seed)
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
- "Finish " +
- "SUBMIT=${serviceMetrics.instanceCount} " +
- "FAIL=${serviceMetrics.failedInstanceCount} " +
- "QUEUE=${serviceMetrics.queuedInstanceCount} " +
- "RUNNING=${serviceMetrics.runningInstanceCount}"
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } }
)
}
@@ -284,18 +286,20 @@ class CapelinIntegrationTest {
}
class TestExperimentReporter : ComputeMonitor {
- var totalWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
- var totalInterferedWork = 0L
- var totalPowerDraw = 0.0
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ var lostTime = 0L
+ var energyUsage = 0.0
+ var uptime = 0L
override fun record(data: HostData) {
- this.totalWork += data.totalWork.toLong()
- totalGrantedWork += data.grantedWork.toLong()
- totalOvercommittedWork += data.overcommittedWork.toLong()
- totalInterferedWork += data.interferedWork.toLong()
- totalPowerDraw += data.powerDraw
+ idleTime += data.cpuIdleTime
+ activeTime += data.cpuActiveTime
+ stealTime += data.cpuStealTime
+ lostTime += data.cpuLostTime
+ energyUsage += data.powerTotal
+ uptime += data.uptime
}
}
}