From 31a1f298c71cd3203fdcd57bd39ba8813009dd5b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 17 Aug 2021 19:25:11 +0200 Subject: refactor(simulator): Execute traces based on timestamps This change refactors the trace workload in the OpenDC simulator to track execute a fragment based on the fragment's timestamp. This makes sure that the trace is replayed identically to the original execution. --- .../org/opendc/compute/simulator/SimHostTest.kt | 24 ++-- .../experiments/capelin/ExperimentHelpers.kt | 2 +- .../capelin/trace/RawParquetTraceReader.kt | 2 + .../capelin/trace/StreamingParquetTraceReader.kt | 4 +- .../experiments/capelin/CapelinIntegrationTest.kt | 12 +- .../serverless/trace/FunctionTraceWorkload.kt | 2 +- .../format/trace/bitbrains/BitbrainsTraceReader.kt | 7 +- .../org/opendc/format/trace/swf/SwfTraceReader.kt | 5 +- .../simulator/compute/SimMachineBenchmarks.kt | 16 +-- .../opendc/simulator/compute/SimAbstractMachine.kt | 1 - .../simulator/compute/workload/SimTraceWorkload.kt | 76 +++++++----- .../org/opendc/simulator/compute/SimMachineTest.kt | 1 - .../simulator/compute/kernel/SimHypervisorTest.kt | 40 +++---- .../compute/kernel/SimSpaceSharedHypervisorTest.kt | 8 +- .../compute/workload/SimTraceWorkloadTest.kt | 133 +++++++++++++++++++++ 15 files changed, 243 insertions(+), 90 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 5a6fb03d..45fdb268 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -99,11 +99,12 @@ internal class SimHostTest { mapOf( "workload" to SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 3500.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 183.0, 2) + SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), + SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 2 * 183.0, 2) ), + offset = 1 ) ) ) @@ -114,11 +115,12 @@ internal class SimHostTest { mapOf( "workload" to SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 73.0, 2) - ) + SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000L, duration * 1000, 2 * 3100.0, 2), + SimTraceWorkload.Fragment(duration * 2000L, duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 3000L, duration * 1000, 2 * 73.0, 2) + ), + offset = 1 ) ) ) @@ -150,7 +152,7 @@ internal class SimHostTest { override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() }, - exportInterval = duration * 1000 + exportInterval = duration * 1000L ) coroutineScope { @@ -171,7 +173,7 @@ internal class SimHostTest { } // Ensure last cycle is collected - delay(1000 * duration) + delay(1000L * duration) virtDriver.close() reader.close() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 2c443678..fa9fa2fc 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -258,7 +258,7 @@ suspend fun processTrace( delay(max(0, (entry.start - offset) - clock.millis())) launch { chan.send(Unit) - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace) + val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, offset = -offset + 300001) val server = client.newServer( entry.name, image, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index 94193780..16ad6816 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -50,11 +50,13 @@ class RawParquetTraceReader(private val path: File) { val record = reader.read() ?: break val id = record["id"].toString() + val time = record["time"] as Long val duration = record["duration"] as Long val cores = record["cores"] as Int val cpuUsage = record["cpuUsage"] as Double val fragment = SimTraceWorkload.Fragment( + time, duration, cpuUsage, cores diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt index a3b45f47..35f4c5b8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt @@ -81,7 +81,7 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List = e /** * A poisonous fragment. */ - private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0)) + private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0)) /** * The thread to read the records in. @@ -103,11 +103,13 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List = e } val id = record["id"].toString() + val time = record["time"] as Long val duration = record["duration"] as Long val cores = record["cores"] as Int val cpuUsage = record["cpuUsage"] as Double val fragment = SimTraceWorkload.Fragment( + time, duration, cpuUsage, cores diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 75428011..00ab9190 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -114,9 +114,9 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(207380204679, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(207371815929, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(8388750, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(155252275350, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(155086837649, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(165488283, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } @@ -151,9 +151,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(96344616902, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(96324879442, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(29454904468, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(29355293349, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(99627123, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt index 9a93092e..a119a219 100644 --- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt +++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt @@ -29,6 +29,6 @@ import org.opendc.simulator.compute.workload.SimWorkload /** * A [SimFaaSWorkload] for a [FunctionTrace]. */ -public class FunctionTraceWorkload(trace: FunctionTrace) : SimFaaSWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.duration, it.cpuUsage, 1) }) { +public class FunctionTraceWorkload(trace: FunctionTrace) : SimFaaSWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.timestamp, it.duration, it.cpuUsage, 1) }) { override suspend fun invoke() {} } diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index aaf8a240..cd8021fe 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -85,17 +85,19 @@ public class BitbrainsTraceReader(traceDirectory: File) : TraceReader 0) { flopsHistory.add( SimTraceWorkload.Fragment( + submitTime + slicedWaitTime + runTime, sliceDuration, runtimePartialSliceRemainder / sliceDuration.toDouble(), cores diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index 8f60bf05..30797089 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -70,14 +70,14 @@ class SimMachineBenchmarks { @Setup fun setUp() { trace = sequenceOf( - SimTraceWorkload.Fragment(1000, 28.0, 1), - SimTraceWorkload.Fragment(1000, 3500.0, 1), - SimTraceWorkload.Fragment(1000, 0.0, 1), - SimTraceWorkload.Fragment(1000, 183.0, 1), - SimTraceWorkload.Fragment(1000, 400.0, 1), - SimTraceWorkload.Fragment(1000, 100.0, 1), - SimTraceWorkload.Fragment(1000, 3000.0, 1), - SimTraceWorkload.Fragment(1000, 4500.0, 1), + SimTraceWorkload.Fragment(0, 1000, 28.0, 1), + SimTraceWorkload.Fragment(1000, 1000, 3500.0, 1), + SimTraceWorkload.Fragment(2000, 1000, 0.0, 1), + SimTraceWorkload.Fragment(3000, 1000, 183.0, 1), + SimTraceWorkload.Fragment(4000, 1000, 400.0, 1), + SimTraceWorkload.Fragment(5000, 1000, 100.0, 1), + SimTraceWorkload.Fragment(6000, 1000, 3000.0, 1), + SimTraceWorkload.Fragment(7000, 1000, 4500.0, 1), ) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 139c66e0..f416643e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -116,7 +116,6 @@ public abstract class SimAbstractMachine( // Cancel all cpus on cancellation cont.invokeOnCancellation { this.cont = null - interpreter.batch { for (cpu in cpus) { cpu.cancel() diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 622bcd4d..fc49f357 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -27,25 +27,19 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.consumer.SimConsumerBarrier /** * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource * consumption for some period of time. + * + * @param trace The trace of fragments to use. + * @param offset The offset for the timestamps. */ -public class SimTraceWorkload(public val trace: Sequence) : SimWorkload { - private var offset = Long.MIN_VALUE +public class SimTraceWorkload(public val trace: Sequence, private val offset: Long = 0L) : SimWorkload { private val iterator = trace.iterator() private var fragment: Fragment? = null - private lateinit var barrier: SimConsumerBarrier override fun onStart(ctx: SimMachineContext) { - check(offset == Long.MIN_VALUE) { "Workload does not support re-use" } - - barrier = SimConsumerBarrier(ctx.cpus.size) - fragment = nextFragment() - offset = ctx.interpreter.clock.millis() - val lifecycle = SimWorkloadLifecycle(ctx) for (cpu in ctx.cpus) { @@ -56,43 +50,59 @@ public class SimTraceWorkload(public val trace: Sequence) : SimWorkloa override fun toString(): String = "SimTraceWorkload" /** - * Obtain the next fragment. + * Obtain the fragment with a timestamp equal or greater than [now]. */ - private fun nextFragment(): Fragment? { - return if (iterator.hasNext()) { - iterator.next() - } else { - null + private fun pullFragment(now: Long): Fragment? { + var fragment = fragment + if (fragment != null && !fragment.isExpired(now)) { + return fragment + } + + while (iterator.hasNext()) { + fragment = iterator.next() + if (!fragment.isExpired(now)) { + this.fragment = fragment + return fragment + } } + + this.fragment = null + return null + } + + /** + * Determine if the specified [Fragment] is expired, i.e., it has already passed. + */ + private fun Fragment.isExpired(now: Long): Boolean { + val timestamp = this.timestamp + offset + return now >= timestamp + duration } private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer { override fun onNext(ctx: SimResourceContext): SimResourceCommand { val now = ctx.clock.millis() - val fragment = fragment ?: return SimResourceCommand.Exit - val usage = fragment.usage / fragment.cores - val work = (fragment.duration / 1000) * usage - val deadline = offset + fragment.duration + val fragment = pullFragment(now) ?: return SimResourceCommand.Exit + val timestamp = fragment.timestamp + offset - assert(deadline >= now) { "Deadline already passed" } - - val cmd = - if (cpu.id < fragment.cores && work > 0.0) - SimResourceCommand.Consume(work, usage, deadline) - else - SimResourceCommand.Idle(deadline) - - if (barrier.enter()) { - this@SimTraceWorkload.fragment = nextFragment() - this@SimTraceWorkload.offset += fragment.duration + // Fragment is in the future + if (timestamp > now) { + return SimResourceCommand.Idle(timestamp) } - return cmd + val usage = fragment.usage / fragment.cores + val deadline = timestamp + fragment.duration + val duration = deadline - now + val work = duration * usage / 1000 + + return if (cpu.id < fragment.cores && work > 0.0) + SimResourceCommand.Consume(work, usage, deadline) + else + SimResourceCommand.Idle(deadline) } } /** * A fragment of the workload. */ - public data class Fragment(val duration: Long, val usage: Double, val cores: Int) + public data class Fragment(val timestamp: Long, val duration: Long, val usage: Double, val cores: Int) } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index a6d955ca..19808a77 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -44,7 +44,6 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer /** * Test suite for the [SimBareMetalMachine] class. */ -@OptIn(ExperimentalCoroutinesApi::class) class SimMachineTest { private lateinit var machineModel: MachineModel diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt index a61cba8d..afc4c949 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt @@ -91,10 +91,10 @@ internal class SimHypervisorTest { val workloadA = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) ), ) @@ -154,19 +154,19 @@ internal class SimHypervisorTest { val workloadA = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) ), ) val workloadB = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 73.0, 1) + SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3100.0, 1), + SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1) ) ) @@ -251,19 +251,19 @@ internal class SimHypervisorTest { val workloadA = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) ), ) val workloadB = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 73.0, 1) + SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3100.0, 1), + SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1) ) ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 7c77b283..80496992 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -71,10 +71,10 @@ internal class SimSpaceSharedHypervisorTest { val workloadA = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) ), ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt new file mode 100644 index 00000000..39c1eb5a --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +import kotlinx.coroutines.delay +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.model.* +import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter + +/** + * Test suite for the [SimTraceWorkloadTest] class. + */ +class SimTraceWorkloadTest { + private lateinit var machineModel: MachineModel + + @BeforeEach + fun setUp() { + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + + machineModel = MachineModel( + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + } + + @Test + fun testSmoke() = runBlockingSimulation { + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) + + val workload = SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), + SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), + SimTraceWorkload.Fragment(2000, 1000, 0.0, 2), + SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + ), + offset = 0 + ) + + try { + machine.run(workload) + + assertEquals(4000, clock.millis()) + } finally { + machine.close() + } + } + + @Test + fun testOffset() = runBlockingSimulation { + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) + + val workload = SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), + SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), + SimTraceWorkload.Fragment(2000, 1000, 0.0, 2), + SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + ), + offset = 1000 + ) + + try { + machine.run(workload) + + assertEquals(5000, clock.millis()) + } finally { + machine.close() + } + } + + @Test + fun testSkipFragment() = runBlockingSimulation { + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) + + val workload = SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), + SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), + SimTraceWorkload.Fragment(2000, 1000, 0.0, 2), + SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + ), + offset = 0 + ) + + try { + delay(1000L) + machine.run(workload) + + assertEquals(4000, clock.millis()) + } finally { + machine.close() + } + } +} -- cgit v1.2.3 From 484848e2e0bfdaf46f10112e358d3475dbf8e725 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 18 Aug 2021 12:13:07 +0200 Subject: fix(simulator): Record overcommit only after deadline This change fixes an issue with the simulator where it would record overcomitted work if the output was updated before the deadline was reached. --- .../org/opendc/experiments/capelin/CapelinIntegrationTest.kt | 2 +- .../opendc/simulator/resources/SimAbstractResourceAggregator.kt | 4 ++-- .../org/opendc/simulator/resources/SimAbstractResourceProvider.kt | 7 +++++-- .../org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt | 7 ++++--- .../org/opendc/simulator/resources/SimResourceProviderLogic.kt | 3 ++- .../kotlin/org/opendc/simulator/resources/SimResourceSource.kt | 4 ++-- .../org/opendc/simulator/resources/impl/SimResourceContextImpl.kt | 5 +++-- 7 files changed, 19 insertions(+), 13 deletions(-) diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 00ab9190..3dca1e09 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -116,7 +116,7 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, { assertEquals(155252275350, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, { assertEquals(155086837649, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(165488283, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(155088144, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 8a24b3e7..00648876 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -112,8 +112,8 @@ public abstract class SimAbstractResourceAggregator( doFinish() } - override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { - updateCounters(ctx, work) + override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { + updateCounters(ctx, work, willOvercommit) } override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt index 860c50ee..4e8e803a 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt @@ -87,7 +87,7 @@ public abstract class SimAbstractResourceProvider( /** * Update the counters of the resource provider. */ - protected fun updateCounters(ctx: SimResourceContext, work: Double) { + protected fun updateCounters(ctx: SimResourceContext, work: Double, willOvercommit: Boolean) { if (work <= 0.0) { return } @@ -96,7 +96,10 @@ public abstract class SimAbstractResourceProvider( val remainingWork = ctx.remainingWork counters.demand += work counters.actual += work - remainingWork - counters.overcommit += remainingWork + + if (willOvercommit && remainingWork > 0.0) { + counters.overcommit += remainingWork + } } final override fun startConsumer(consumer: SimResourceConsumer) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index 398797cf..6b420911 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -160,7 +160,8 @@ public class SimResourceDistributorMaxMin( this.totalRequestedWork = totalRequestedWork this.totalRequestedSpeed = totalRequestedSpeed - this.totalAllocatedSpeed = capacity - availableSpeed + val totalAllocatedSpeed = capacity - availableSpeed + this.totalAllocatedSpeed = totalAllocatedSpeed val totalAllocatedWork = min( totalRequestedWork, totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration) @@ -262,8 +263,8 @@ public class SimResourceDistributorMaxMin( return Long.MAX_VALUE } - override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { - updateCounters(ctx, work) + override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { + updateCounters(ctx, work, willOvercommit) } override fun onFinish(ctx: SimResourceControllableContext) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt index 17045557..2fe1b00f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt @@ -52,8 +52,9 @@ public interface SimResourceProviderLogic { * * @param ctx The context in which the provider runs. * @param work The amount of work that was requested by the resource consumer. + * @param willOvercommit A flag to indicate that the remaining work is overcommitted. */ - public fun onUpdate(ctx: SimResourceControllableContext, work: Double) {} + public fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {} /** * This method is invoked when the resource consumer has finished. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 2f70e3cc..2d53198a 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -51,8 +51,8 @@ public class SimResourceSource( } } - override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { - updateCounters(ctx, work) + override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { + updateCounters(ctx, work, willOvercommit) } override fun onFinish(ctx: SimResourceControllableContext) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index 98fad068..b79998a3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -202,17 +202,18 @@ internal class SimResourceContextImpl( val isInterrupted = _flag and FLAG_INTERRUPT != 0 val remainingWork = getRemainingWork(timestamp) val isConsume = _limit > 0.0 + val reachedDeadline = _deadline <= timestamp // Update the resource counters only if there is some progress if (timestamp > _timestamp) { - logic.onUpdate(this, _work) + logic.onUpdate(this, _work, reachedDeadline) } // We should only continue processing the next command if: // 1. The resource consumption was finished. // 2. The resource capacity cannot satisfy the demand. // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) - if ((isConsume && remainingWork == 0.0) || _deadline <= timestamp || isInterrupted) { + if ((isConsume && remainingWork == 0.0) || reachedDeadline || isInterrupted) { when (val command = consumer.onNext(this)) { is SimResourceCommand.Idle -> interpretIdle(timestamp, command.deadline) is SimResourceCommand.Consume -> interpretConsume(timestamp, command.work, command.limit, command.deadline) -- cgit v1.2.3 From a9539a3e444c1bd4fb7090dad38f3b568afe092a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 18 Aug 2021 12:18:08 +0200 Subject: fix(simulator): Support trace fragments with zero cores available This change fixes an issue with the simulator where trace fragments with zero cores to execute would give a NaN amount of work. --- .../simulator/compute/workload/SimTraceWorkload.kt | 10 +++++++- .../compute/workload/SimTraceWorkloadTest.kt | 27 ++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index fc49f357..48be8e1a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -89,7 +89,10 @@ public class SimTraceWorkload(public val trace: Sequence, private val return SimResourceCommand.Idle(timestamp) } - val usage = fragment.usage / fragment.cores + val usage = if (fragment.cores > 0) + fragment.usage / fragment.cores + else + 0.0 val deadline = timestamp + fragment.duration val duration = deadline - now val work = duration * usage / 1000 @@ -103,6 +106,11 @@ public class SimTraceWorkload(public val trace: Sequence, private val /** * A fragment of the workload. + * + * @param timestamp The timestamp at which the fragment starts. + * @param duration The duration of the fragment. + * @param usage The CPU usage during the fragment. + * @param cores The amount of cores utilized during the fragment. */ public data class Fragment(val timestamp: Long, val duration: Long, val usage: Double, val cores: Int) } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt index 39c1eb5a..78019c2e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt @@ -130,4 +130,31 @@ class SimTraceWorkloadTest { machine.close() } } + + @Test + fun testZeroCores() = runBlockingSimulation { + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) + + val workload = SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), + SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), + SimTraceWorkload.Fragment(2000, 1000, 0.0, 0), + SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + ), + offset = 0 + ) + + try { + machine.run(workload) + + assertEquals(4000, clock.millis()) + } finally { + machine.close() + } + } } -- cgit v1.2.3 From c46ff4c5cc18ba8a82ee0135f087c4d7aed1e804 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 18 Aug 2021 14:39:03 +0200 Subject: fix(simulator): Support unaligned trace fragments --- .../experiments/capelin/CapelinIntegrationTest.kt | 8 +-- .../resources/SimResourceDistributorMaxMin.kt | 59 ++++++++++++++++++---- .../simulator/resources/SimResourceSwitchMaxMin.kt | 2 +- 3 files changed, 53 insertions(+), 16 deletions(-) diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 3dca1e09..393fb88d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -114,9 +114,9 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(155252275350, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(155086837649, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(155088144, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(155252275351, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(155086837645, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(725049, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } @@ -153,7 +153,7 @@ class CapelinIntegrationTest { assertAll( { assertEquals(29454904468, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, { assertEquals(29355293349, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(99627123, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(0, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index 6b420911..a985986d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -22,6 +22,7 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.impl.SimResourceCountersImpl import org.opendc.simulator.resources.interference.InterferenceDomain import org.opendc.simulator.resources.interference.InterferenceKey import kotlin.math.min @@ -53,9 +54,9 @@ public class SimResourceDistributorMaxMin( private val activeOutputs: MutableList = mutableListOf() /** - * The total amount of work requested by the output resources. + * The total amount of work allocated to be executed. */ - private var totalRequestedWork = 0.0 + private var totalAllocatedWork = 0.0 /** * The total allocated speed for the output resources. @@ -67,6 +68,13 @@ public class SimResourceDistributorMaxMin( */ private var totalRequestedSpeed = 0.0 + /** + * The resource counters of this distributor. + */ + public val counters: SimResourceCounters + get() = _counters + private val _counters = SimResourceCountersImpl() + /* SimResourceDistributor */ override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider { val provider = Output(ctx?.capacity ?: 0.0, key) @@ -102,6 +110,25 @@ public class SimResourceDistributorMaxMin( } } + /** + * Update the counters of the distributor. + */ + private fun updateCounters(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { + if (work <= 0.0) { + return + } + + val counters = _counters + val remainingWork = ctx.remainingWork + + counters.demand += work + counters.actual += work - remainingWork + + if (willOvercommit && remainingWork > 0.0) { + counters.overcommit += remainingWork + } + } + /** * Schedule the work of the outputs. */ @@ -116,7 +143,6 @@ public class SimResourceDistributorMaxMin( var deadline: Long = Long.MAX_VALUE var availableSpeed = capacity var totalRequestedSpeed = 0.0 - var totalRequestedWork = 0.0 // Pull in the work of the outputs val outputIterator = activeOutputs.listIterator() @@ -138,6 +164,7 @@ public class SimResourceDistributorMaxMin( for (output in activeOutputs) { val availableShare = availableSpeed / remaining-- val grantedSpeed = min(output.allowedSpeed, availableShare) + deadline = min(deadline, output.deadline) // Ignore idle computation @@ -147,7 +174,6 @@ public class SimResourceDistributorMaxMin( } totalRequestedSpeed += output.limit - totalRequestedWork += output.work output.actualSpeed = grantedSpeed availableSpeed -= grantedSpeed @@ -156,19 +182,28 @@ public class SimResourceDistributorMaxMin( duration = min(duration, output.work / grantedSpeed) } + val targetDuration = min(duration, (deadline - interpreter.clock.millis()) / 1000.0) + var totalRequestedWork = 0.0 + var totalAllocatedWork = 0.0 + for (output in activeOutputs) { + val work = output.work + val speed = output.actualSpeed + if (speed > 0.0) { + val outputDuration = work / speed + totalRequestedWork += work * (duration / outputDuration) + totalAllocatedWork += work * (targetDuration / outputDuration) + } + } + assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" } - this.totalRequestedWork = totalRequestedWork this.totalRequestedSpeed = totalRequestedSpeed + this.totalAllocatedWork = totalAllocatedWork val totalAllocatedSpeed = capacity - availableSpeed this.totalAllocatedSpeed = totalAllocatedSpeed - val totalAllocatedWork = min( - totalRequestedWork, - totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration) - ) return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) - SimResourceCommand.Consume(totalRequestedWork, totalAllocatedSpeed, deadline) + SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) else SimResourceCommand.Idle(deadline) } @@ -265,6 +300,8 @@ public class SimResourceDistributorMaxMin( override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { updateCounters(ctx, work, willOvercommit) + + this@SimResourceDistributorMaxMin.updateCounters(ctx, work, willOvercommit) } override fun onFinish(ctx: SimResourceControllableContext) { @@ -289,7 +326,7 @@ public class SimResourceDistributorMaxMin( } // Compute the work that was actually granted to the output. - return (totalRequestedWork - totalRemainingWork) * fraction * perfScore + return (totalAllocatedWork - totalRemainingWork) * fraction * perfScore } /* Comparable */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index ceb5a1a4..d988b70d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -54,7 +54,7 @@ public class SimResourceSwitchMaxMin( * The resource counters to track the execution metrics of all switch resources. */ override val counters: SimResourceCounters - get() = aggregator.counters + get() = distributor.counters /** * A flag to indicate that the switch was closed. -- cgit v1.2.3