diff options
Diffstat (limited to 'simulator')
27 files changed, 733 insertions, 525 deletions
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 6929b06c..c4bd0cb4 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -136,8 +136,8 @@ internal class SimHostTest { assertAll( { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, - { assertEquals(4273200, requestedWork, "Requested work does not match") }, - { assertEquals(3133200, grantedWork, "Granted work does not match") }, + { assertEquals(4281600, requestedWork, "Requested work does not match") }, + { assertEquals(3141600, grantedWork, "Granted work does not match") }, { assertEquals(1140000, overcommittedWork, "Overcommitted work does not match") }, { assertEquals(1200006, scope.currentTime) } ) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 59ce895f..7dae53be 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -143,8 +143,8 @@ class CapelinIntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1707132711051, monitor.totalRequestedBurst) }, - { assertEquals(457881474296, monitor.totalGrantedBurst) }, + { assertEquals(1707144601723, monitor.totalRequestedBurst) }, + { assertEquals(457893364971, monitor.totalGrantedBurst) }, { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) @@ -189,8 +189,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(711464322955, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(175226276978, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(711464339925, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(175226293948, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt index a99b082a..281d43ae 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -32,7 +32,6 @@ import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* import java.time.Clock -import kotlin.coroutines.CoroutineContext /** * Abstract implementation of the [SimHypervisor] interface. @@ -121,7 +120,7 @@ public abstract class SimAbstractHypervisor : SimHypervisor { override val clock: Clock get() = this@SimAbstractHypervisor.context.clock - override val meta: Map<String, Any> = meta + mapOf("coroutine-context" to context.meta["coroutine-context"] as CoroutineContext) + override val meta: Map<String, Any> = meta override fun interrupt(resource: SimResource) { requireNotNull(this@VirtualMachine.cpus[resource]).interrupt() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 39ae34fe..44906c2b 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -33,6 +33,7 @@ import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.SimResource import org.opendc.simulator.resources.SimResourceProvider import org.opendc.simulator.resources.SimResourceSource +import org.opendc.simulator.resources.consume import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -91,7 +92,7 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = withContext(context) { val resources = resources require(!isTerminated) { "Machine is terminated" } - val ctx = Context(resources, meta + mapOf("coroutine-context" to context)) + val ctx = Context(resources, meta) val totalCapacity = model.cpus.sumByDouble { it.frequency } _speed = MutableList(model.cpus.size) { 0.0 } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index bb97192d..c629fbd9 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.compute import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* -import kotlin.coroutines.CoroutineContext /** * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single @@ -40,7 +39,6 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> { return SimResourceSwitchMaxMin( ctx.clock, - ctx.meta["coroutine-context"] as CoroutineContext, object : SimResourceSwitchMaxMin.Listener<SimProcessingUnit> { override fun onSliceFinish( switch: SimResourceSwitchMaxMin<SimProcessingUnit>, diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index 2001a230..5de69884 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.compute import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.resources.* -import kotlin.coroutines.CoroutineContext /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. @@ -35,6 +34,6 @@ public class SimSpaceSharedHypervisor : SimAbstractHypervisor() { } override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> { - return SimResourceSwitchExclusive(ctx.meta["coroutine-context"] as CoroutineContext) + return SimResourceSwitchExclusive() } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index 9b47821e..f1079ee6 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -24,9 +24,8 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.model.SimProcessingUnit -import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.consumer.SimWorkConsumer /** * A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on @@ -47,29 +46,7 @@ public class SimFlopsWorkload( override fun onStart(ctx: SimMachineContext) {} override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { - return CpuConsumer(ctx) - } - - private inner class CpuConsumer(private val machine: SimMachineContext) : SimResourceConsumer<SimProcessingUnit> { - override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { - val limit = ctx.resource.frequency * utilization - val work = flops.toDouble() / machine.cpus.size - - return if (work > 0.0) { - SimResourceCommand.Consume(work, limit) - } else { - SimResourceCommand.Exit - } - } - - override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { - return if (remainingWork > 0.0) { - val limit = ctx.resource.frequency * utilization - return SimResourceCommand.Consume(remainingWork, limit) - } else { - SimResourceCommand.Exit - } - } + return SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization) } override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)" diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index 313b6ed5..d7aa8f80 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -24,9 +24,8 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.model.SimProcessingUnit -import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.consumer.SimWorkConsumer /** * A [SimWorkload] that models application execution as a single duration. @@ -46,24 +45,8 @@ public class SimRuntimeWorkload( override fun onStart(ctx: SimMachineContext) {} override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { - return CpuConsumer() - } - - private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> { - override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { - val limit = ctx.resource.frequency * utilization - val work = (limit / 1000) * duration - return SimResourceCommand.Consume(work, limit) - } - - override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { - return if (remainingWork > 0.0) { - val limit = ctx.resource.frequency * utilization - SimResourceCommand.Consume(remainingWork, limit) - } else { - SimResourceCommand.Exit - } - } + val limit = cpu.frequency * utilization + return SimWorkConsumer((limit / 1000) * duration, utilization) } override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)" diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 31f58a0f..cc4f3136 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -50,11 +50,7 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa } private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> { - override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { - return onNext(ctx, 0.0) - } - - override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, capacity: Double, remainingWork: Double): SimResourceCommand { val now = ctx.clock.millis() val fragment = fragment ?: return SimResourceCommand.Exit val work = (fragment.duration / 1000) * fragment.usage diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index 52251bff..431ca625 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -49,12 +49,9 @@ public abstract class SimAbstractResourceContext<R : SimResource>( /** * This method is invoked when the resource consumer has finished. */ - public abstract fun onFinish() - - /** - * This method is invoked when the resource consumer throws an exception. - */ - public abstract fun onFailure(cause: Throwable) + public open fun onFinish(cause: Throwable?) { + consumer.onFinish(this, cause) + } /** * Compute the duration that a resource consumption will take with the specified [speed]. @@ -67,7 +64,14 @@ public abstract class SimAbstractResourceContext<R : SimResource>( * Compute the speed at which the resource may be consumed. */ protected open fun getSpeed(limit: Double): Double { - return min(limit, resource.capacity) + return min(limit, getCapacity()) + } + + /** + * Return the capacity available for the resource consumer. + */ + protected open fun getCapacity(): Double { + return resource.capacity } /** @@ -93,13 +97,17 @@ public abstract class SimAbstractResourceContext<R : SimResource>( * Start the consumer. */ public fun start() { - try { - isProcessing = true - latestFlush = clock.millis() + check(state == SimResourceState.Pending) { "Consumer is already started" } + + state = SimResourceState.Active + isProcessing = true + latestFlush = clock.millis() - interpret(consumer.onStart(this)) - } catch (e: Throwable) { - onFailure(e) + try { + consumer.onStart(this) + interpret(consumer.onNext(this, getCapacity(), 0.0)) + } catch (cause: Throwable) { + doStop(cause) } finally { isProcessing = false } @@ -114,9 +122,9 @@ public abstract class SimAbstractResourceContext<R : SimResource>( latestFlush = clock.millis() flush(isIntermediate = true) - onFinish() - } catch (e: Throwable) { - onFailure(e) + doStop(null) + } catch (cause: Throwable) { + doStop(cause) } finally { isProcessing = false } @@ -129,7 +137,12 @@ public abstract class SimAbstractResourceContext<R : SimResource>( * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer * will be asked to deliver a new command and is essentially interrupted. */ - public open fun flush(isIntermediate: Boolean = false) { + public fun flush(isIntermediate: Boolean = false) { + // Flush is no-op when the consumer is finished or not yet started + if (state != SimResourceState.Active) { + return + } + val now = clock.millis() // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it. @@ -177,8 +190,8 @@ public abstract class SimAbstractResourceContext<R : SimResource>( // Flush may not be called when the resource consumer has finished throw IllegalStateException() } - } catch (e: Throwable) { - onFailure(e) + } catch (cause: Throwable) { + doStop(cause) } finally { latestFlush = now isProcessing = false @@ -203,6 +216,11 @@ public abstract class SimAbstractResourceContext<R : SimResource>( protected var isProcessing: Boolean = false /** + * A flag to indicate the state of the context. + */ + private var state: SimResourceState = SimResourceState.Pending + + /** * The current command that is being processed. */ private var activeCommand: CommandWrapper? = null @@ -213,6 +231,18 @@ public abstract class SimAbstractResourceContext<R : SimResource>( private var latestFlush: Long = Long.MIN_VALUE /** + * Finish the consumer and resource provider. + */ + private fun doStop(cause: Throwable?) { + val state = state + this.state = SimResourceState.Stopped + + if (state == SimResourceState.Active) { + onFinish(cause) + } + } + + /** * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. */ private fun interpret(command: SimResourceCommand) { @@ -235,9 +265,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>( onConsume(work, limit, deadline) } - is SimResourceCommand.Exit -> { - onFinish() - } + is SimResourceCommand.Exit -> doStop(null) } assert(activeCommand == null) { "Concurrent access to current command" } @@ -248,7 +276,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>( * Request the workload for more work. */ private fun next(remainingWork: Double) { - interpret(consumer.onNext(this, remainingWork)) + interpret(consumer.onNext(this, getCapacity(), remainingWork)) } /** diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt index 77c0a7a9..21f56f9b 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt @@ -29,7 +29,7 @@ public sealed class SimResourceCommand { /** * A request to the resource to perform the specified amount of work before the given [deadline]. * - * @param work The amount of work to process on the CPU. + * @param work The amount of work to process. * @param limit The maximum amount of work to be processed per second. * @param deadline The instant at which the work needs to be fulfilled. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index f516faa6..7637ee69 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -23,23 +23,38 @@ package org.opendc.simulator.resources /** - * A SimResourceConsumer characterizes how a [SimResource] is consumed. + * A [SimResourceConsumer] characterizes how a [SimResource] is consumed. + * + * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently) + * for multiple resource providers, unless explicitly said otherwise. */ public interface SimResourceConsumer<in R : SimResource> { /** - * This method is invoked when the consumer is started for a resource. + * This method is invoked when the consumer is started for some resource. * * @param ctx The execution context in which the consumer runs. - * @return The next command that the resource should perform. */ - public fun onStart(ctx: SimResourceContext<R>): SimResourceCommand + public fun onStart(ctx: SimResourceContext<R>) {} /** - * This method is invoked when a resource was either interrupted or reached its deadline. + * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because + * the resource finished processing, reached its deadline or was interrupted. * * @param ctx The execution context in which the consumer runs. - * @param remainingWork The remaining work that was not yet completed. - * @return The next command that the resource should perform. + * @param capacity The capacity that is available for the consumer. In case no capacity is available, zero is given. + * @param remainingWork The work of the previous command that was not yet completed due to interruption or deadline. + * @return The next command that the resource should execute. + */ + public fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand + + /** + * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], + * the resource finished itself, or a failure occurred at the resource. + * + * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider. + * + * @param ctx The execution context in which the consumer ran. + * @param cause The cause of the finish in case the resource finished exceptionally. */ - public fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand + public fun onFinish(ctx: SimResourceContext<R>, cause: Throwable? = null) {} } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt index ca23557c..23bf8739 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -22,10 +22,6 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume - /** * A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer]. */ @@ -37,16 +33,6 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) : private var ctx: SimResourceContext<R>? = null /** - * A flag to indicate that the forwarder is closed. - */ - private var isClosed: Boolean = false - - /** - * The continuation to resume after consumption. - */ - private var cont: Continuation<Unit>? = null - - /** * The delegate [SimResourceConsumer]. */ private var delegate: SimResourceConsumer<R>? = null @@ -61,95 +47,111 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) : */ private var remainingWork: Double = 0.0 - override suspend fun consume(consumer: SimResourceConsumer<R>) { - check(!isClosed) { "Lifecycle of forwarder has ended" } - check(cont == null) { "Run should not be called concurrently" } - - return suspendCancellableCoroutine { cont -> - this.cont = cont - this.delegate = consumer + /** + * The state of the forwarder. + */ + override var state: SimResourceState = SimResourceState.Pending + private set - cont.invokeOnCancellation { reset() } + override fun startConsumer(consumer: SimResourceConsumer<R>) { + check(state == SimResourceState.Pending) { "Resource is in invalid state" } - ctx?.interrupt() - } + state = SimResourceState.Active + delegate = consumer + interrupt() } override fun interrupt() { ctx?.interrupt() } + override fun cancel() { + val delegate = delegate + val ctx = ctx + + state = SimResourceState.Pending + + if (delegate != null && ctx != null) { + this.delegate = null + delegate.onFinish(ctx) + } + } + override fun close() { - isClosed = true - interrupt() - ctx = null + val ctx = ctx + + state = SimResourceState.Stopped + + if (ctx != null) { + this.ctx = null + ctx.interrupt() + } } - override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand { + override fun onStart(ctx: SimResourceContext<R>) { this.ctx = ctx - - return onNext(ctx, 0.0) } - override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand { + val delegate = delegate this.remainingWork = remainingWork - return if (isClosed) { - SimResourceCommand.Exit - } else if (!hasDelegateStarted) { + if (!hasDelegateStarted) { start() + } + + return if (state == SimResourceState.Stopped) { + SimResourceCommand.Exit + } else if (delegate != null) { + val command = delegate.onNext(ctx, capacity, remainingWork) + if (command == SimResourceCommand.Exit) { + // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we + // reset beforehand the existing state and check whether it has been updated afterwards + reset() + + delegate.onFinish(ctx) + + if (state == SimResourceState.Stopped) + SimResourceCommand.Exit + else + onNext(ctx, capacity, 0.0) + } else { + command + } } else { - next() + SimResourceCommand.Idle() } } - /** - * Start the delegate. - */ - private fun start(): SimResourceCommand { - val delegate = delegate ?: return SimResourceCommand.Idle() - val command = delegate.onStart(checkNotNull(ctx)) - - hasDelegateStarted = true - - return forward(command) - } + override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) { + this.ctx = null - /** - * Obtain the next command to process. - */ - private fun next(): SimResourceCommand { val delegate = delegate - return forward(delegate?.onNext(checkNotNull(ctx), remainingWork) ?: SimResourceCommand.Idle()) + if (delegate != null) { + reset() + delegate.onFinish(ctx, cause) + } } /** - * Forward the specified [command]. + * Start the delegate. */ - private fun forward(command: SimResourceCommand): SimResourceCommand { - return if (command == SimResourceCommand.Exit) { - val cont = checkNotNull(cont) - - // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we - // reset beforehand the existing state and check whether it has been updated afterwards - reset() - cont.resume(Unit) + private fun start() { + val delegate = delegate ?: return + delegate.onStart(checkNotNull(ctx)) - if (isClosed) - SimResourceCommand.Exit - else - start() - } else { - command - } + hasDelegateStarted = true } /** * Reset the delegate. */ private fun reset() { - cont = null delegate = null hasDelegateStarted = false + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index e35aa683..1593281b 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import kotlinx.coroutines.suspendCancellableCoroutine + /** * A [SimResourceProvider] provides some resource of type [R]. */ @@ -32,19 +34,51 @@ public interface SimResourceProvider<out R : SimResource> : AutoCloseable { public val resource: R /** - * Consume the resource provided by this provider using the specified [consumer]. + * The state of the resource. + */ + public val state: SimResourceState + + /** + * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously. + * + * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended. */ - public suspend fun consume(consumer: SimResourceConsumer<R>) + public fun startConsumer(consumer: SimResourceConsumer<R>) /** - * Interrupt the resource. + * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op. */ public fun interrupt() /** + * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op. + */ + public fun cancel() + + /** * End the lifetime of the resource. * * This operation terminates the existing resource consumer. */ public override fun close() } + +/** + * Consume the resource provided by this provider using the specified [consumer] and suspend execution until + * the consumer has finished. + */ +public suspend fun <R : SimResource> SimResourceProvider<R>.consume(consumer: SimResourceConsumer<R>) { + return suspendCancellableCoroutine { cont -> + startConsumer(object : SimResourceConsumer<R> by consumer { + override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) { + assert(!cont.isCompleted) { "Coroutine already completed" } + + cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit)) + + consumer.onFinish(ctx, cause) + } + + override fun toString(): String = "SimSuspendingResourceConsumer" + }) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 540a17c9..a3b16177 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -27,9 +27,6 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import org.opendc.utils.TimerScheduler import java.time.Clock -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException import kotlin.math.min /** @@ -37,6 +34,7 @@ import kotlin.math.min * * @param resource The resource to provide. * @param clock The virtual clock to track simulation time. + * @param scheduler The scheduler to schedule the interrupts. */ public class SimResourceSource<R : SimResource>( override val resource: R, @@ -51,60 +49,48 @@ public class SimResourceSource<R : SimResource>( private val _speed = MutableStateFlow(0.0) /** - * A flag to indicate that the resource was closed. - */ - private var isClosed: Boolean = false - - /** - * The current active consumer. - */ - private var cont: CancellableContinuation<Unit>? = null - - /** * The [Context] that is currently running. */ private var ctx: Context? = null - override suspend fun consume(consumer: SimResourceConsumer<R>) { - check(!isClosed) { "Lifetime of resource has ended." } - check(cont == null) { "Run should not be called concurrently" } + override var state: SimResourceState = SimResourceState.Pending + private set - try { - return suspendCancellableCoroutine { cont -> - val ctx = Context(consumer, cont) + override fun startConsumer(consumer: SimResourceConsumer<R>) { + check(state == SimResourceState.Pending) { "Resource is in invalid state" } + val ctx = Context(consumer) - this.cont = cont - this.ctx = ctx + this.ctx = ctx + this.state = SimResourceState.Active - ctx.start() - cont.invokeOnCancellation { - ctx.stop() - } - } - } finally { - cont = null - ctx = null - } + ctx.start() } override fun close() { - isClosed = true - cont?.cancel() - cont = null - ctx = null + cancel() + state = SimResourceState.Stopped } override fun interrupt() { ctx?.interrupt() } + override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.stop() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } + /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context( - consumer: SimResourceConsumer<R>, - val cont: Continuation<Unit> - ) : SimAbstractResourceContext<R>(resource, clock, consumer) { + private inner class Context(consumer: SimResourceConsumer<R>) : SimAbstractResourceContext<R>(resource, clock, consumer) { /** * The processing speed of the resource. */ @@ -130,16 +116,12 @@ public class SimResourceSource<R : SimResource>( scheduler.startSingleTimerTo(this, until, ::flush) } - override fun onFinish() { + override fun onFinish(cause: Throwable?) { speed = 0.0 scheduler.cancel(this) - cont.resume(Unit) - } + cancel() - override fun onFailure(cause: Throwable) { - speed = 0.0 - scheduler.cancel(this) - cont.resumeWithException(cause) + super.onFinish(cause) } override fun toString(): String = "SimResourceSource.Context[resource=$resource]" diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt new file mode 100644 index 00000000..c72951d0 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * The state of a resource provider. + */ +public enum class SimResourceState { + /** + * The resource provider is pending and the resource is waiting to be consumed. + */ + Pending, + + /** + * The resource provider is active and the resource is currently being consumed. + */ + Active, + + /** + * The resource provider is stopped and the resource cannot be consumed anymore. + */ + Stopped +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 060d0ea2..5eea78f6 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -22,33 +22,30 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel -import kotlinx.coroutines.launch import java.util.ArrayDeque -import kotlin.coroutines.CoroutineContext /** * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that * a single output is directly connected to an input and that the switch can only support as much outputs as inputs. */ -public class SimResourceSwitchExclusive<R : SimResource>(context: CoroutineContext) : SimResourceSwitch<R> { +public class SimResourceSwitchExclusive<R : SimResource> : SimResourceSwitch<R> { /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. + * A flag to indicate that the switch is closed. */ - private val scope = CoroutineScope(context + Job()) + private var isClosed: Boolean = false - private val _outputs = mutableSetOf<SimResourceProvider<R>>() + private val _outputs = mutableSetOf<Provider>() override val outputs: Set<SimResourceProvider<R>> get() = _outputs private val availableResources = ArrayDeque<SimResourceForwarder<R>>() + private val _inputs = mutableSetOf<SimResourceProvider<R>>() override val inputs: Set<SimResourceProvider<R>> get() = _inputs override fun addOutput(resource: R): SimResourceProvider<R> { + check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() val output = Provider(resource, forwarder) @@ -57,33 +54,37 @@ public class SimResourceSwitchExclusive<R : SimResource>(context: CoroutineConte } override fun addInput(input: SimResourceProvider<R>) { + check(!isClosed) { "Switch has been closed" } + if (input in inputs) { return } val forwarder = SimResourceForwarder(input.resource) - scope.launch { input.consume(forwarder) } - _inputs += input availableResources += forwarder + + input.startConsumer(object : SimResourceConsumer<R> by forwarder { + override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) { + // De-register the input after it has finished + _inputs -= input + forwarder.onFinish(ctx, cause) + } + }) } override fun close() { - scope.cancel() + isClosed = true + + // Cancel all upstream subscriptions + _inputs.forEach(SimResourceProvider<R>::cancel) } private inner class Provider( override val resource: R, private val forwarder: SimResourceForwarder<R> - ) : SimResourceProvider<R> { - - override suspend fun consume(consumer: SimResourceConsumer<R>) = forwarder.consume(consumer) - - override fun interrupt() { - forwarder.interrupt() - } - + ) : SimResourceProvider<R> by forwarder { override fun close() { _outputs -= this availableResources += forwarder diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index bcf76d3c..6b919a77 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -25,10 +25,6 @@ package org.opendc.simulator.resources import kotlinx.coroutines.* import org.opendc.simulator.resources.consumer.SimConsumerBarrier import java.time.Clock -import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -39,14 +35,8 @@ import kotlin.math.min */ public class SimResourceSwitchMaxMin<R : SimResource>( private val clock: Clock, - context: CoroutineContext, private val listener: Listener<R>? = null ) : SimResourceSwitch<R> { - /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. - */ - private val scope = CoroutineScope(context + Job()) - private val inputConsumers = mutableSetOf<InputConsumer>() private val _outputs = mutableSetOf<OutputProvider>() override val outputs: Set<SimResourceProvider<R>> @@ -112,9 +102,16 @@ public class SimResourceSwitchMaxMin<R : SimResource>( private var barrier: SimConsumerBarrier = SimConsumerBarrier(0) /** + * A flag to indicate that the switch is closed. + */ + private var isClosed: Boolean = false + + /** * Add an output to the switch represented by [resource]. */ override fun addOutput(resource: R): SimResourceProvider<R> { + check(!isClosed) { "Switch has been closed" } + val provider = OutputProvider(resource) _outputs.add(provider) return provider @@ -124,13 +121,15 @@ public class SimResourceSwitchMaxMin<R : SimResource>( * Add the specified [input] to the switch. */ override fun addInput(input: SimResourceProvider<R>) { + check(!isClosed) { "Switch has been closed" } + val consumer = InputConsumer(input) _inputs.add(input) inputConsumers += consumer } override fun close() { - scope.cancel() + isClosed = true } /** @@ -297,64 +296,56 @@ public class SimResourceSwitchMaxMin<R : SimResource>( */ private inner class OutputProvider(override val resource: R) : SimResourceProvider<R> { /** - * A flag to indicate that the resource was closed. - */ - private var isClosed: Boolean = false - - /** - * The current active consumer. - */ - private var cont: CancellableContinuation<Unit>? = null - - /** * The [OutputContext] that is currently running. */ private var ctx: OutputContext? = null - override suspend fun consume(consumer: SimResourceConsumer<R>) { - check(!isClosed) { "Lifetime of resource has ended." } - check(cont == null) { "Run should not be called concurrently" } + override var state: SimResourceState = SimResourceState.Pending + internal set - try { - return suspendCancellableCoroutine { cont -> - val ctx = OutputContext(resource, consumer, cont) - ctx.start() - cont.invokeOnCancellation { - ctx.stop() - } + override fun startConsumer(consumer: SimResourceConsumer<R>) { + check(state == SimResourceState.Pending) { "Resource cannot be consumed" } - this.cont = cont - this.ctx = ctx + val ctx = OutputContext(this, resource, consumer) + this.ctx = ctx + this.state = SimResourceState.Active + outputContexts += ctx - outputContexts += ctx - schedule() - } - } finally { - cont = null - ctx = null - } + ctx.start() + schedule() } override fun close() { - isClosed = true - cont?.cancel() - cont = null - ctx = null + cancel() + + state = SimResourceState.Stopped _outputs.remove(this) } override fun interrupt() { ctx?.interrupt() } + + override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.stop() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } } /** * A [SimAbstractResourceContext] for the output resources. */ private inner class OutputContext( + private val provider: OutputProvider, resource: R, - consumer: SimResourceConsumer<R>, - private val cont: Continuation<Unit> + consumer: SimResourceConsumer<R> ) : SimAbstractResourceContext<R>(resource, clock, consumer), Comparable<OutputContext> { /** * The current command that is processed by the vCPU. @@ -371,11 +362,6 @@ public class SimResourceSwitchMaxMin<R : SimResource>( */ var actualSpeed: Double = 0.0 - /** - * A flag to indicate that the CPU has exited. - */ - var hasExited: Boolean = false - override fun onIdle(deadline: Long) { allowedSpeed = 0.0 activeCommand = SimResourceCommand.Idle(deadline) @@ -386,16 +372,11 @@ public class SimResourceSwitchMaxMin<R : SimResource>( activeCommand = SimResourceCommand.Consume(work, limit, deadline) } - override fun onFinish() { - hasExited = true + override fun onFinish(cause: Throwable?) { activeCommand = SimResourceCommand.Exit - cont.resume(Unit) - } + provider.cancel() - override fun onFailure(cause: Throwable) { - hasExited = true - activeCommand = SimResourceCommand.Exit - cont.resumeWithException(cause) + super.onFinish(cause) } override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { @@ -453,21 +434,8 @@ public class SimResourceSwitchMaxMin<R : SimResource>( private lateinit var ctx: SimResourceContext<R> init { - scope.launch { - try { - barrier = SimConsumerBarrier(barrier.parties + 1) - input.consume(this@InputConsumer) - } catch (e: CancellationException) { - // Cancel gracefully - throw e - } catch (e: Throwable) { - e.printStackTrace() - } finally { - barrier = SimConsumerBarrier(barrier.parties - 1) - inputConsumers -= this@InputConsumer - _inputs -= input - } - } + barrier = SimConsumerBarrier(barrier.parties + 1) + input.startConsumer(this@InputConsumer) } /** @@ -477,12 +445,11 @@ public class SimResourceSwitchMaxMin<R : SimResource>( ctx.interrupt() } - override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand { + override fun onStart(ctx: SimResourceContext<R>) { this.ctx = ctx - return commands[ctx.resource] ?: SimResourceCommand.Idle() } - override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand { totalRemainingWork += remainingWork val isLast = barrier.enter() @@ -504,5 +471,13 @@ public class SimResourceSwitchMaxMin<R : SimResource>( commands[ctx.resource] ?: SimResourceCommand.Idle() } } + + override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) { + barrier = SimConsumerBarrier(barrier.parties - 1) + inputConsumers -= this@InputConsumer + _inputs -= input + + super.onFinish(ctx, cause) + } } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index 03a3cebd..a00ee575 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -31,14 +31,16 @@ import org.opendc.simulator.resources.SimResourceContext * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource * consumption for some period of time. */ -public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> { - private val iterator = trace.iterator() +public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> { + private var iterator: Iterator<Fragment>? = null - override fun onStart(ctx: SimResourceContext<SimResource>): SimResourceCommand { - return onNext(ctx, 0.0) + override fun onStart(ctx: SimResourceContext<SimResource>) { + check(iterator == null) { "Consumer already running" } + iterator = trace.iterator() } - override fun onNext(ctx: SimResourceContext<SimResource>, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext<SimResource>, capacity: Double, remainingWork: Double): SimResourceCommand { + val iterator = checkNotNull(iterator) return if (iterator.hasNext()) { val now = ctx.clock.millis() val fragment = iterator.next() @@ -56,6 +58,10 @@ public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<S } } + override fun onFinish(ctx: SimResourceContext<SimResource>, cause: Throwable?) { + iterator = null + } + /** * A fragment of the workload. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt new file mode 100644 index 00000000..bad2f403 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.consumer + +import org.opendc.simulator.resources.SimResource +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext + +/** + * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization. + */ +public class SimWorkConsumer<R : SimResource>( + private val work: Double, + private val utilization: Double +) : SimResourceConsumer<R> { + + init { + require(work >= 0.0) { "Work must be positive" } + require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } + } + + private var limit = 0.0 + private var remainingWork: Double = 0.0 + + override fun onStart(ctx: SimResourceContext<R>) { + limit = ctx.resource.capacity * utilization + remainingWork = work + } + + override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand { + val work = this.remainingWork + remainingWork + this.remainingWork -= work + return if (work > 0.0) { + SimResourceCommand.Consume(work, limit) + } else { + SimResourceCommand.Exit + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index e7642dc1..0bc87473 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -22,11 +22,10 @@ package org.opendc.simulator.resources +import io.mockk.* import kotlinx.coroutines.* import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue import org.opendc.simulator.utils.DelayControllerClockAdapter /** @@ -45,28 +44,15 @@ class SimResourceContextTest { val resource = SimCpu(4200.0) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(10.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { - override fun onIdle(deadline: Long) { - } + override fun onIdle(deadline: Long) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) { - } + override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish() { - } - - override fun onFailure(cause: Throwable) { - } + override fun onFinish(cause: Throwable?) {} } context.flush() @@ -77,36 +63,20 @@ class SimResourceContextTest { val clock = DelayControllerClockAdapter(this) val resource = SimCpu(4200.0) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(10.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } - - var counter = 0 - val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { - override fun onIdle(deadline: Long) { - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - counter++ - } + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - override fun onFinish() { - } - - override fun onFailure(cause: Throwable) { - } - } + val context = spyk(object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onFinish(cause: Throwable?) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + }) context.start() delay(1) // Delay 1 ms to prevent hitting the fast path context.flush(isIntermediate = true) - assertEquals(2, counter) + + verify(exactly = 2) { context.onConsume(any(), any(), any()) } } @Test @@ -114,33 +84,14 @@ class SimResourceContextTest { val clock = DelayControllerClockAdapter(this) val resource = SimCpu(4200.0) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Idle(10) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } - - var counter = 0 - var isFinished = false - val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { - override fun onIdle(deadline: Long) { - counter++ - } + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - override fun onConsume(work: Double, limit: Double, deadline: Long) { - } - - override fun onFinish() { - isFinished = true - } - - override fun onFailure(cause: Throwable) { - } - } + val context = spyk(object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onFinish(cause: Throwable?) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + }) context.start() delay(5) @@ -149,8 +100,26 @@ class SimResourceContextTest { context.flush(isIntermediate = true) assertAll( - { assertEquals(1, counter) }, - { assertTrue(isFinished) } + { verify(exactly = 1) { context.onIdle(any()) } }, + { verify(exactly = 1) { context.onFinish(null) } } ) } + + @Test + fun testDoubleStart() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val resource = SimCpu(4200.0) + + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit + + val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onFinish(cause: Throwable?) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + } + + context.start() + assertThrows<IllegalStateException> { context.start() } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt index ced1bd98..b1b959ba 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt @@ -22,10 +22,16 @@ package org.opendc.simulator.resources +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import kotlinx.coroutines.test.runBlockingTest +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -53,14 +59,15 @@ internal class SimResourceForwarderTest { } forwarder.consume(object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Exit - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + override fun onNext( + ctx: SimResourceContext<SimCpu>, + capacity: Double, + remainingWork: Double + ): SimResourceCommand { return SimResourceCommand.Exit } }) + forwarder.close() scheduler.close() } @@ -78,15 +85,100 @@ internal class SimResourceForwarderTest { } forwarder.consume(object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } + var isFirst = true - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit + override fun onNext( + ctx: SimResourceContext<SimCpu>, + capacity: Double, + remainingWork: Double + ): SimResourceCommand { + return if (isFirst) { + isFirst = false + SimResourceCommand.Consume(10.0, 1.0) + } else { + SimResourceCommand.Exit + } } }) forwarder.close() } + + @Test + fun testState() = runBlockingTest { + val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val consumer = object : SimResourceConsumer<SimCpu> { + override fun onNext( + ctx: SimResourceContext<SimCpu>, + capacity: Double, + remainingWork: Double + ): SimResourceCommand = SimResourceCommand.Exit + } + + assertEquals(SimResourceState.Pending, forwarder.state) + + forwarder.startConsumer(consumer) + assertEquals(SimResourceState.Active, forwarder.state) + + assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) } + + forwarder.cancel() + assertEquals(SimResourceState.Pending, forwarder.state) + + forwarder.close() + assertEquals(SimResourceState.Stopped, forwarder.state) + } + + @Test + fun testCancelPendingDelegate() = runBlockingTest { + val forwarder = SimResourceForwarder(SimCpu(1000.0)) + + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit + + forwarder.startConsumer(consumer) + forwarder.cancel() + + verify(exactly = 0) { consumer.onFinish(any(), null) } + } + + @Test + fun testCancelStartedDelegate() = runBlockingTest { + val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + forwarder.cancel() + + verify(exactly = 1) { consumer.onStart(any()) } + verify(exactly = 1) { consumer.onFinish(any(), null) } + } + + @Test + fun testCancelPropagation() = runBlockingTest { + val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + source.cancel() + + verify(exactly = 1) { consumer.onStart(any()) } + verify(exactly = 1) { consumer.onFinish(any(), null) } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 4f7825fc..18f18ded 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import io.mockk.every +import io.mockk.mockk import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest @@ -46,15 +48,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1000 * ctx.resource.speed, ctx.resource.speed) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, provider.resource.speed)) + .andThen(SimResourceCommand.Exit) try { val res = mutableListOf<Double>() @@ -76,15 +73,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1000 * ctx.resource.speed, 2 * ctx.resource.speed) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, 2 * provider.resource.speed)) + .andThen(SimResourceCommand.Exit) try { val res = mutableListOf<Double>() @@ -111,13 +103,12 @@ class SimResourceSourceTest { val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + override fun onStart(ctx: SimResourceContext<SimCpu>) { ctx.interrupt() - return SimResourceCommand.Exit } - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() + override fun onNext(ctx: SimResourceContext<SimCpu>, capacity: Double, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit } } @@ -137,14 +128,19 @@ class SimResourceSourceTest { lateinit var resCtx: SimResourceContext<SimCpu> val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + var isFirst = true + override fun onStart(ctx: SimResourceContext<SimCpu>) { resCtx = ctx - return SimResourceCommand.Consume(4.0, 1.0) } - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext<SimCpu>, capacity: Double, remainingWork: Double): SimResourceCommand { assertEquals(0.0, remainingWork) - return SimResourceCommand.Exit + return if (isFirst) { + isFirst = false + SimResourceCommand.Consume(4.0, 1.0) + } else { + SimResourceCommand.Exit + } } } @@ -168,15 +164,9 @@ class SimResourceSourceTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - throw IllegalStateException() - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onStart(any()) } + .throws(IllegalStateException()) try { assertThrows<IllegalStateException> { @@ -194,15 +184,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) try { assertThrows<IllegalStateException> { @@ -220,15 +205,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) try { assertThrows<IllegalStateException> { @@ -249,15 +229,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) try { assertThrows<IllegalStateException> { @@ -276,15 +251,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) try { launch { provider.consume(consumer) } @@ -304,15 +274,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Idle(ctx.clock.millis() + 500) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Idle(clock.millis() + 500)) + .andThen(SimResourceCommand.Exit) try { provider.consume(consumer) @@ -332,15 +297,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Idle() - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Idle()) + .andThenThrows(IllegalStateException()) try { provider.consume(consumer) @@ -351,4 +311,25 @@ class SimResourceSourceTest { } } } + + @Test + fun testIncorrectDeadline() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Idle(2)) + .andThen(SimResourceCommand.Exit) + + try { + delay(10) + + assertThrows<IllegalArgumentException> { provider.consume(consumer) } + } finally { + scheduler.close() + provider.close() + } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index ca6558bf..354dab93 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import io.mockk.every +import io.mockk.mockk import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch @@ -34,7 +36,6 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.resources.consumer.SimTraceConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler -import java.lang.IllegalStateException /** * Test suite for the [SimResourceSwitchExclusive] class. @@ -67,7 +68,7 @@ internal class SimResourceSwitchExclusiveTest { ), ) - val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext) + val switch = SimResourceSwitchExclusive<SimCpu>() val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) switch.addInput(source) @@ -98,17 +99,10 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(duration / 1000.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val workload = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit - val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext) + val switch = SimResourceSwitchExclusive<SimCpu>() val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) switch.addInput(source) @@ -134,16 +128,27 @@ internal class SimResourceSwitchExclusiveTest { val duration = 5 * 60L * 1000 val workload = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(duration / 1000.0, 1.0) + var isFirst = true + + override fun onStart(ctx: SimResourceContext<SimCpu>) { + isFirst = true } - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit + override fun onNext( + ctx: SimResourceContext<SimCpu>, + capacity: Double, + remainingWork: Double + ): SimResourceCommand { + return if (isFirst) { + isFirst = false + SimResourceCommand.Consume(duration / 1000.0, 1.0) + } else { + SimResourceCommand.Exit + } } } - val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext) + val switch = SimResourceSwitchExclusive<SimCpu>() val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) switch.addInput(source) @@ -169,17 +174,10 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(duration.toDouble(), 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val workload = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit - val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext) + val switch = SimResourceSwitchExclusive<SimCpu>() val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) switch.addInput(source) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index 698c1700..e8f5a13c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import io.mockk.every +import io.mockk.mockk import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -47,22 +49,15 @@ internal class SimResourceSwitchMaxMinTest { fun testSmoke() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val switch = SimResourceSwitchMaxMin<SimCpu>(clock, coroutineContext) + val switch = SimResourceSwitchMaxMin<SimCpu>(clock) val sources = List(2) { SimResourceSource(SimCpu(2000.0), clock, scheduler) } sources.forEach { switch.addInput(it) } val provider = switch.addOutput(SimCpu(1000.0)) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit try { provider.consume(consumer) @@ -112,7 +107,7 @@ internal class SimResourceSwitchMaxMinTest { ), ) - val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener) + val switch = SimResourceSwitchMaxMin(clock, listener) val provider = switch.addOutput(SimCpu(3200.0)) try { @@ -180,7 +175,7 @@ internal class SimResourceSwitchMaxMinTest { ) ) - val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener) + val switch = SimResourceSwitchMaxMin(clock, listener) val providerA = switch.addOutput(SimCpu(3200.0)) val providerB = switch.addOutput(SimCpu(3200.0)) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt new file mode 100644 index 00000000..b05195f7 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler + +/** + * A test suite for the [SimWorkConsumer] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimWorkConsumerTest { + data class SimCpu(val speed: Double) : SimResource { + override val capacity: Double + get() = speed + } + + @Test + fun testSmoke() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(1.0), clock, scheduler) + + val consumer = SimWorkConsumer<SimCpu>(1.0, 1.0) + + try { + provider.consume(consumer) + assertEquals(1000, currentTime) + } finally { + provider.close() + } + } + + @Test + fun testUtilization() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(1.0), clock, scheduler) + + val consumer = SimWorkConsumer<SimCpu>(1.0, 0.5) + + try { + provider.consume(consumer) + assertEquals(2000, currentTime) + } finally { + provider.close() + } + } +} diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt index 49964938..d4bc7b5c 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt @@ -93,7 +93,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo try { timer() } catch (e: Throwable) { - Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e) + coroutineContext[CoroutineExceptionHandler]?.handleException(coroutineContext, e) } } } |
