diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-08-17 19:25:11 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-08-24 11:20:17 +0200 |
| commit | 31a1f298c71cd3203fdcd57bd39ba8813009dd5b (patch) | |
| tree | 02cc92ae680d05c766388c96f538faa14bc6fe83 /opendc-experiments | |
| parent | f03129779a1ec60e8689ad9c7fd5ad488c66f54c (diff) | |
refactor(simulator): Execute traces based on timestamps
This change refactors the trace workload in the OpenDC simulator to
track execute a fragment based on the fragment's timestamp. This makes
sure that the trace is replayed identically to the original execution.
Diffstat (limited to 'opendc-experiments')
5 files changed, 13 insertions, 9 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 2c443678..fa9fa2fc 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -258,7 +258,7 @@ suspend fun processTrace( delay(max(0, (entry.start - offset) - clock.millis())) launch { chan.send(Unit) - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace) + val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, offset = -offset + 300001) val server = client.newServer( entry.name, image, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index 94193780..16ad6816 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -50,11 +50,13 @@ class RawParquetTraceReader(private val path: File) { val record = reader.read() ?: break val id = record["id"].toString() + val time = record["time"] as Long val duration = record["duration"] as Long val cores = record["cores"] as Int val cpuUsage = record["cpuUsage"] as Double val fragment = SimTraceWorkload.Fragment( + time, duration, cpuUsage, cores diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt index a3b45f47..35f4c5b8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt @@ -81,7 +81,7 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = e /** * A poisonous fragment. */ - private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0)) + private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0)) /** * The thread to read the records in. @@ -103,11 +103,13 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = e } val id = record["id"].toString() + val time = record["time"] as Long val duration = record["duration"] as Long val cores = record["cores"] as Int val cpuUsage = record["cpuUsage"] as Double val fragment = SimTraceWorkload.Fragment( + time, duration, cpuUsage, cores diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 75428011..00ab9190 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -114,9 +114,9 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(207380204679, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(207371815929, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(8388750, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(155252275350, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(155086837649, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(165488283, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } @@ -151,9 +151,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(96344616902, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(96324879442, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(29454904468, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(29355293349, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(99627123, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt index 9a93092e..a119a219 100644 --- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt +++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt @@ -29,6 +29,6 @@ import org.opendc.simulator.compute.workload.SimWorkload /** * A [SimFaaSWorkload] for a [FunctionTrace]. */ -public class FunctionTraceWorkload(trace: FunctionTrace) : SimFaaSWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.duration, it.cpuUsage, 1) }) { +public class FunctionTraceWorkload(trace: FunctionTrace) : SimFaaSWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.timestamp, it.duration, it.cpuUsage, 1) }) { override suspend fun invoke() {} } |
