diff options
Diffstat (limited to 'opendc-simulator')
14 files changed, 310 insertions, 91 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index 8f60bf05..30797089 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -70,14 +70,14 @@ class SimMachineBenchmarks { @Setup fun setUp() { trace = sequenceOf( - SimTraceWorkload.Fragment(1000, 28.0, 1), - SimTraceWorkload.Fragment(1000, 3500.0, 1), - SimTraceWorkload.Fragment(1000, 0.0, 1), - SimTraceWorkload.Fragment(1000, 183.0, 1), - SimTraceWorkload.Fragment(1000, 400.0, 1), - SimTraceWorkload.Fragment(1000, 100.0, 1), - SimTraceWorkload.Fragment(1000, 3000.0, 1), - SimTraceWorkload.Fragment(1000, 4500.0, 1), + SimTraceWorkload.Fragment(0, 1000, 28.0, 1), + SimTraceWorkload.Fragment(1000, 1000, 3500.0, 1), + SimTraceWorkload.Fragment(2000, 1000, 0.0, 1), + SimTraceWorkload.Fragment(3000, 1000, 183.0, 1), + SimTraceWorkload.Fragment(4000, 1000, 400.0, 1), + SimTraceWorkload.Fragment(5000, 1000, 100.0, 1), + SimTraceWorkload.Fragment(6000, 1000, 3000.0, 1), + SimTraceWorkload.Fragment(7000, 1000, 4500.0, 1), ) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 139c66e0..f416643e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -116,7 +116,6 @@ public abstract class SimAbstractMachine( // Cancel all cpus on cancellation cont.invokeOnCancellation { this.cont = null - interpreter.batch { for (cpu in cpus) { cpu.cancel() diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 622bcd4d..48be8e1a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -27,25 +27,19 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.consumer.SimConsumerBarrier /** * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource * consumption for some period of time. + * + * @param trace The trace of fragments to use. + * @param offset The offset for the timestamps. */ -public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkload { - private var offset = Long.MIN_VALUE +public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val offset: Long = 0L) : SimWorkload { private val iterator = trace.iterator() private var fragment: Fragment? = null - private lateinit var barrier: SimConsumerBarrier override fun onStart(ctx: SimMachineContext) { - check(offset == Long.MIN_VALUE) { "Workload does not support re-use" } - - barrier = SimConsumerBarrier(ctx.cpus.size) - fragment = nextFragment() - offset = ctx.interpreter.clock.millis() - val lifecycle = SimWorkloadLifecycle(ctx) for (cpu in ctx.cpus) { @@ -56,43 +50,67 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa override fun toString(): String = "SimTraceWorkload" /** - * Obtain the next fragment. + * Obtain the fragment with a timestamp equal or greater than [now]. */ - private fun nextFragment(): Fragment? { - return if (iterator.hasNext()) { - iterator.next() - } else { - null + private fun pullFragment(now: Long): Fragment? { + var fragment = fragment + if (fragment != null && !fragment.isExpired(now)) { + return fragment } + + while (iterator.hasNext()) { + fragment = iterator.next() + if (!fragment.isExpired(now)) { + this.fragment = fragment + return fragment + } + } + + this.fragment = null + return null + } + + /** + * Determine if the specified [Fragment] is expired, i.e., it has already passed. + */ + private fun Fragment.isExpired(now: Long): Boolean { + val timestamp = this.timestamp + offset + return now >= timestamp + duration } private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer { override fun onNext(ctx: SimResourceContext): SimResourceCommand { val now = ctx.clock.millis() - val fragment = fragment ?: return SimResourceCommand.Exit - val usage = fragment.usage / fragment.cores - val work = (fragment.duration / 1000) * usage - val deadline = offset + fragment.duration + val fragment = pullFragment(now) ?: return SimResourceCommand.Exit + val timestamp = fragment.timestamp + offset - assert(deadline >= now) { "Deadline already passed" } - - val cmd = - if (cpu.id < fragment.cores && work > 0.0) - SimResourceCommand.Consume(work, usage, deadline) - else - SimResourceCommand.Idle(deadline) - - if (barrier.enter()) { - this@SimTraceWorkload.fragment = nextFragment() - this@SimTraceWorkload.offset += fragment.duration + // Fragment is in the future + if (timestamp > now) { + return SimResourceCommand.Idle(timestamp) } - return cmd + val usage = if (fragment.cores > 0) + fragment.usage / fragment.cores + else + 0.0 + val deadline = timestamp + fragment.duration + val duration = deadline - now + val work = duration * usage / 1000 + + return if (cpu.id < fragment.cores && work > 0.0) + SimResourceCommand.Consume(work, usage, deadline) + else + SimResourceCommand.Idle(deadline) } } /** * A fragment of the workload. + * + * @param timestamp The timestamp at which the fragment starts. + * @param duration The duration of the fragment. + * @param usage The CPU usage during the fragment. + * @param cores The amount of cores utilized during the fragment. */ - public data class Fragment(val duration: Long, val usage: Double, val cores: Int) + public data class Fragment(val timestamp: Long, val duration: Long, val usage: Double, val cores: Int) } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index a6d955ca..19808a77 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -44,7 +44,6 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer /** * Test suite for the [SimBareMetalMachine] class. */ -@OptIn(ExperimentalCoroutinesApi::class) class SimMachineTest { private lateinit var machineModel: MachineModel diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt index a61cba8d..afc4c949 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt @@ -91,10 +91,10 @@ internal class SimHypervisorTest { val workloadA = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) ), ) @@ -154,19 +154,19 @@ internal class SimHypervisorTest { val workloadA = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) ), ) val workloadB = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 73.0, 1) + SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3100.0, 1), + SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1) ) ) @@ -251,19 +251,19 @@ internal class SimHypervisorTest { val workloadA = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) ), ) val workloadB = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 73.0, 1) + SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3100.0, 1), + SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1) ) ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 7c77b283..80496992 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -71,10 +71,10 @@ internal class SimSpaceSharedHypervisorTest { val workloadA = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) ), ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt new file mode 100644 index 00000000..78019c2e --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +import kotlinx.coroutines.delay +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.model.* +import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter + +/** + * Test suite for the [SimTraceWorkloadTest] class. + */ +class SimTraceWorkloadTest { + private lateinit var machineModel: MachineModel + + @BeforeEach + fun setUp() { + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + + machineModel = MachineModel( + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + } + + @Test + fun testSmoke() = runBlockingSimulation { + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) + + val workload = SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), + SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), + SimTraceWorkload.Fragment(2000, 1000, 0.0, 2), + SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + ), + offset = 0 + ) + + try { + machine.run(workload) + + assertEquals(4000, clock.millis()) + } finally { + machine.close() + } + } + + @Test + fun testOffset() = runBlockingSimulation { + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) + + val workload = SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), + SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), + SimTraceWorkload.Fragment(2000, 1000, 0.0, 2), + SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + ), + offset = 1000 + ) + + try { + machine.run(workload) + + assertEquals(5000, clock.millis()) + } finally { + machine.close() + } + } + + @Test + fun testSkipFragment() = runBlockingSimulation { + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) + + val workload = SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), + SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), + SimTraceWorkload.Fragment(2000, 1000, 0.0, 2), + SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + ), + offset = 0 + ) + + try { + delay(1000L) + machine.run(workload) + + assertEquals(4000, clock.millis()) + } finally { + machine.close() + } + } + + @Test + fun testZeroCores() = runBlockingSimulation { + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) + + val workload = SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), + SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), + SimTraceWorkload.Fragment(2000, 1000, 0.0, 0), + SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + ), + offset = 0 + ) + + try { + machine.run(workload) + + assertEquals(4000, clock.millis()) + } finally { + machine.close() + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 8a24b3e7..00648876 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -112,8 +112,8 @@ public abstract class SimAbstractResourceAggregator( doFinish() } - override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { - updateCounters(ctx, work) + override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { + updateCounters(ctx, work, willOvercommit) } override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt index 860c50ee..4e8e803a 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt @@ -87,7 +87,7 @@ public abstract class SimAbstractResourceProvider( /** * Update the counters of the resource provider. */ - protected fun updateCounters(ctx: SimResourceContext, work: Double) { + protected fun updateCounters(ctx: SimResourceContext, work: Double, willOvercommit: Boolean) { if (work <= 0.0) { return } @@ -96,7 +96,10 @@ public abstract class SimAbstractResourceProvider( val remainingWork = ctx.remainingWork counters.demand += work counters.actual += work - remainingWork - counters.overcommit += remainingWork + + if (willOvercommit && remainingWork > 0.0) { + counters.overcommit += remainingWork + } } final override fun startConsumer(consumer: SimResourceConsumer) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index 398797cf..a985986d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -22,6 +22,7 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.impl.SimResourceCountersImpl import org.opendc.simulator.resources.interference.InterferenceDomain import org.opendc.simulator.resources.interference.InterferenceKey import kotlin.math.min @@ -53,9 +54,9 @@ public class SimResourceDistributorMaxMin( private val activeOutputs: MutableList<Output> = mutableListOf() /** - * The total amount of work requested by the output resources. + * The total amount of work allocated to be executed. */ - private var totalRequestedWork = 0.0 + private var totalAllocatedWork = 0.0 /** * The total allocated speed for the output resources. @@ -67,6 +68,13 @@ public class SimResourceDistributorMaxMin( */ private var totalRequestedSpeed = 0.0 + /** + * The resource counters of this distributor. + */ + public val counters: SimResourceCounters + get() = _counters + private val _counters = SimResourceCountersImpl() + /* SimResourceDistributor */ override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider { val provider = Output(ctx?.capacity ?: 0.0, key) @@ -103,6 +111,25 @@ public class SimResourceDistributorMaxMin( } /** + * Update the counters of the distributor. + */ + private fun updateCounters(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { + if (work <= 0.0) { + return + } + + val counters = _counters + val remainingWork = ctx.remainingWork + + counters.demand += work + counters.actual += work - remainingWork + + if (willOvercommit && remainingWork > 0.0) { + counters.overcommit += remainingWork + } + } + + /** * Schedule the work of the outputs. */ private fun doNext(ctx: SimResourceContext): SimResourceCommand { @@ -116,7 +143,6 @@ public class SimResourceDistributorMaxMin( var deadline: Long = Long.MAX_VALUE var availableSpeed = capacity var totalRequestedSpeed = 0.0 - var totalRequestedWork = 0.0 // Pull in the work of the outputs val outputIterator = activeOutputs.listIterator() @@ -138,6 +164,7 @@ public class SimResourceDistributorMaxMin( for (output in activeOutputs) { val availableShare = availableSpeed / remaining-- val grantedSpeed = min(output.allowedSpeed, availableShare) + deadline = min(deadline, output.deadline) // Ignore idle computation @@ -147,7 +174,6 @@ public class SimResourceDistributorMaxMin( } totalRequestedSpeed += output.limit - totalRequestedWork += output.work output.actualSpeed = grantedSpeed availableSpeed -= grantedSpeed @@ -156,18 +182,28 @@ public class SimResourceDistributorMaxMin( duration = min(duration, output.work / grantedSpeed) } + val targetDuration = min(duration, (deadline - interpreter.clock.millis()) / 1000.0) + var totalRequestedWork = 0.0 + var totalAllocatedWork = 0.0 + for (output in activeOutputs) { + val work = output.work + val speed = output.actualSpeed + if (speed > 0.0) { + val outputDuration = work / speed + totalRequestedWork += work * (duration / outputDuration) + totalAllocatedWork += work * (targetDuration / outputDuration) + } + } + assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" } - this.totalRequestedWork = totalRequestedWork this.totalRequestedSpeed = totalRequestedSpeed - this.totalAllocatedSpeed = capacity - availableSpeed - val totalAllocatedWork = min( - totalRequestedWork, - totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration) - ) + this.totalAllocatedWork = totalAllocatedWork + val totalAllocatedSpeed = capacity - availableSpeed + this.totalAllocatedSpeed = totalAllocatedSpeed return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) - SimResourceCommand.Consume(totalRequestedWork, totalAllocatedSpeed, deadline) + SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) else SimResourceCommand.Idle(deadline) } @@ -262,8 +298,10 @@ public class SimResourceDistributorMaxMin( return Long.MAX_VALUE } - override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { - updateCounters(ctx, work) + override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { + updateCounters(ctx, work, willOvercommit) + + this@SimResourceDistributorMaxMin.updateCounters(ctx, work, willOvercommit) } override fun onFinish(ctx: SimResourceControllableContext) { @@ -288,7 +326,7 @@ public class SimResourceDistributorMaxMin( } // Compute the work that was actually granted to the output. - return (totalRequestedWork - totalRemainingWork) * fraction * perfScore + return (totalAllocatedWork - totalRemainingWork) * fraction * perfScore } /* Comparable */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt index 17045557..2fe1b00f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt @@ -52,8 +52,9 @@ public interface SimResourceProviderLogic { * * @param ctx The context in which the provider runs. * @param work The amount of work that was requested by the resource consumer. + * @param willOvercommit A flag to indicate that the remaining work is overcommitted. */ - public fun onUpdate(ctx: SimResourceControllableContext, work: Double) {} + public fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {} /** * This method is invoked when the resource consumer has finished. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 2f70e3cc..2d53198a 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -51,8 +51,8 @@ public class SimResourceSource( } } - override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { - updateCounters(ctx, work) + override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { + updateCounters(ctx, work, willOvercommit) } override fun onFinish(ctx: SimResourceControllableContext) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index ceb5a1a4..d988b70d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -54,7 +54,7 @@ public class SimResourceSwitchMaxMin( * The resource counters to track the execution metrics of all switch resources. */ override val counters: SimResourceCounters - get() = aggregator.counters + get() = distributor.counters /** * A flag to indicate that the switch was closed. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index 98fad068..b79998a3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -202,17 +202,18 @@ internal class SimResourceContextImpl( val isInterrupted = _flag and FLAG_INTERRUPT != 0 val remainingWork = getRemainingWork(timestamp) val isConsume = _limit > 0.0 + val reachedDeadline = _deadline <= timestamp // Update the resource counters only if there is some progress if (timestamp > _timestamp) { - logic.onUpdate(this, _work) + logic.onUpdate(this, _work, reachedDeadline) } // We should only continue processing the next command if: // 1. The resource consumption was finished. // 2. The resource capacity cannot satisfy the demand. // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) - if ((isConsume && remainingWork == 0.0) || _deadline <= timestamp || isInterrupted) { + if ((isConsume && remainingWork == 0.0) || reachedDeadline || isInterrupted) { when (val command = consumer.onNext(this)) { is SimResourceCommand.Idle -> interpretIdle(timestamp, command.deadline) is SimResourceCommand.Consume -> interpretConsume(timestamp, command.work, command.limit, command.deadline) |
