diff options
Diffstat (limited to 'opendc-simulator')
24 files changed, 385 insertions, 425 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt index 34ac4418..6e6e590f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.compute.device import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.power.SimPowerInlet -import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext import org.opendc.simulator.resources.SimResourceEvent @@ -83,13 +82,10 @@ public class SimPsu( } override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0) - - return if (powerDraw > 0.0) - SimResourceCommand.Consume(powerDraw, Long.MAX_VALUE) - else - SimResourceCommand.Consume(0.0) + ctx.push(powerDraw) + return Long.MAX_VALUE } override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 527619bd..dd582bb2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext import kotlin.math.min @@ -80,13 +79,20 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val } private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { - val fragment = pullFragment(now) ?: return SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + val fragment = pullFragment(now) + + if (fragment == null) { + ctx.close() + return Long.MAX_VALUE + } + val timestamp = fragment.timestamp + offset // Fragment is in the future if (timestamp > now) { - return SimResourceCommand.Consume(0.0, timestamp - now) + ctx.push(0.0) + return timestamp - now } val cores = min(cpu.node.coreCount, fragment.cores) @@ -97,10 +103,9 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val val deadline = timestamp + fragment.duration val duration = deadline - now - return if (cpu.id < cores && usage > 0.0) - SimResourceCommand.Consume(usage, duration) - else - SimResourceCommand.Consume(0.0, duration) + ctx.push(if (cpu.id < cores && usage > 0.0) usage else 0.0) + + return duration } } diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt index f2dd8455..7db0f176 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt @@ -32,7 +32,7 @@ public class SimNetworkSink( public val capacity: Double ) : SimNetworkPort() { override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand = SimResourceCommand.Consume(0.0) + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long = Long.MAX_VALUE override fun toString(): String = "SimNetworkSink.Consumer" } diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt index e8839496..b0ea7f0a 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt @@ -47,16 +47,13 @@ public class SimPdu( public fun newOutlet(): Outlet = Outlet(distributor.newOutput()) override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer by distributor { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { - return when (val cmd = distributor.onNext(ctx, now, delta)) { - is SimResourceCommand.Consume -> { - val loss = computePowerLoss(cmd.limit) - val newLimit = cmd.limit + loss + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + val duration = distributor.onNext(ctx, now, delta) + val loss = computePowerLoss(ctx.demand) + val newLimit = ctx.demand + loss - SimResourceCommand.Consume(newLimit, cmd.duration) - } - else -> cmd - } + ctx.push(newLimit) + return duration } override fun toString(): String = "SimPdu.Consumer" diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt index 4c2beb68..59006dfc 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt @@ -55,16 +55,13 @@ public class SimUps( override fun onConnect(inlet: SimPowerInlet) { val consumer = inlet.createConsumer() aggregator.startConsumer(object : SimResourceConsumer by consumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { - return when (val cmd = consumer.onNext(ctx, now, delta)) { - is SimResourceCommand.Consume -> { - val loss = computePowerLoss(cmd.limit) - val newLimit = cmd.limit + loss + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + val duration = consumer.onNext(ctx, now, delta) + val loss = computePowerLoss(ctx.demand) + val newLimit = ctx.demand + loss - SimResourceCommand.Consume(newLimit, cmd.duration) - } - else -> cmd - } + ctx.push(newLimit) + return duration } }) } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index da5c3257..8e0eb5f8 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -125,16 +125,21 @@ public abstract class SimAbstractResourceAggregator( /** * An input for the resource aggregator. */ - public interface Input { + public interface Input : AutoCloseable { /** * The [SimResourceContext] associated with the input. */ public val ctx: SimResourceContext /** - * Push the specified [SimResourceCommand] to the input. + * Push to this input with the specified [limit] and [duration]. */ - public fun push(command: SimResourceCommand) + public fun push(limit: Double, duration: Long) + + /** + * Close the input for further input. + */ + public override fun close() } /** @@ -151,7 +156,12 @@ public abstract class SimAbstractResourceAggregator( /** * The resource command to run next. */ - private var command: SimResourceCommand? = null + private var _duration: Long = Long.MAX_VALUE + + /** + * A flag to indicate that the consumer should flush. + */ + private var _isPushed = false private fun updateCapacity() { // Adjust capacity of output resource @@ -159,25 +169,34 @@ public abstract class SimAbstractResourceAggregator( } /* Input */ - override fun push(command: SimResourceCommand) { - this.command = command - _ctx?.interrupt() + override fun push(limit: Double, duration: Long) { + _duration = duration + val ctx = _ctx + if (ctx != null) { + ctx.push(limit) + ctx.interrupt() + } + _isPushed = true + } + + override fun close() { + _duration = Long.MAX_VALUE + _isPushed = true + _ctx?.close() } /* SimResourceConsumer */ - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { - var next = command + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + var next = _duration - return if (next != null) { - this.command = null - next - } else { + if (!_isPushed) { _output.flush() - - next = command - this.command = null - next ?: SimResourceCommand.Consume(0.0) + next = _duration } + + _isPushed = false + _duration = Long.MAX_VALUE + return next } override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index 537be1b5..b258a368 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -41,11 +41,7 @@ public class SimResourceAggregatorMaxMin( val fraction = inputCapacity / capacity val grantedSpeed = limit * fraction - val command = if (grantedSpeed > 0.0) - SimResourceCommand.Consume(grantedSpeed, duration) - else - SimResourceCommand.Consume(0.0, duration) - input.push(command) + input.push(grantedSpeed, duration) } } @@ -53,7 +49,7 @@ public class SimResourceAggregatorMaxMin( val iterator = consumers.iterator() for (input in iterator) { iterator.remove() - input.push(SimResourceCommand.Exit) + input.close() } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt deleted file mode 100644 index 4a980071..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer]. - */ -public sealed class SimResourceCommand { - /** - * A request to the resource to perform work for the specified [duration]. - * - * @param limit The maximum amount of work to be processed per second. - * @param duration The duration of the resource consumption in milliseconds. - */ - public data class Consume(val limit: Double, val duration: Long = Long.MAX_VALUE) : SimResourceCommand() { - init { - require(limit >= 0.0) { "Negative limit is not allowed" } - require(duration >= 0) { "Duration must be positive" } - } - } - - /** - * An indication to the resource that the consumer has finished. - */ - public object Exit : SimResourceCommand() -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 4d1d2c32..0b25358a 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -30,15 +30,14 @@ package org.opendc.simulator.resources */ public interface SimResourceConsumer { /** - * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because - * the resource finished processing, reached its deadline or was interrupted. + * This method is invoked when the resource provider is pulling this resource consumer. * * @param ctx The execution context in which the consumer runs. * @param now The virtual timestamp in milliseconds at which the update is occurring. * @param delta The virtual duration between this call and the last call in milliseconds. - * @return The next command that the resource should execute. + * @return The duration after which the resource consumer should be pulled again. */ - public fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand + public fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long /** * This method is invoked when an event has occurred. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index f28b43d0..225cae0b 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -28,7 +28,7 @@ import java.time.Clock * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a * resource and a resource consumer. */ -public interface SimResourceContext { +public interface SimResourceContext : AutoCloseable { /** * The virtual clock tracking simulation time. */ @@ -53,4 +53,16 @@ public interface SimResourceContext { * Ask the resource provider to interrupt its resource. */ public fun interrupt() + + /** + * Push the given flow to this context. + * + * @param rate The rate of the flow to push. + */ + public fun push(rate: Double) + + /** + * Stop the resource context. + */ + public override fun close() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt index ceaca39a..ba52b597 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt @@ -27,7 +27,7 @@ package org.opendc.simulator.resources * * This interface is used by resource providers to control the resource context. */ -public interface SimResourceControllableContext : SimResourceContext, AutoCloseable { +public interface SimResourceControllableContext : SimResourceContext { /** * The state of the resource context. */ @@ -44,11 +44,6 @@ public interface SimResourceControllableContext : SimResourceContext, AutoClosea public fun start() /** - * Stop the resource context. - */ - public override fun close() - - /** * Invalidate the resource context's state. * * By invalidating the resource context's current state, the state is re-computed and the current progress is diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index d23c7dbb..eac58410 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -54,11 +54,6 @@ public class SimResourceDistributorMaxMin( private val activeOutputs: MutableList<Output> = mutableListOf() /** - * The total amount of work allocated to be executed. - */ - private var totalAllocatedWork = 0.0 - - /** * The total allocated speed for the output resources. */ private var totalAllocatedSpeed = 0.0 @@ -97,7 +92,7 @@ public class SimResourceDistributorMaxMin( } /* SimResourceConsumer */ - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { return doNext(ctx, now) } @@ -137,10 +132,10 @@ public class SimResourceDistributorMaxMin( /** * Schedule the work of the outputs. */ - private fun doNext(ctx: SimResourceContext, now: Long): SimResourceCommand { + private fun doNext(ctx: SimResourceContext, now: Long): Long { // If there is no work yet, mark the input as idle. if (activeOutputs.isEmpty()) { - return SimResourceCommand.Consume(0.0) + return Long.MAX_VALUE } val capacity = ctx.capacity @@ -196,14 +191,11 @@ public class SimResourceDistributorMaxMin( } this.totalRequestedSpeed = totalRequestedSpeed - this.totalAllocatedWork = totalAllocatedWork val totalAllocatedSpeed = capacity - availableSpeed this.totalAllocatedSpeed = totalAllocatedSpeed - return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) - SimResourceCommand.Consume(totalAllocatedSpeed, duration) - else - SimResourceCommand.Consume(0.0, duration) + ctx.push(totalAllocatedSpeed) + return duration } private fun updateCapacity(ctx: SimResourceContext) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index 68bedbd9..f12ef9f1 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.resources import org.opendc.simulator.resources.impl.SimResourceCountersImpl +import java.time.Clock /** * A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider. @@ -32,14 +33,9 @@ import org.opendc.simulator.resources.impl.SimResourceCountersImpl */ public class SimResourceTransformer( private val isCoupled: Boolean = false, - private val transform: (SimResourceContext, SimResourceCommand) -> SimResourceCommand + private val transform: (SimResourceContext, Long) -> Long ) : SimResourceFlow, AutoCloseable { /** - * The [SimResourceContext] in which the forwarder runs. - */ - private var ctx: SimResourceContext? = null - - /** * The delegate [SimResourceConsumer]. */ private var delegate: SimResourceConsumer? = null @@ -49,17 +45,63 @@ public class SimResourceTransformer( */ private var hasDelegateStarted: Boolean = false + /** + * The exposed [SimResourceContext]. + */ + private val ctx = object : SimResourceContext { + override val clock: Clock + get() = _ctx!!.clock + + override val capacity: Double + get() = _ctx?.capacity ?: 0.0 + + override val demand: Double + get() = _ctx?.demand ?: 0.0 + + override val speed: Double + get() = _ctx?.speed ?: 0.0 + + override fun interrupt() { + _ctx?.interrupt() + } + + override fun push(rate: Double) { + _ctx?.push(rate) + _limit = rate + } + + override fun close() { + val delegate = checkNotNull(delegate) { "Delegate not active" } + + // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we + // reset beforehand the existing state and check whether it has been updated afterwards + reset() + + delegate.onEvent(this, SimResourceEvent.Exit) + + if (isCoupled) + _ctx?.close() + else + _ctx?.push(0.0) + } + } + + /** + * The [SimResourceContext] in which the forwarder runs. + */ + private var _ctx: SimResourceContext? = null + override val isActive: Boolean get() = delegate != null override val capacity: Double - get() = ctx?.capacity ?: 0.0 + get() = ctx.capacity override val speed: Double - get() = ctx?.speed ?: 0.0 + get() = ctx.speed override val demand: Double - get() = ctx?.demand ?: 0.0 + get() = ctx.demand override val counters: SimResourceCounters get() = _counters @@ -75,32 +117,32 @@ public class SimResourceTransformer( } override fun interrupt() { - ctx?.interrupt() + ctx.interrupt() } override fun cancel() { val delegate = delegate - val ctx = ctx + val ctx = _ctx if (delegate != null) { this.delegate = null if (ctx != null) { - delegate.onEvent(ctx, SimResourceEvent.Exit) + delegate.onEvent(this.ctx, SimResourceEvent.Exit) } } } override fun close() { - val ctx = ctx + val ctx = _ctx if (ctx != null) { - this.ctx = null + this._ctx = null ctx.interrupt() } } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { val delegate = delegate if (!hasDelegateStarted) { @@ -110,54 +152,39 @@ public class SimResourceTransformer( updateCounters(ctx, delta) return if (delegate != null) { - val command = transform(ctx, delegate.onNext(ctx, now, delta)) - - _limit = if (command is SimResourceCommand.Consume) command.limit else 0.0 - - if (command == SimResourceCommand.Exit) { - // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we - // reset beforehand the existing state and check whether it has been updated afterwards - reset() - - delegate.onEvent(ctx, SimResourceEvent.Exit) - - if (isCoupled) - SimResourceCommand.Exit - else - onNext(ctx, now, delta) - } else { - command - } + val duration = transform(ctx, delegate.onNext(this.ctx, now, delta)) + _limit = ctx.demand + duration } else { - SimResourceCommand.Consume(0.0) + Long.MAX_VALUE } } override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { when (event) { SimResourceEvent.Start -> { - this.ctx = ctx + _ctx = ctx } SimResourceEvent.Exit -> { - this.ctx = null + _ctx = null val delegate = delegate if (delegate != null) { reset() - delegate.onEvent(ctx, SimResourceEvent.Exit) + delegate.onEvent(this.ctx, SimResourceEvent.Exit) } } - else -> delegate?.onEvent(ctx, event) + else -> delegate?.onEvent(this.ctx, event) } } override fun onFailure(ctx: SimResourceContext, cause: Throwable) { - this.ctx = null + _ctx = null val delegate = delegate if (delegate != null) { reset() - delegate.onFailure(ctx, cause) + delegate.onFailure(this.ctx, cause) } } @@ -166,7 +193,7 @@ public class SimResourceTransformer( */ private fun start() { val delegate = delegate ?: return - delegate.onEvent(checkNotNull(ctx), SimResourceEvent.Start) + delegate.onEvent(checkNotNull(_ctx), SimResourceEvent.Start) hasDelegateStarted = true } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt index 1f8434b7..46885640 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources.consumer -import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext import org.opendc.simulator.resources.SimResourceEvent @@ -50,7 +49,7 @@ public class SimSpeedConsumerAdapter( callback(0.0) } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { return delegate.onNext(ctx, now, delta) } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index e5173e5f..ad6b0108 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources.consumer -import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext import org.opendc.simulator.resources.SimResourceEvent @@ -34,13 +33,15 @@ import org.opendc.simulator.resources.SimResourceEvent public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer { private var iterator: Iterator<Fragment>? = null - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { val iterator = checkNotNull(iterator) return if (iterator.hasNext()) { val fragment = iterator.next() - SimResourceCommand.Consume(fragment.usage, fragment.duration) + ctx.push(fragment.usage) + fragment.duration } else { - SimResourceCommand.Exit + ctx.close() + Long.MAX_VALUE } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt index ae837043..bf76711f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources.consumer -import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext import kotlin.math.roundToLong @@ -37,12 +36,12 @@ public class SimWorkConsumer( init { require(work >= 0.0) { "Work must be positive" } - require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } + require(utilization > 0.0) { "Utilization must be positive" } } private var remainingWork = work - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { val actualWork = ctx.speed * delta / 1000.0 val limit = ctx.capacity * utilization @@ -52,9 +51,11 @@ public class SimWorkConsumer( val duration = (remainingWork / limit * 1000).roundToLong() return if (duration > 0) { - SimResourceCommand.Consume(limit, duration) + ctx.push(limit) + duration } else { - SimResourceCommand.Exit + ctx.close() + Long.MAX_VALUE } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index a9507e52..d7ea0043 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -86,6 +86,11 @@ internal class SimResourceContextImpl( private var _deadline: Long = Long.MAX_VALUE /** + * A flag to indicate that an update is active. + */ + private var _updateActive = false + + /** * The update flag indicating why the update was triggered. */ private var _flag: Int = 0 @@ -108,7 +113,9 @@ internal class SimResourceContextImpl( if (_state != SimResourceState.Stopped) { interpreter.batch { _state = SimResourceState.Stopped - doStop() + if (!_updateActive) { + doStop() + } } } } @@ -139,6 +146,11 @@ internal class SimResourceContextImpl( interpreter.scheduleSync(this) } + override fun push(rate: Double) { + _speed = min(capacity, rate) + _limit = rate + } + /** * Determine whether the state of the resource context should be updated. */ @@ -151,14 +163,49 @@ internal class SimResourceContextImpl( * Update the state of the resource context. */ fun doUpdate(timestamp: Long) { + val oldState = _state + if (oldState != SimResourceState.Active) { + return + } + + _updateActive = true + + val flag = _flag + val isInterrupted = flag and FLAG_INTERRUPT != 0 + val reachedDeadline = _deadline <= timestamp + val delta = max(0, timestamp - _timestamp) + try { - val oldState = _state - val newState = doUpdate(timestamp, oldState) - _state = newState + // Update the resource counters only if there is some progress + if (timestamp > _timestamp) { + logic.onUpdate(this, delta, _limit, reachedDeadline) + } + + // We should only continue processing the next command if: + // 1. The resource consumption was finished. + // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) + val duration = if (reachedDeadline || isInterrupted) { + consumer.onNext(this, timestamp, delta) + } else { + _deadline - timestamp + } + + // Reset update flags _flag = 0 - when (newState) { + when (_state) { + SimResourceState.Active -> { + val limit = _limit + push(limit) + _duration = duration + + val target = logic.onConsume(this, timestamp, limit, duration) + + _deadline = target + + scheduleUpdate(target) + } SimResourceState.Pending -> if (oldState != SimResourceState.Pending) { throw IllegalStateException("Illegal transition to pending state") @@ -167,12 +214,12 @@ internal class SimResourceContextImpl( if (oldState != SimResourceState.Stopped) { doStop() } - else -> {} } } catch (cause: Throwable) { doFail(cause) } finally { _timestamp = timestamp + _updateActive = false } } @@ -185,39 +232,6 @@ internal class SimResourceContextImpl( override fun toString(): String = "SimResourceContextImpl[capacity=$capacity]" /** - * Update the state of the resource context. - */ - private fun doUpdate(timestamp: Long, state: SimResourceState): SimResourceState { - return when (state) { - // Resource context is not active, so its state will not update - SimResourceState.Pending, SimResourceState.Stopped -> state - SimResourceState.Active -> { - val isInterrupted = _flag and FLAG_INTERRUPT != 0 - val reachedDeadline = _deadline <= timestamp - val delta = max(0, timestamp - _timestamp) - - // Update the resource counters only if there is some progress - if (timestamp > _timestamp) { - logic.onUpdate(this, delta, _limit, reachedDeadline) - } - - // We should only continue processing the next command if: - // 1. The resource consumption was finished. - // 2. The resource capacity cannot satisfy the demand. - // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) - if (reachedDeadline || isInterrupted) { - when (val command = consumer.onNext(this, timestamp, delta)) { - is SimResourceCommand.Consume -> interpretConsume(timestamp, command.limit, command.duration) - is SimResourceCommand.Exit -> interpretExit() - } - } else { - interpretConsume(timestamp, _limit, _duration - delta) - } - } - } - } - - /** * Stop the resource context. */ private fun doStop() { @@ -226,6 +240,8 @@ internal class SimResourceContextImpl( logic.onFinish(this) } catch (cause: Throwable) { doFail(cause) + } finally { + _deadline = Long.MAX_VALUE } } @@ -244,35 +260,6 @@ internal class SimResourceContextImpl( } /** - * Interpret the [SimResourceCommand.Consume] command. - */ - private fun interpretConsume(now: Long, limit: Double, duration: Long): SimResourceState { - _speed = min(capacity, limit) - _limit = limit - _duration = duration - - val timestamp = logic.onConsume(this, now, limit, duration) - - _deadline = timestamp - - scheduleUpdate(timestamp) - - return SimResourceState.Active - } - - /** - * Interpret the [SimResourceCommand.Exit] command. - */ - private fun interpretExit(): SimResourceState { - _speed = 0.0 - _limit = 0.0 - _duration = Long.MAX_VALUE - _deadline = Long.MAX_VALUE - - return SimResourceState.Stopped - } - - /** * Indicate that the capacity of the resource has changed. */ private fun onCapacityChange() { diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index a9390553..f4ea5fe8 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -22,14 +22,12 @@ package org.opendc.simulator.resources -import io.mockk.every -import io.mockk.mockk +import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer @@ -100,10 +98,17 @@ internal class SimResourceAggregatorMaxMinTest { ) sources.forEach(aggregator::addInput) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(4.0, 1000)) - .andThen(SimResourceCommand.Exit) + val consumer = spyk(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(4.0) + 1000 + } else { + ctx.close() + Long.MAX_VALUE + } + } + }) aggregator.consume(consumer) yield() @@ -113,27 +118,6 @@ internal class SimResourceAggregatorMaxMinTest { } @Test - fun testException() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(scheduler) - val sources = listOf( - SimResourceSource(1.0, scheduler), - SimResourceSource(1.0, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(1.0, duration = 1000)) - .andThenThrows(IllegalStateException("Test Exception")) - - assertThrows<IllegalStateException> { aggregator.consume(consumer) } - yield() - assertFalse(sources[0].isActive) - } - - @Test fun testAdjustCapacity() = runBlockingSimulation { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) @@ -186,10 +170,17 @@ internal class SimResourceAggregatorMaxMinTest { ) sources.forEach(aggregator::addInput) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(4.0, 1000)) - .andThen(SimResourceCommand.Exit) + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(4.0) + 1000 + } else { + ctx.close() + Long.MAX_VALUE + } + } + } aggregator.consume(consumer) yield() diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt deleted file mode 100644 index 9a52dc63..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows - -/** - * Test suite for [SimResourceCommand]. - */ -class SimResourceCommandTest { - @Test - fun testNegativeLimit() { - assertThrows<IllegalArgumentException> { - SimResourceCommand.Consume(-1.0, 1) - } - } - - @Test - fun testNegativeDuration() { - assertThrows<IllegalArgumentException> { - SimResourceCommand.Consume(1.0, -1) - } - } - - @Test - fun testConsumeCorrect() { - assertDoesNotThrow { - SimResourceCommand.Consume(1.0) - } - } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index 0cb95abb..4e57f598 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -36,8 +36,17 @@ class SimResourceContextTest { @Test fun testFlushWithoutCommand() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(1.0) + 1000 + } else { + ctx.close() + Long.MAX_VALUE + } + } + } val logic = object : SimResourceProviderLogic {} val context = SimResourceContextImpl(null, interpreter, consumer, logic) @@ -48,14 +57,23 @@ class SimResourceContextTest { @Test fun testIntermediateFlush() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(4.0) + 1000 + } else { + ctx.close() + Long.MAX_VALUE + } + } + } val logic = spyk(object : SimResourceProviderLogic { override fun onFinish(ctx: SimResourceControllableContext) {} override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long = duration }) - val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic)) + val context = SimResourceContextImpl(null, interpreter, consumer, logic) context.start() delay(1) // Delay 1 ms to prevent hitting the fast path @@ -67,11 +85,20 @@ class SimResourceContextTest { @Test fun testIntermediateFlushIdle() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) andThen SimResourceCommand.Exit + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(0.0) + 10 + } else { + ctx.close() + Long.MAX_VALUE + } + } + } val logic = spyk(object : SimResourceProviderLogic {}) - val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic)) + val context = SimResourceContextImpl(null, interpreter, consumer, logic) context.start() delay(5) @@ -88,8 +115,17 @@ class SimResourceContextTest { @Test fun testDoubleStart() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) andThen SimResourceCommand.Exit + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(0.0) + 1000 + } else { + ctx.close() + Long.MAX_VALUE + } + } + } val logic = object : SimResourceProviderLogic {} val context = SimResourceContextImpl(null, interpreter, consumer, logic) @@ -104,8 +140,17 @@ class SimResourceContextTest { @Test fun testIdempotentCapacityChange() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit + val consumer = spyk(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(1.0) + 1000 + } else { + ctx.close() + Long.MAX_VALUE + } + } + }) val logic = object : SimResourceProviderLogic {} @@ -120,12 +165,23 @@ class SimResourceContextTest { @Test fun testFailureNoInfiniteLoop() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit - every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent") - every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure") - val logic = spyk(object : SimResourceProviderLogic {}) + val consumer = spyk(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE + } + + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + if (event == SimResourceEvent.Exit) throw IllegalStateException("onEvent") + } + + override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + throw IllegalStateException("onFailure") + } + }) + + val logic = object : SimResourceProviderLogic {} val context = SimResourceContextImpl(null, interpreter, consumer, logic) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index c310fad6..e055daf7 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -44,10 +44,7 @@ internal class SimResourceSourceTest { val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(capacity, duration = 1000)) - .andThen(SimResourceCommand.Exit) + val consumer = SimWorkConsumer(4200.0, 1.0) val res = mutableListOf<Double>() val adapter = SimSpeedConsumerAdapter(consumer, res::add) @@ -79,10 +76,7 @@ internal class SimResourceSourceTest { val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(2 * capacity, duration = 1000)) - .andThen(SimResourceCommand.Exit) + val consumer = SimWorkConsumer(capacity, 2.0) val res = mutableListOf<Double>() val adapter = SimSpeedConsumerAdapter(consumer, res::add) @@ -103,8 +97,9 @@ internal class SimResourceSourceTest { val provider = SimResourceSource(capacity, scheduler) val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { - return SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE } override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { @@ -132,12 +127,14 @@ internal class SimResourceSourceTest { } } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - SimResourceCommand.Consume(1.0, duration = 4000) + ctx.push(1.0) + 4000 } else { - SimResourceCommand.Exit + ctx.close() + Long.MAX_VALUE } } } @@ -172,10 +169,19 @@ internal class SimResourceSourceTest { val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(1.0, duration = 1000)) - .andThenThrows(IllegalStateException()) + val consumer = object : SimResourceConsumer { + var isFirst = true + + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + ctx.push(1.0) + 1000 + } else { + throw IllegalStateException() + } + } + } assertThrows<IllegalStateException> { provider.consume(consumer) @@ -188,10 +194,7 @@ internal class SimResourceSourceTest { val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(1.0)) - .andThenThrows(IllegalStateException()) + val consumer = SimWorkConsumer(capacity, 1.0) assertThrows<IllegalStateException> { coroutineScope { @@ -207,30 +210,13 @@ internal class SimResourceSourceTest { val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(1.0)) - .andThenThrows(IllegalStateException()) + val consumer = SimWorkConsumer(capacity, 1.0) launch { provider.consume(consumer) } delay(500) provider.cancel() - assertEquals(500, clock.millis()) - } - - @Test - fun testIdle() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(0.0, 500)) - .andThen(SimResourceCommand.Exit) - - provider.consume(consumer) + yield() assertEquals(500, clock.millis()) } @@ -243,10 +229,9 @@ internal class SimResourceSourceTest { val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(0.0)) - .andThenThrows(IllegalStateException()) + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long = Long.MAX_VALUE + } provider.consume(consumer) } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index ad3b0f9f..9f86dc0d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import io.mockk.every -import io.mockk.mockk import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -32,6 +30,7 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** @@ -88,8 +87,7 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, duration = duration) andThen SimResourceCommand.Exit + val workload = SimWorkConsumer(duration * 3.2, 1.0) val switch = SimResourceSwitchExclusive() val source = SimResourceSource(3200.0, scheduler) @@ -125,12 +123,14 @@ internal class SimResourceSwitchExclusiveTest { } } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - SimResourceCommand.Consume(1.0, duration = duration) + ctx.push(1.0) + duration } else { - SimResourceCommand.Exit + ctx.close() + Long.MAX_VALUE } } } @@ -159,9 +159,6 @@ internal class SimResourceSwitchExclusiveTest { fun testConcurrentWorkloadFails() = runBlockingSimulation { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit - val switch = SimResourceSwitchExclusive() val source = SimResourceSource(3200.0, scheduler) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index d8f18e65..ba0d66ff 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import io.mockk.every -import io.mockk.mockk import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import kotlinx.coroutines.yield @@ -31,6 +29,7 @@ import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** @@ -46,9 +45,7 @@ internal class SimResourceSwitchMaxMinTest { sources.forEach { switch.addInput(it) } val provider = switch.newOutput() - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, duration = 1000) andThen SimResourceCommand.Exit + val consumer = SimWorkConsumer(2000.0, 1.0) try { provider.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index 3780fd60..fc43c3da 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import io.mockk.every -import io.mockk.mockk import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.* @@ -47,8 +45,9 @@ internal class SimResourceTransformerTest { launch { source.consume(forwarder) } forwarder.consume(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { - return SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE } }) @@ -67,12 +66,14 @@ internal class SimResourceTransformerTest { forwarder.consume(object : SimResourceConsumer { var isFirst = true - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - SimResourceCommand.Consume(1.0, duration = 10 * 1000L) + ctx.push(1.0) + 10 * 1000 } else { - SimResourceCommand.Exit + ctx.close() + Long.MAX_VALUE } } }) @@ -85,7 +86,10 @@ internal class SimResourceTransformerTest { fun testState() = runBlockingSimulation { val forwarder = SimResourceForwarder() val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand = SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE + } } assertFalse(forwarder.isActive) @@ -106,8 +110,12 @@ internal class SimResourceTransformerTest { fun testCancelPendingDelegate() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit + val consumer = spyk(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE + } + }) forwarder.startConsumer(consumer) forwarder.cancel() @@ -121,8 +129,7 @@ internal class SimResourceTransformerTest { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) + val consumer = spyk(SimWorkConsumer(2000.0, 1.0)) source.startConsumer(forwarder) yield() @@ -140,8 +147,7 @@ internal class SimResourceTransformerTest { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) + val consumer = spyk(SimWorkConsumer(2000.0, 1.0)) source.startConsumer(forwarder) yield() @@ -159,8 +165,12 @@ internal class SimResourceTransformerTest { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE + } + } source.startConsumer(forwarder) forwarder.consume(consumer) @@ -190,7 +200,7 @@ internal class SimResourceTransformerTest { @Test fun testTransformExit() = runBlockingSimulation { - val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit } + val forwarder = SimResourceTransformer { ctx, _ -> ctx.close(); Long.MAX_VALUE } val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(1.0, scheduler) |
