diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-09 16:46:33 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-10 16:13:23 +0200 |
| commit | 04945381d01d8c6e59befe6843f2c6f6da5e91bf (patch) | |
| tree | 6e6431fc6c19b4ef862e3cfc6be9d56215aa5ebb /opendc-experiments/opendc-experiments-capelin | |
| parent | f20d615e3f6e5b9d02526ac033778fb0419fed4e (diff) | |
refactor(capelin): Terminate servers after reaching deadline
This change updates the Capelin experiment helpers to terminate a server
when it has reached its end-date.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin')
3 files changed, 33 insertions, 26 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 3d605300..512b754d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -54,7 +54,6 @@ import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.sdk.toOtelClock import java.time.Clock -import kotlin.coroutines.resume import kotlin.math.ln import kotlin.math.max import kotlin.random.Random @@ -169,10 +168,19 @@ suspend fun processTrace( monitor: ComputeMonitor? = null, ) { val client = scheduler.newClient() + val watcher = object : ServerWatcher { + override fun onStateChanged(server: Server, newState: ServerState) { + monitor?.onStateChange(clock.millis(), server, newState) + } + } + + // Create new image for the virtual machine val image = client.newImage("vm-image") - var offset = Long.MIN_VALUE + try { coroutineScope { + var offset = Long.MIN_VALUE + while (reader.hasNext()) { val entry = reader.next() @@ -183,9 +191,12 @@ suspend fun processTrace( // Make sure the trace entries are ordered by submission time assert(entry.start - offset >= 0) { "Invalid trace order" } delay(max(0, (entry.start - offset) - clock.millis())) + launch { chan.send(Unit) - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, offset = -offset + 300001) + + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) val server = client.newServer( entry.name, image, @@ -196,18 +207,14 @@ suspend fun processTrace( ), meta = entry.meta + mapOf("workload" to workload) ) + server.watch(watcher) - suspendCancellableCoroutine { cont -> - server.watch(object : ServerWatcher { - override fun onStateChanged(server: Server, newState: ServerState) { - monitor?.onStateChange(clock.millis(), server, newState) + // Wait for the server reach its end time + val endTime = entry.meta["end-time"] as Long + delay(endTime + workloadOffset - clock.millis() + 1) - if (newState == ServerState.TERMINATED) { - cont.resume(Unit) - } - } - }) - } + // Delete the server after reaching the end-time of the virtual machine + server.delete() } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index aed9a4bb..ab33bc25 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -52,7 +52,7 @@ import java.io.File import java.util.* /** - * An integration test suite for the SC20 experiments. + * An integration test suite for the Capelin experiments. */ class CapelinIntegrationTest { /** @@ -146,7 +146,7 @@ class CapelinIntegrationTest { filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) ) - val traceReader = createTestTraceReader(0.5, seed) + val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") val meterProvider = createMeterProvider(clock) @@ -174,9 +174,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -226,10 +226,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(13910814, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -284,9 +284,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(25412073109, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(23695061858, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(368502468, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt index 53b3c2d7..5642003d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt @@ -1,3 +1,3 @@ ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost -A01;A01;8;3.2;64;1;64;8 +A01;A01;8;3.2;128;1;128;8 |
