diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-01-08 18:18:43 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-01-11 15:23:56 +0100 |
| commit | a71d4885efcf01850bc236d3e9f77ab3f44b48aa (patch) | |
| tree | 797c65e0e5a37b73820ba4ef5d377b4a5524cd5f /simulator/opendc-experiments/opendc-experiments-sc20 | |
| parent | 42e9a5b5b610f41a03e68f6fc781c54b9402925b (diff) | |
Convert to pull-based workload model
This change converts the low-level workload model to be pull-based. This
reduces the overhead that we experienced with our previous co-routine
based approach.
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-sc20')
3 files changed, 13 insertions, 15 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index 9bc1a58e..4a318df4 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -66,8 +66,6 @@ public class Sc20RawParquetTraceReader(private val path: File) { val flops = record["flops"] as Long val fragment = SimTraceWorkload.Fragment( - tick, - flops, duration, cpuUsage, cores diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt index edef276c..ba22ae15 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt @@ -92,7 +92,7 @@ public class Sc20StreamingParquetTraceReader( /** * A poisonous fragment. */ - private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0, 0, 0.0, 0)) + private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0)) /** * The thread to read the records in. @@ -120,8 +120,6 @@ public class Sc20StreamingParquetTraceReader( val flops = record["flops"] as Long val fragment = SimTraceWorkload.Fragment( - tick, - flops, duration, cpuUsage, cores @@ -204,6 +202,7 @@ public class Sc20StreamingParquetTraceReader( val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>() buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) val fragments = sequence { + var time = submissionTime repeat@ while (true) { if (externalBuffer.isEmpty()) { if (hasNext) { @@ -220,7 +219,8 @@ public class Sc20StreamingParquetTraceReader( for (fragment in internalBuffer) { yield(fragment) - if (fragment.time >= endTime) { + time += fragment.duration + if (time >= endTime) { break@repeat } } diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt index 2eedb636..c5ad345d 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -90,7 +90,7 @@ class Sc20IntegrationTest { fun tearDown() = testScope.cleanupTestCoroutines() @Test - fun smoke() { + fun testLarge() { val failures = false val seed = 0 val chan = Channel<Unit>(Channel.CONFLATED) @@ -148,15 +148,15 @@ class Sc20IntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(207480856422, monitor.totalRequestedBurst) }, - { assertEquals(206510493178, monitor.totalGrantedBurst) }, - { assertEquals(336120436, monitor.totalOvercommissionedBurst) }, + { assertEquals(1684849230562, monitor.totalRequestedBurst) }, + { assertEquals(447612683996, monitor.totalGrantedBurst) }, + { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) } @Test - fun small() { + fun testSmall() { val seed = 1 val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = AvailableCoreMemoryAllocationPolicy() @@ -195,10 +195,10 @@ class Sc20IntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(96410877173, monitor.totalRequestedBurst) }, - { assertEquals(96046583992, monitor.totalGrantedBurst) }, - { assertEquals(19265632, monitor.totalOvercommissionedBurst) }, - { assertEquals(0, monitor.totalInterferedBurst) } + { assertEquals(705128393966, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } |
