diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-26 13:11:10 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-03 17:17:37 +0200 |
| commit | d575bed5418be222e1d3ad39af862e2390596d61 (patch) | |
| tree | e00656f774a62543a032284f5ef00da479b293d6 | |
| parent | a4a611c45dfd5f9e379434f1dc459128cb437338 (diff) | |
refactor(simulator): Combine work and deadline to duration
This change removes the work and deadline properties from the
SimResourceCommand.Consume class and introduces a new property duration.
This property is now used in conjunction with the limit to compute the amount
of work processed by a resource provider.
Previously, we used both work and deadline to compute the duration and
the amount of remaining work at the end of a consumption. However, with
this change, we ensure that a resource consumption always runs at the
same speed once establishing, drastically simplifying the computation
for the amount of work processed during the consumption.
31 files changed, 243 insertions, 480 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 30cc1466..ffc50ad8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -118,7 +118,7 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, { assertEquals(223331032, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, { assertEquals(67006568, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, - { assertEquals(3159379, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, + { assertEquals(3088047, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, { assertEquals(5.841120890240688E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, ) @@ -211,8 +211,8 @@ class CapelinIntegrationTest { assertAll( { assertEquals(6013899, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(14724501, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(473394, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(12027839, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(477664, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index 0873aac9..bfc5fc6f 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -41,6 +41,7 @@ import java.util.* import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume +import kotlin.math.roundToLong /** * A [TFDevice] implementation using simulated components. @@ -127,13 +128,16 @@ public class SimTFDevice( } } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + val consumedWork = ctx.speed * delta / 1000.0 + val activeWork = activeWork if (activeWork != null) { - if (activeWork.consume(activeWork.flops - ctx.remainingWork)) { + if (activeWork.consume(consumedWork)) { this.activeWork = null } else { - return SimResourceCommand.Consume(activeWork.flops, ctx.capacity) + val duration = (activeWork.flops / ctx.capacity * 1000).roundToLong() + return SimResourceCommand.Consume(ctx.capacity, duration) } } @@ -141,9 +145,10 @@ public class SimTFDevice( val head = queue.poll() return if (head != null) { this.activeWork = head - SimResourceCommand.Consume(head.flops, ctx.capacity) + val duration = (head.flops / ctx.capacity * 1000).roundToLong() + SimResourceCommand.Consume(ctx.capacity, duration) } else { - SimResourceCommand.Idle() + SimResourceCommand.Consume(0.0) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt index 0a7dc40f..34ac4418 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt @@ -83,13 +83,13 @@ public class SimPsu( } override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0) return if (powerDraw > 0.0) - SimResourceCommand.Consume(Double.POSITIVE_INFINITY, powerDraw, Long.MAX_VALUE) + SimResourceCommand.Consume(powerDraw, Long.MAX_VALUE) else - SimResourceCommand.Idle() + SimResourceCommand.Consume(0.0) } override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 5a4c4f44..527619bd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -80,14 +80,13 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val } private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val now = ctx.clock.millis() + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { val fragment = pullFragment(now) ?: return SimResourceCommand.Exit val timestamp = fragment.timestamp + offset // Fragment is in the future if (timestamp > now) { - return SimResourceCommand.Idle(timestamp) + return SimResourceCommand.Consume(0.0, timestamp - now) } val cores = min(cpu.node.coreCount, fragment.cores) @@ -97,12 +96,11 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val 0.0 val deadline = timestamp + fragment.duration val duration = deadline - now - val work = duration * usage / 1000 - return if (cpu.id < cores && work > 0.0) - SimResourceCommand.Consume(work, usage, deadline) + return if (cpu.id < cores && usage > 0.0) + SimResourceCommand.Consume(usage, duration) else - SimResourceCommand.Idle(deadline) + SimResourceCommand.Consume(0.0, duration) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 3d3feb2a..55d6d7c4 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -155,6 +155,8 @@ internal class SimSpaceSharedHypervisorTest { vm.run(SimRuntimeWorkload(duration)) vm.close() + yield() + val vm2 = hypervisor.createMachine(machineModel) vm2.run(SimRuntimeWorkload(duration)) vm2.close() diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt index 5efdbed9..f2dd8455 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt @@ -32,7 +32,7 @@ public class SimNetworkSink( public val capacity: Double ) : SimNetworkPort() { override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Idle() + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand = SimResourceCommand.Consume(0.0) override fun toString(): String = "SimNetworkSink.Consumer" } diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt index 3ce85d02..e8839496 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt @@ -47,21 +47,13 @@ public class SimPdu( public fun newOutlet(): Outlet = Outlet(distributor.newOutput()) override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer by distributor { - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return when (val cmd = distributor.onNext(ctx)) { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + return when (val cmd = distributor.onNext(ctx, now, delta)) { is SimResourceCommand.Consume -> { - val duration = cmd.work / cmd.limit val loss = computePowerLoss(cmd.limit) val newLimit = cmd.limit + loss - SimResourceCommand.Consume(duration * newLimit, newLimit, cmd.deadline) - } - is SimResourceCommand.Idle -> { - val loss = computePowerLoss(0.0) - if (loss > 0.0) - SimResourceCommand.Consume(Double.POSITIVE_INFINITY, loss, cmd.deadline) - else - cmd + SimResourceCommand.Consume(newLimit, cmd.duration) } else -> cmd } diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt index f9431d21..4c2beb68 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt @@ -55,21 +55,13 @@ public class SimUps( override fun onConnect(inlet: SimPowerInlet) { val consumer = inlet.createConsumer() aggregator.startConsumer(object : SimResourceConsumer by consumer { - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return when (val cmd = consumer.onNext(ctx)) { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + return when (val cmd = consumer.onNext(ctx, now, delta)) { is SimResourceCommand.Consume -> { - val duration = cmd.work / cmd.limit val loss = computePowerLoss(cmd.limit) val newLimit = cmd.limit + loss - SimResourceCommand.Consume(duration * newLimit, newLimit, cmd.deadline) - } - is SimResourceCommand.Idle -> { - val loss = computePowerLoss(0.0) - if (loss > 0.0) - SimResourceCommand.Consume(Double.POSITIVE_INFINITY, loss, cmd.deadline) - else - cmd + SimResourceCommand.Consume(newLimit, cmd.duration) } else -> cmd } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 00648876..da5c3257 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -32,12 +32,7 @@ public abstract class SimAbstractResourceAggregator( /** * This method is invoked when the resource consumer consumes resources. */ - protected abstract fun doConsume(work: Double, limit: Double, deadline: Long) - - /** - * This method is invoked when the resource consumer enters an idle state. - */ - protected abstract fun doIdle(deadline: Long) + protected abstract fun doConsume(limit: Double, duration: Long) /** * This method is invoked when the resource consumer finishes processing. @@ -98,26 +93,23 @@ public abstract class SimAbstractResourceAggregator( private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) { override fun createLogic(): SimResourceProviderLogic { return object : SimResourceProviderLogic { - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { - doIdle(deadline) - return Long.MAX_VALUE - } - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { - doConsume(work, limit, deadline) - return Long.MAX_VALUE + override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long { + doConsume(limit, duration) + return super.onConsume(ctx, now, limit, duration) } override fun onFinish(ctx: SimResourceControllableContext) { doFinish() } - override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { - updateCounters(ctx, work, willOvercommit) - } - - override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { - return work - _inputConsumers.sumOf { it.remainingWork } + override fun onUpdate( + ctx: SimResourceControllableContext, + delta: Long, + limit: Double, + willOvercommit: Boolean + ) { + updateCounters(ctx, delta, limit, willOvercommit) } } } @@ -157,12 +149,6 @@ public abstract class SimAbstractResourceAggregator( private var _ctx: SimResourceContext? = null /** - * The remaining work of the consumer. - */ - val remainingWork: Double - get() = _ctx?.remainingWork ?: 0.0 - - /** * The resource command to run next. */ private var command: SimResourceCommand? = null @@ -179,7 +165,7 @@ public abstract class SimAbstractResourceAggregator( } /* SimResourceConsumer */ - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { var next = command return if (next != null) { @@ -190,7 +176,7 @@ public abstract class SimAbstractResourceAggregator( next = command this.command = null - next ?: SimResourceCommand.Idle() + next ?: SimResourceCommand.Consume(0.0) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt index 4e8e803a..548bc228 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt @@ -87,21 +87,35 @@ public abstract class SimAbstractResourceProvider( /** * Update the counters of the resource provider. */ - protected fun updateCounters(ctx: SimResourceContext, work: Double, willOvercommit: Boolean) { - if (work <= 0.0) { + protected fun updateCounters(ctx: SimResourceContext, delta: Long, limit: Double, willOvercommit: Boolean) { + if (delta <= 0.0) { return } val counters = _counters - val remainingWork = ctx.remainingWork + val deltaS = delta / 1000.0 + val work = limit * deltaS + val actualWork = ctx.speed * deltaS + val remainingWork = work - actualWork + counters.demand += work - counters.actual += work - remainingWork + counters.actual += actualWork if (willOvercommit && remainingWork > 0.0) { counters.overcommit += remainingWork } } + /** + * Update the counters of the resource provider. + */ + protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) { + val counters = _counters + counters.demand += demand + counters.actual += actual + counters.overcommit += overcommit + } + final override fun startConsumer(consumer: SimResourceConsumer) { check(ctx == null) { "Resource is in invalid state" } val ctx = interpreter.newContext(consumer, createLogic(), parent) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index 991cda7a..537be1b5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -31,7 +31,7 @@ public class SimResourceAggregatorMaxMin( ) : SimAbstractResourceAggregator(interpreter, parent) { private val consumers = mutableListOf<Input>() - override fun doConsume(work: Double, limit: Double, deadline: Long) { + override fun doConsume(limit: Double, duration: Long) { // Sort all consumers by their capacity consumers.sortWith(compareBy { it.ctx.capacity }) @@ -40,22 +40,15 @@ public class SimResourceAggregatorMaxMin( val inputCapacity = input.ctx.capacity val fraction = inputCapacity / capacity val grantedSpeed = limit * fraction - val grantedWork = fraction * work - val command = if (grantedWork > 0.0 && grantedSpeed > 0.0) - SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) + val command = if (grantedSpeed > 0.0) + SimResourceCommand.Consume(grantedSpeed, duration) else - SimResourceCommand.Idle() + SimResourceCommand.Consume(0.0, duration) input.push(command) } } - override fun doIdle(deadline: Long) { - for (input in consumers) { - input.push(SimResourceCommand.Idle(deadline)) - } - } - override fun doFinish() { val iterator = consumers.iterator() for (input in iterator) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt index f7f3fa4d..4a980071 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt @@ -27,25 +27,19 @@ package org.opendc.simulator.resources */ public sealed class SimResourceCommand { /** - * A request to the resource to perform the specified amount of work before the given [deadline]. + * A request to the resource to perform work for the specified [duration]. * - * @param work The amount of work to process. * @param limit The maximum amount of work to be processed per second. - * @param deadline The instant at which the work needs to be fulfilled. + * @param duration The duration of the resource consumption in milliseconds. */ - public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() { + public data class Consume(val limit: Double, val duration: Long = Long.MAX_VALUE) : SimResourceCommand() { init { - require(work > 0) { "Amount of work must be positive" } - require(limit > 0) { "Limit must be positive" } + require(limit >= 0.0) { "Negative limit is not allowed" } + require(duration >= 0) { "Duration must be positive" } } } /** - * An indication to the resource that the consumer will idle until the specified [deadline] or if it is interrupted. - */ - public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() - - /** * An indication to the resource that the consumer has finished. */ public object Exit : SimResourceCommand() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 4d937514..4d1d2c32 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -34,9 +34,11 @@ public interface SimResourceConsumer { * the resource finished processing, reached its deadline or was interrupted. * * @param ctx The execution context in which the consumer runs. + * @param now The virtual timestamp in milliseconds at which the update is occurring. + * @param delta The virtual duration between this call and the last call in milliseconds. * @return The next command that the resource should execute. */ - public fun onNext(ctx: SimResourceContext): SimResourceCommand + public fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand /** * This method is invoked when an event has occurred. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index 0d9a6106..f28b43d0 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -50,11 +50,6 @@ public interface SimResourceContext { public val demand: Double /** - * The amount of work still remaining at this instant. - */ - public val remainingWork: Double - - /** * Ask the resource provider to interrupt its resource. */ public fun interrupt() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index 63cfbdac..d23c7dbb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -97,8 +97,8 @@ public class SimResourceDistributorMaxMin( } /* SimResourceConsumer */ - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return doNext(ctx) + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + return doNext(ctx, now) } override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { @@ -135,38 +135,16 @@ public class SimResourceDistributorMaxMin( } /** - * Update the counters of the distributor. - */ - private fun updateCounters(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { - if (work <= 0.0) { - return - } - - val counters = _counters - val remainingWork = ctx.remainingWork - - counters.demand += work - counters.actual += work - remainingWork - - if (willOvercommit && remainingWork > 0.0) { - counters.overcommit += remainingWork - } - } - - /** * Schedule the work of the outputs. */ - private fun doNext(ctx: SimResourceContext): SimResourceCommand { + private fun doNext(ctx: SimResourceContext, now: Long): SimResourceCommand { // If there is no work yet, mark the input as idle. if (activeOutputs.isEmpty()) { - return SimResourceCommand.Idle() + return SimResourceCommand.Consume(0.0) } - val now = interpreter.clock.millis() - val capacity = ctx.capacity - var duration: Double = Double.MAX_VALUE - var deadline: Long = Long.MAX_VALUE + var duration: Long = Long.MAX_VALUE var availableSpeed = capacity var totalRequestedSpeed = 0.0 @@ -191,10 +169,10 @@ public class SimResourceDistributorMaxMin( val availableShare = availableSpeed / remaining-- val grantedSpeed = min(output.allowedSpeed, availableShare) - deadline = min(deadline, output.deadline) + duration = min(duration, output.duration) // Ignore idle computation - if (grantedSpeed <= 0.0 || output.work <= 0.0) { + if (grantedSpeed <= 0.0) { output.actualSpeed = 0.0 continue } @@ -203,35 +181,29 @@ public class SimResourceDistributorMaxMin( output.actualSpeed = grantedSpeed availableSpeed -= grantedSpeed - - // The duration that we want to run is that of the shortest request of an output - duration = min(duration, output.work / grantedSpeed) } - val targetDuration = min(duration, (deadline - now) / 1000.0) + val durationS = duration / 1000.0 var totalRequestedWork = 0.0 var totalAllocatedWork = 0.0 for (output in activeOutputs) { - val work = output.work + val limit = output.limit val speed = output.actualSpeed if (speed > 0.0) { - val outputDuration = work / speed - totalRequestedWork += work * (duration / outputDuration) - totalAllocatedWork += work * (targetDuration / outputDuration) + totalRequestedWork += limit * durationS + totalAllocatedWork += speed * durationS } } - assert(deadline >= now) { "Deadline already passed" } - this.totalRequestedSpeed = totalRequestedSpeed this.totalAllocatedWork = totalAllocatedWork val totalAllocatedSpeed = capacity - availableSpeed this.totalAllocatedSpeed = totalAllocatedSpeed return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) - SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) + SimResourceCommand.Consume(totalAllocatedSpeed, duration) else - SimResourceCommand.Idle(deadline) + SimResourceCommand.Consume(0.0, duration) } private fun updateCapacity(ctx: SimResourceContext) { @@ -243,7 +215,7 @@ public class SimResourceDistributorMaxMin( /** * An internal [SimResourceProvider] implementation for switch outputs. */ - private inner class Output(capacity: Double, private val key: InterferenceKey?) : + private inner class Output(capacity: Double, val key: InterferenceKey?) : SimAbstractResourceProvider(interpreter, parent, capacity), SimResourceCloseableProvider, SimResourceProviderLogic, @@ -254,11 +226,6 @@ public class SimResourceDistributorMaxMin( private var isClosed: Boolean = false /** - * The current requested work. - */ - @JvmField var work: Double = 0.0 - - /** * The requested limit. */ @JvmField var limit: Double = 0.0 @@ -266,7 +233,7 @@ public class SimResourceDistributorMaxMin( /** * The current deadline. */ - @JvmField var deadline: Long = Long.MAX_VALUE + @JvmField var duration: Long = Long.MAX_VALUE /** * The processing speed that is allowed by the model constraints. @@ -304,44 +271,19 @@ public class SimResourceDistributorMaxMin( } /* SimResourceProviderLogic */ - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { - allowedSpeed = 0.0 - this.deadline = deadline - work = 0.0 - limit = 0.0 - lastCommandTimestamp = ctx.clock.millis() - - return Long.MAX_VALUE - } - - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { + override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long { allowedSpeed = min(ctx.capacity, limit) - this.work = work this.limit = limit - this.deadline = deadline - lastCommandTimestamp = ctx.clock.millis() - - return Long.MAX_VALUE - } - - override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { - updateCounters(ctx, work, willOvercommit) + this.duration = duration + lastCommandTimestamp = now - this@SimResourceDistributorMaxMin.updateCounters(ctx, work, willOvercommit) - } - - override fun onFinish(ctx: SimResourceControllableContext) { - work = 0.0 - limit = 0.0 - deadline = Long.MAX_VALUE - lastCommandTimestamp = ctx.clock.millis() + return super.onConsume(ctx, now, limit, duration) } - override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { - val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0 - - // Compute the fraction of compute time allocated to the output - val fraction = actualSpeed / totalAllocatedSpeed + override fun onUpdate(ctx: SimResourceControllableContext, delta: Long, limit: Double, willOvercommit: Boolean) { + if (delta <= 0.0) { + return + } // Compute the performance penalty due to resource interference val perfScore = if (interferenceDomain != null) { @@ -351,12 +293,29 @@ public class SimResourceDistributorMaxMin( 1.0 } - // Compute the work that was actually granted to the output. - val potentialConsumedWork = (totalAllocatedWork - totalRemainingWork) * fraction + val deltaS = delta / 1000.0 + val work = limit * deltaS + val actualWork = actualSpeed * deltaS + val remainingWork = work - actualWork + val overcommit = if (willOvercommit && remainingWork > 0.0) { + remainingWork + } else { + 0.0 + } - _counters.interference += potentialConsumedWork * max(0.0, 1 - perfScore) + updateCounters(work, actualWork, overcommit) - return potentialConsumedWork + val distCounters = _counters + distCounters.demand += work + distCounters.actual += actualWork + distCounters.overcommit += overcommit + distCounters.interference += actualWork * max(0.0, 1 - perfScore) + } + + override fun onFinish(ctx: SimResourceControllableContext) { + limit = 0.0 + duration = Long.MAX_VALUE + lastCommandTimestamp = ctx.clock.millis() } /* Comparable */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt index 2fe1b00f..d8ff87f9 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt @@ -27,53 +27,33 @@ package org.opendc.simulator.resources */ public interface SimResourceProviderLogic { /** - * This method is invoked when the resource is reported to idle until the specified [deadline]. + * This method is invoked when the consumer ask to consume the resource for the specified [duration]. * * @param ctx The context in which the provider runs. - * @param deadline The deadline that was requested by the resource consumer. - * @return The instant at which to resume the consumer. - */ - public fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long - - /** - * This method is invoked when the resource will be consumed until the specified amount of [work] was processed - * or [deadline] is reached. - * - * @param ctx The context in which the provider runs. - * @param work The amount of work that was requested by the resource consumer. + * @param now The virtual timestamp in milliseconds at which the update is occurring. * @param limit The limit on the work rate of the resource consumer. - * @param deadline The deadline that was requested by the resource consumer. - * @return The instant at which to resume the consumer. + * @param duration The duration of the consumption in milliseconds. + * @return The deadline of the resource consumption. */ - public fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long + public fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long { + return if (duration == Long.MAX_VALUE) { + return Long.MAX_VALUE + } else { + now + duration + } + } /** * This method is invoked when the progress of the resource consumer is materialized. * * @param ctx The context in which the provider runs. - * @param work The amount of work that was requested by the resource consumer. + * @param limit The limit on the work rate of the resource consumer. * @param willOvercommit A flag to indicate that the remaining work is overcommitted. */ - public fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {} + public fun onUpdate(ctx: SimResourceControllableContext, delta: Long, limit: Double, willOvercommit: Boolean) {} /** * This method is invoked when the resource consumer has finished. */ - public fun onFinish(ctx: SimResourceControllableContext) - - /** - * Compute the amount of work that was consumed over the specified [duration]. - * - * @param work The total size of the resource consumption. - * @param speed The speed of the resource provider. - * @param duration The duration from the start of the consumption until now. - * @return The amount of work that was consumed by the resource provider. - */ - public fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { - return if (duration > 0L) { - return (duration / 1000.0) * speed - } else { - work - } - } + public fun onFinish(ctx: SimResourceControllableContext) {} } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 2d53198a..10213f26 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -22,9 +22,6 @@ package org.opendc.simulator.resources -import kotlin.math.ceil -import kotlin.math.min - /** * A [SimResourceSource] represents a source for some resource that provides bounded processing capacity. * @@ -39,20 +36,13 @@ public class SimResourceSource( ) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) { override fun createLogic(): SimResourceProviderLogic { return object : SimResourceProviderLogic { - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { - return deadline - } - - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { - return if (work.isInfinite()) { - Long.MAX_VALUE - } else { - min(deadline, ctx.clock.millis() + getDuration(work, speed)) - } - } - - override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { - updateCounters(ctx, work, willOvercommit) + override fun onUpdate( + ctx: SimResourceControllableContext, + delta: Long, + limit: Double, + willOvercommit: Boolean + ) { + updateCounters(ctx, delta, limit, willOvercommit) } override fun onFinish(ctx: SimResourceControllableContext) { @@ -62,11 +52,4 @@ public class SimResourceSource( } override fun toString(): String = "SimResourceSource[capacity=$capacity]" - - /** - * Compute the duration that a resource consumption will take with the specified [speed]. - */ - private fun getDuration(work: Double, speed: Double): Long { - return ceil(work / speed * 1000).toLong() - } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index cec27e1c..68bedbd9 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -100,19 +100,19 @@ public class SimResourceTransformer( } } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { val delegate = delegate if (!hasDelegateStarted) { start() } - updateCounters(ctx) + updateCounters(ctx, delta) return if (delegate != null) { - val command = transform(ctx, delegate.onNext(ctx)) + val command = transform(ctx, delegate.onNext(ctx, now, delta)) - _work = if (command is SimResourceCommand.Consume) command.work else 0.0 + _limit = if (command is SimResourceCommand.Consume) command.limit else 0.0 if (command == SimResourceCommand.Exit) { // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we @@ -124,12 +124,12 @@ public class SimResourceTransformer( if (isCoupled) SimResourceCommand.Exit else - onNext(ctx) + onNext(ctx, now, delta) } else { command } } else { - SimResourceCommand.Idle() + SimResourceCommand.Consume(0.0) } } @@ -180,19 +180,21 @@ public class SimResourceTransformer( } /** - * Counter to track the current submitted work. + * The requested speed. */ - private var _work = 0.0 + private var _limit: Double = 0.0 /** * Update the resource counters for the transformer. */ - private fun updateCounters(ctx: SimResourceContext) { + private fun updateCounters(ctx: SimResourceContext, delta: Long) { val counters = _counters - val remainingWork = ctx.remainingWork - counters.demand += _work - counters.actual += _work - remainingWork - counters.overcommit += remainingWork + val deltaS = delta / 1000.0 + val work = _limit * deltaS + val actualWork = ctx.speed * deltaS + counters.demand += work + counters.actual += actualWork + counters.overcommit += (work - actualWork) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt index 4f4ebb14..1f8434b7 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt @@ -50,8 +50,8 @@ public class SimSpeedConsumerAdapter( callback(0.0) } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return delegate.onNext(ctx) + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + return delegate.onNext(ctx, now, delta) } override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index 2e94e1c1..e5173e5f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -34,20 +34,11 @@ import org.opendc.simulator.resources.SimResourceEvent public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer { private var iterator: Iterator<Fragment>? = null - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { val iterator = checkNotNull(iterator) return if (iterator.hasNext()) { - val now = ctx.clock.millis() val fragment = iterator.next() - val work = (fragment.duration / 1000) * fragment.usage - val deadline = now + fragment.duration - - assert(deadline >= now) { "Deadline already passed" } - - if (work > 0.0) - SimResourceCommand.Consume(work, fragment.usage, deadline) - else - SimResourceCommand.Idle(deadline) + SimResourceCommand.Consume(fragment.usage, fragment.duration) } else { SimResourceCommand.Exit } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt index faa693c4..ae837043 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -25,6 +25,7 @@ package org.opendc.simulator.resources.consumer import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext +import kotlin.math.roundToLong /** * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization. @@ -39,18 +40,19 @@ public class SimWorkConsumer( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - private var isFirst = true + private var remainingWork = work - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + val actualWork = ctx.speed * delta / 1000.0 val limit = ctx.capacity * utilization - val work = if (isFirst) { - isFirst = false - work - } else { - ctx.remainingWork - } - return if (work > 0.0) { - SimResourceCommand.Consume(work, limit) + + remainingWork -= actualWork + + val remainingWork = remainingWork + val duration = (remainingWork / limit * 1000).roundToLong() + + return if (duration > 0) { + SimResourceCommand.Consume(limit, duration) } else { SimResourceCommand.Exit } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index b79998a3..a9507e52 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -58,12 +58,6 @@ internal class SimResourceContextImpl( } /** - * The amount of work still remaining at this instant. - */ - override val remainingWork: Double - get() = getRemainingWork(_clock.millis()) - - /** * A flag to indicate the state of the context. */ override val state: SimResourceState @@ -87,8 +81,8 @@ internal class SimResourceContextImpl( * The current state of the resource context. */ private var _timestamp: Long = Long.MIN_VALUE - private var _work: Double = 0.0 private var _limit: Double = 0.0 + private var _duration: Long = Long.MAX_VALUE private var _deadline: Long = Long.MAX_VALUE /** @@ -178,7 +172,6 @@ internal class SimResourceContextImpl( } catch (cause: Throwable) { doFail(cause) } finally { - _remainingWorkFlush = Long.MIN_VALUE _timestamp = timestamp } } @@ -200,29 +193,25 @@ internal class SimResourceContextImpl( SimResourceState.Pending, SimResourceState.Stopped -> state SimResourceState.Active -> { val isInterrupted = _flag and FLAG_INTERRUPT != 0 - val remainingWork = getRemainingWork(timestamp) - val isConsume = _limit > 0.0 val reachedDeadline = _deadline <= timestamp + val delta = max(0, timestamp - _timestamp) // Update the resource counters only if there is some progress if (timestamp > _timestamp) { - logic.onUpdate(this, _work, reachedDeadline) + logic.onUpdate(this, delta, _limit, reachedDeadline) } // We should only continue processing the next command if: // 1. The resource consumption was finished. // 2. The resource capacity cannot satisfy the demand. // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) - if ((isConsume && remainingWork == 0.0) || reachedDeadline || isInterrupted) { - when (val command = consumer.onNext(this)) { - is SimResourceCommand.Idle -> interpretIdle(timestamp, command.deadline) - is SimResourceCommand.Consume -> interpretConsume(timestamp, command.work, command.limit, command.deadline) + if (reachedDeadline || isInterrupted) { + when (val command = consumer.onNext(this, timestamp, delta)) { + is SimResourceCommand.Consume -> interpretConsume(timestamp, command.limit, command.duration) is SimResourceCommand.Exit -> interpretExit() } - } else if (isConsume) { - interpretConsume(timestamp, remainingWork, _limit, _deadline) } else { - interpretIdle(timestamp, _deadline) + interpretConsume(timestamp, _limit, _duration - delta) } } } @@ -257,32 +246,15 @@ internal class SimResourceContextImpl( /** * Interpret the [SimResourceCommand.Consume] command. */ - private fun interpretConsume(now: Long, work: Double, limit: Double, deadline: Long): SimResourceState { - require(deadline >= now) { "Deadline already passed" } - + private fun interpretConsume(now: Long, limit: Double, duration: Long): SimResourceState { _speed = min(capacity, limit) - _work = work _limit = limit - _deadline = deadline + _duration = duration - val timestamp = logic.onConsume(this, work, limit, deadline) - scheduleUpdate(timestamp) + val timestamp = logic.onConsume(this, now, limit, duration) - return SimResourceState.Active - } - - /** - * Interpret the [SimResourceCommand.Idle] command. - */ - private fun interpretIdle(now: Long, deadline: Long): SimResourceState { - require(deadline >= now) { "Deadline already passed" } - - _speed = 0.0 - _work = 0.0 - _limit = 0.0 - _deadline = deadline + _deadline = timestamp - val timestamp = logic.onIdle(this, deadline) scheduleUpdate(timestamp) return SimResourceState.Active @@ -293,37 +265,13 @@ internal class SimResourceContextImpl( */ private fun interpretExit(): SimResourceState { _speed = 0.0 - _work = 0.0 _limit = 0.0 + _duration = Long.MAX_VALUE _deadline = Long.MAX_VALUE return SimResourceState.Stopped } - private var _remainingWork: Double = 0.0 - private var _remainingWorkFlush: Long = Long.MIN_VALUE - - /** - * Obtain the remaining work at the given timestamp. - */ - private fun getRemainingWork(now: Long): Double { - return if (_remainingWorkFlush < now) { - _remainingWorkFlush = now - computeRemainingWork(now).also { _remainingWork = it } - } else { - _remainingWork - } - } - - /** - * Compute the remaining work based on the current state. - */ - private fun computeRemainingWork(now: Long): Double { - return if (_work > 0.0) - max(0.0, _work - logic.getConsumedWork(this, _work, speed, now - _timestamp)) - else 0.0 - } - /** * Indicate that the capacity of the resource has changed. */ @@ -333,16 +281,11 @@ internal class SimResourceContextImpl( return } - val isThrottled = speed > capacity - interpreter.batch { // Inform the consumer of the capacity change. This might already trigger an interrupt. consumer.onEvent(this, SimResourceEvent.Capacity) - // Optimization: only invalidate context if the new capacity cannot satisfy the active resource command. - if (isThrottled) { - invalidate() - } + interrupt() } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index 2f01a8c4..a9390553 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -38,7 +38,6 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * Test suite for the [SimResourceAggregatorMaxMin] class. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceAggregatorMaxMinTest { @Test fun testSingleCapacity() = runBlockingSimulation { @@ -102,15 +101,15 @@ internal class SimResourceAggregatorMaxMinTest { sources.forEach(aggregator::addInput) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(4.0, 1000)) .andThen(SimResourceCommand.Exit) aggregator.consume(consumer) yield() assertEquals(1000, clock.millis()) - verify(exactly = 2) { consumer.onNext(any()) } + verify(exactly = 2) { consumer.onNext(any(), any(), any()) } } @Test @@ -125,8 +124,8 @@ internal class SimResourceAggregatorMaxMinTest { sources.forEach(aggregator::addInput) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1.0, 1.0)) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1.0, duration = 1000)) .andThenThrows(IllegalStateException("Test Exception")) assertThrows<IllegalStateException> { aggregator.consume(consumer) } @@ -152,7 +151,7 @@ internal class SimResourceAggregatorMaxMinTest { sources[0].capacity = 0.5 } yield() - assertEquals(2334, clock.millis()) + assertEquals(2333, clock.millis()) } @Test @@ -173,7 +172,7 @@ internal class SimResourceAggregatorMaxMinTest { sources[0].capacity = 0.5 } yield() - assertEquals(1000, clock.millis()) + assertEquals(1167, clock.millis()) } @Test @@ -188,8 +187,8 @@ internal class SimResourceAggregatorMaxMinTest { sources.forEach(aggregator::addInput) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(4.0, 1000)) .andThen(SimResourceCommand.Exit) aggregator.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt index 02d456ff..9a52dc63 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt @@ -31,44 +31,23 @@ import org.junit.jupiter.api.assertThrows */ class SimResourceCommandTest { @Test - fun testZeroWork() { - assertThrows<IllegalArgumentException> { - SimResourceCommand.Consume(0.0, 1.0) - } - } - - @Test - fun testNegativeWork() { - assertThrows<IllegalArgumentException> { - SimResourceCommand.Consume(-1.0, 1.0) - } - } - - @Test - fun testZeroLimit() { + fun testNegativeLimit() { assertThrows<IllegalArgumentException> { - SimResourceCommand.Consume(1.0, 0.0) + SimResourceCommand.Consume(-1.0, 1) } } @Test - fun testNegativeLimit() { + fun testNegativeDuration() { assertThrows<IllegalArgumentException> { - SimResourceCommand.Consume(1.0, -1.0, 1) + SimResourceCommand.Consume(1.0, -1) } } @Test fun testConsumeCorrect() { assertDoesNotThrow { - SimResourceCommand.Consume(1.0, 1.0) - } - } - - @Test - fun testIdleCorrect() { - assertDoesNotThrow { - SimResourceCommand.Idle(1) + SimResourceCommand.Consume(1.0) } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index 6cb507ce..0cb95abb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -32,19 +32,14 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * A test suite for the [SimResourceContextImpl] class. */ -@OptIn(ExperimentalCoroutinesApi::class) class SimResourceContextTest { @Test fun testFlushWithoutCommand() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit - val logic = object : SimResourceProviderLogic { - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline - override fun onFinish(ctx: SimResourceControllableContext) {} - } + val logic = object : SimResourceProviderLogic {} val context = SimResourceContextImpl(null, interpreter, consumer, logic) context.doUpdate(interpreter.clock.millis()) @@ -54,12 +49,11 @@ class SimResourceContextTest { fun testIntermediateFlush() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit val logic = spyk(object : SimResourceProviderLogic { - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline override fun onFinish(ctx: SimResourceControllableContext) {} - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline + override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long = duration }) val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic)) @@ -74,13 +68,9 @@ class SimResourceContextTest { fun testIntermediateFlushIdle() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) andThen SimResourceCommand.Exit - val logic = spyk(object : SimResourceProviderLogic { - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline - override fun onFinish(ctx: SimResourceControllableContext) {} - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline - }) + val logic = spyk(object : SimResourceProviderLogic {}) val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic)) context.start() @@ -90,7 +80,7 @@ class SimResourceContextTest { context.invalidate() assertAll( - { verify(exactly = 2) { logic.onIdle(any(), any()) } }, + { verify(exactly = 2) { logic.onConsume(any(), any(), 0.0, any()) } }, { verify(exactly = 1) { logic.onFinish(any()) } } ) } @@ -99,13 +89,9 @@ class SimResourceContextTest { fun testDoubleStart() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) andThen SimResourceCommand.Exit - val logic = object : SimResourceProviderLogic { - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline - override fun onFinish(ctx: SimResourceControllableContext) {} - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline - } + val logic = object : SimResourceProviderLogic {} val context = SimResourceContextImpl(null, interpreter, consumer, logic) context.start() @@ -116,16 +102,12 @@ class SimResourceContextTest { } @Test - fun testIdempodentCapacityChange() = runBlockingSimulation { + fun testIdempotentCapacityChange() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit - val logic = object : SimResourceProviderLogic { - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline - override fun onFinish(ctx: SimResourceControllableContext) {} - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline - } + val logic = object : SimResourceProviderLogic {} val context = SimResourceContextImpl(null, interpreter, consumer, logic) context.capacity = 4200.0 @@ -139,15 +121,11 @@ class SimResourceContextTest { fun testFailureNoInfiniteLoop() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Exit + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent") every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure") - val logic = spyk(object : SimResourceProviderLogic { - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline - override fun onFinish(ctx: SimResourceControllableContext) {} - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline - }) + val logic = spyk(object : SimResourceProviderLogic {}) val context = SimResourceContextImpl(null, interpreter, consumer, logic) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 4895544d..c310fad6 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -37,8 +37,7 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * A test suite for the [SimResourceSource] class. */ -@OptIn(ExperimentalCoroutinesApi::class) -class SimResourceSourceTest { +internal class SimResourceSourceTest { @Test fun testSpeed() = runBlockingSimulation { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) @@ -46,8 +45,8 @@ class SimResourceSourceTest { val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1000 * capacity, capacity)) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(capacity, duration = 1000)) .andThen(SimResourceCommand.Exit) val res = mutableListOf<Double>() @@ -81,8 +80,8 @@ class SimResourceSourceTest { val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity)) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(2 * capacity, duration = 1000)) .andThen(SimResourceCommand.Exit) val res = mutableListOf<Double>() @@ -104,7 +103,7 @@ class SimResourceSourceTest { val provider = SimResourceSource(capacity, scheduler) val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { return SimResourceCommand.Exit } @@ -133,11 +132,10 @@ class SimResourceSourceTest { } } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - assertEquals(0.0, ctx.remainingWork) + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { return if (isFirst) { isFirst = false - SimResourceCommand.Consume(4.0, 1.0) + SimResourceCommand.Consume(1.0, duration = 4000) } else { SimResourceCommand.Exit } @@ -175,8 +173,8 @@ class SimResourceSourceTest { val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1.0, 1.0)) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1.0, duration = 1000)) .andThenThrows(IllegalStateException()) assertThrows<IllegalStateException> { @@ -191,8 +189,8 @@ class SimResourceSourceTest { val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1.0, 1.0)) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1.0)) .andThenThrows(IllegalStateException()) assertThrows<IllegalStateException> { @@ -210,8 +208,8 @@ class SimResourceSourceTest { val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1.0, 1.0)) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1.0)) .andThenThrows(IllegalStateException()) launch { provider.consume(consumer) } @@ -228,8 +226,8 @@ class SimResourceSourceTest { val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Idle(clock.millis() + 500)) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(0.0, 500)) .andThen(SimResourceCommand.Exit) provider.consume(consumer) @@ -246,28 +244,12 @@ class SimResourceSourceTest { val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Idle()) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(0.0)) .andThenThrows(IllegalStateException()) provider.consume(consumer) } } } - - @Test - fun testIncorrectDeadline() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Idle(2)) - .andThen(SimResourceCommand.Exit) - - delay(10) - - assertThrows<IllegalArgumentException> { provider.consume(consumer) } - } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index ad8d82e3..ad3b0f9f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -38,7 +37,6 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * Test suite for the [SimResourceSwitchExclusive] class. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceSwitchExclusiveTest { /** * Test a trace workload. @@ -91,7 +89,7 @@ internal class SimResourceSwitchExclusiveTest { val duration = 5 * 60L * 1000 val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit + every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, duration = duration) andThen SimResourceCommand.Exit val switch = SimResourceSwitchExclusive() val source = SimResourceSource(3200.0, scheduler) @@ -127,10 +125,10 @@ internal class SimResourceSwitchExclusiveTest { } } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { return if (isFirst) { isFirst = false - SimResourceCommand.Consume(duration / 1000.0, 1.0) + SimResourceCommand.Consume(1.0, duration = duration) } else { SimResourceCommand.Exit } @@ -161,9 +159,8 @@ internal class SimResourceSwitchExclusiveTest { fun testConcurrentWorkloadFails() = runBlockingSimulation { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val duration = 5 * 60L * 1000 val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit + every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit val switch = SimResourceSwitchExclusive() val source = SimResourceSource(3200.0, scheduler) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index e4292ec0..d8f18e65 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import kotlinx.coroutines.yield @@ -37,7 +36,6 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * Test suite for the [SimResourceSwitch] implementations */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceSwitchMaxMinTest { @Test fun testSmoke() = runBlockingSimulation { @@ -50,7 +48,7 @@ internal class SimResourceSwitchMaxMinTest { val provider = switch.newOutput() val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, duration = 1000) andThen SimResourceCommand.Exit try { provider.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index cf69b7b5..3780fd60 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -37,7 +37,6 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * A test suite for the [SimResourceTransformer] class. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceTransformerTest { @Test fun testCancelImmediately() = runBlockingSimulation { @@ -48,7 +47,7 @@ internal class SimResourceTransformerTest { launch { source.consume(forwarder) } forwarder.consume(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { return SimResourceCommand.Exit } }) @@ -68,10 +67,10 @@ internal class SimResourceTransformerTest { forwarder.consume(object : SimResourceConsumer { var isFirst = true - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { return if (isFirst) { isFirst = false - SimResourceCommand.Consume(10.0, 1.0) + SimResourceCommand.Consume(1.0, duration = 10 * 1000L) } else { SimResourceCommand.Exit } @@ -86,7 +85,7 @@ internal class SimResourceTransformerTest { fun testState() = runBlockingSimulation { val forwarder = SimResourceForwarder() val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand = SimResourceCommand.Exit } assertFalse(forwarder.isActive) @@ -108,7 +107,7 @@ internal class SimResourceTransformerTest { val forwarder = SimResourceForwarder() val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Exit + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit forwarder.startConsumer(consumer) forwarder.cancel() @@ -123,7 +122,7 @@ internal class SimResourceTransformerTest { val source = SimResourceSource(2000.0, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) source.startConsumer(forwarder) yield() @@ -142,7 +141,7 @@ internal class SimResourceTransformerTest { val source = SimResourceSource(2000.0, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) source.startConsumer(forwarder) yield() @@ -161,7 +160,7 @@ internal class SimResourceTransformerTest { val source = SimResourceSource(2000.0, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Exit + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit source.startConsumer(forwarder) forwarder.consume(consumer) @@ -200,7 +199,7 @@ internal class SimResourceTransformerTest { forwarder.consume(consumer) assertEquals(0, clock.millis()) - verify(exactly = 1) { consumer.onNext(any()) } + verify(exactly = 1) { consumer.onNext(any(), any(), any()) } } @Test diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt index 42648cf1..830f16d3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.ExperimentalCoroutinesApi import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.core.runBlockingSimulation @@ -32,7 +31,6 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * A test suite for the [SimWorkConsumer] class. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimWorkConsumerTest { @Test fun testSmoke() = runBlockingSimulation { diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 728dfd99..992b4991 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -126,7 +126,7 @@ internal class WorkflowServiceTest { { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, - { assertEquals(33213237L, clock.millis()) } + { assertEquals(33213236L, clock.millis()) } ) } |
