From 31a1f298c71cd3203fdcd57bd39ba8813009dd5b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 17 Aug 2021 19:25:11 +0200 Subject: refactor(simulator): Execute traces based on timestamps This change refactors the trace workload in the OpenDC simulator to track execute a fragment based on the fragment's timestamp. This makes sure that the trace is replayed identically to the original execution. --- .../org/opendc/experiments/capelin/ExperimentHelpers.kt | 2 +- .../experiments/capelin/trace/RawParquetTraceReader.kt | 2 ++ .../experiments/capelin/trace/StreamingParquetTraceReader.kt | 4 +++- .../org/opendc/experiments/capelin/CapelinIntegrationTest.kt | 12 ++++++------ 4 files changed, 12 insertions(+), 8 deletions(-) (limited to 'opendc-experiments/opendc-experiments-capelin') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 2c443678..fa9fa2fc 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -258,7 +258,7 @@ suspend fun processTrace( delay(max(0, (entry.start - offset) - clock.millis())) launch { chan.send(Unit) - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace) + val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, offset = -offset + 300001) val server = client.newServer( entry.name, image, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index 94193780..16ad6816 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -50,11 +50,13 @@ class RawParquetTraceReader(private val path: File) { val record = reader.read() ?: break val id = record["id"].toString() + val time = record["time"] as Long val duration = record["duration"] as Long val cores = record["cores"] as Int val cpuUsage = record["cpuUsage"] as Double val fragment = SimTraceWorkload.Fragment( + time, duration, cpuUsage, cores diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt index a3b45f47..35f4c5b8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt @@ -81,7 +81,7 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List = e /** * A poisonous fragment. */ - private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0)) + private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0)) /** * The thread to read the records in. @@ -103,11 +103,13 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List = e } val id = record["id"].toString() + val time = record["time"] as Long val duration = record["duration"] as Long val cores = record["cores"] as Int val cpuUsage = record["cpuUsage"] as Double val fragment = SimTraceWorkload.Fragment( + time, duration, cpuUsage, cores diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 75428011..00ab9190 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -114,9 +114,9 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(207380204679, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(207371815929, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(8388750, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(155252275350, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(155086837649, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(165488283, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } @@ -151,9 +151,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(96344616902, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(96324879442, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(29454904468, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(29355293349, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(99627123, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } -- cgit v1.2.3