From 9dab4d7b3921cd48199d773c7dc4bae0f2273223 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 18 Mar 2021 16:25:00 +0100 Subject: simulator: Re-design consumer interface to support capacity negotiation This change re-designs the SimResourceConsumer interface to support in the future capacity negotiation. This basically means that the consumer will be informed directly when not enough capacity is available, instead of after the deadline specified by the consumer. --- .../org/opendc/compute/simulator/SimHostTest.kt | 4 +- .../experiments/capelin/CapelinIntegrationTest.kt | 8 +- .../simulator/compute/SimAbstractHypervisor.kt | 3 +- .../opendc/simulator/compute/SimAbstractMachine.kt | 3 +- .../simulator/compute/SimFairShareHypervisor.kt | 2 - .../simulator/compute/SimSpaceSharedHypervisor.kt | 3 +- .../simulator/compute/workload/SimFlopsWorkload.kt | 27 +--- .../compute/workload/SimRuntimeWorkload.kt | 23 +-- .../simulator/compute/workload/SimTraceWorkload.kt | 6 +- .../resources/SimAbstractResourceContext.kt | 74 +++++++--- .../simulator/resources/SimResourceCommand.kt | 2 +- .../simulator/resources/SimResourceConsumer.kt | 31 ++-- .../simulator/resources/SimResourceForwarder.kt | 138 +++++++++--------- .../simulator/resources/SimResourceProvider.kt | 40 +++++- .../simulator/resources/SimResourceSource.kt | 72 ++++------ .../opendc/simulator/resources/SimResourceState.kt | 43 ++++++ .../resources/SimResourceSwitchExclusive.kt | 41 +++--- .../simulator/resources/SimResourceSwitchMaxMin.kt | 129 +++++++---------- .../resources/consumer/SimTraceConsumer.kt | 16 ++- .../resources/consumer/SimWorkConsumer.kt | 60 ++++++++ .../simulator/resources/SimResourceContextTest.kt | 115 ++++++--------- .../resources/SimResourceForwarderTest.kt | 112 +++++++++++++-- .../simulator/resources/SimResourceSourceTest.kt | 159 +++++++++------------ .../resources/SimResourceSwitchExclusiveTest.kt | 52 ++++--- .../resources/SimResourceSwitchMaxMinTest.kt | 19 +-- .../simulator/resources/SimWorkConsumerTest.kt | 74 ++++++++++ .../main/kotlin/org/opendc/utils/TimerScheduler.kt | 2 +- 27 files changed, 733 insertions(+), 525 deletions(-) create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 6929b06c..c4bd0cb4 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -136,8 +136,8 @@ internal class SimHostTest { assertAll( { assertEquals(emptyList(), scope.uncaughtExceptions, "No errors") }, - { assertEquals(4273200, requestedWork, "Requested work does not match") }, - { assertEquals(3133200, grantedWork, "Granted work does not match") }, + { assertEquals(4281600, requestedWork, "Requested work does not match") }, + { assertEquals(3141600, grantedWork, "Granted work does not match") }, { assertEquals(1140000, overcommittedWork, "Overcommitted work does not match") }, { assertEquals(1200006, scope.currentTime) } ) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 59ce895f..7dae53be 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -143,8 +143,8 @@ class CapelinIntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1707132711051, monitor.totalRequestedBurst) }, - { assertEquals(457881474296, monitor.totalGrantedBurst) }, + { assertEquals(1707144601723, monitor.totalRequestedBurst) }, + { assertEquals(457893364971, monitor.totalGrantedBurst) }, { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) @@ -189,8 +189,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(711464322955, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(175226276978, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(711464339925, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(175226293948, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt index a99b082a..281d43ae 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -32,7 +32,6 @@ import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* import java.time.Clock -import kotlin.coroutines.CoroutineContext /** * Abstract implementation of the [SimHypervisor] interface. @@ -121,7 +120,7 @@ public abstract class SimAbstractHypervisor : SimHypervisor { override val clock: Clock get() = this@SimAbstractHypervisor.context.clock - override val meta: Map = meta + mapOf("coroutine-context" to context.meta["coroutine-context"] as CoroutineContext) + override val meta: Map = meta override fun interrupt(resource: SimResource) { requireNotNull(this@VirtualMachine.cpus[resource]).interrupt() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 39ae34fe..44906c2b 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -33,6 +33,7 @@ import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.SimResource import org.opendc.simulator.resources.SimResourceProvider import org.opendc.simulator.resources.SimResourceSource +import org.opendc.simulator.resources.consume import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -91,7 +92,7 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine override suspend fun run(workload: SimWorkload, meta: Map): Unit = withContext(context) { val resources = resources require(!isTerminated) { "Machine is terminated" } - val ctx = Context(resources, meta + mapOf("coroutine-context" to context)) + val ctx = Context(resources, meta) val totalCapacity = model.cpus.sumByDouble { it.frequency } _speed = MutableList(model.cpus.size) { 0.0 } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index bb97192d..c629fbd9 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.compute import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* -import kotlin.coroutines.CoroutineContext /** * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single @@ -40,7 +39,6 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { return SimResourceSwitchMaxMin( ctx.clock, - ctx.meta["coroutine-context"] as CoroutineContext, object : SimResourceSwitchMaxMin.Listener { override fun onSliceFinish( switch: SimResourceSwitchMaxMin, diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index 2001a230..5de69884 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.compute import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.resources.* -import kotlin.coroutines.CoroutineContext /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. @@ -35,6 +34,6 @@ public class SimSpaceSharedHypervisor : SimAbstractHypervisor() { } override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { - return SimResourceSwitchExclusive(ctx.meta["coroutine-context"] as CoroutineContext) + return SimResourceSwitchExclusive() } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index 9b47821e..f1079ee6 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -24,9 +24,8 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.model.SimProcessingUnit -import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.consumer.SimWorkConsumer /** * A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on @@ -47,29 +46,7 @@ public class SimFlopsWorkload( override fun onStart(ctx: SimMachineContext) {} override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer { - return CpuConsumer(ctx) - } - - private inner class CpuConsumer(private val machine: SimMachineContext) : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - val limit = ctx.resource.frequency * utilization - val work = flops.toDouble() / machine.cpus.size - - return if (work > 0.0) { - SimResourceCommand.Consume(work, limit) - } else { - SimResourceCommand.Exit - } - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return if (remainingWork > 0.0) { - val limit = ctx.resource.frequency * utilization - return SimResourceCommand.Consume(remainingWork, limit) - } else { - SimResourceCommand.Exit - } - } + return SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization) } override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)" diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index 313b6ed5..d7aa8f80 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -24,9 +24,8 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.model.SimProcessingUnit -import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.consumer.SimWorkConsumer /** * A [SimWorkload] that models application execution as a single duration. @@ -46,24 +45,8 @@ public class SimRuntimeWorkload( override fun onStart(ctx: SimMachineContext) {} override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer { - return CpuConsumer() - } - - private inner class CpuConsumer : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - val limit = ctx.resource.frequency * utilization - val work = (limit / 1000) * duration - return SimResourceCommand.Consume(work, limit) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return if (remainingWork > 0.0) { - val limit = ctx.resource.frequency * utilization - SimResourceCommand.Consume(remainingWork, limit) - } else { - SimResourceCommand.Exit - } - } + val limit = cpu.frequency * utilization + return SimWorkConsumer((limit / 1000) * duration, utilization) } override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)" diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 31f58a0f..cc4f3136 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -50,11 +50,7 @@ public class SimTraceWorkload(public val trace: Sequence) : SimWorkloa } private inner class CpuConsumer : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return onNext(ctx, 0.0) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { val now = ctx.clock.millis() val fragment = fragment ?: return SimResourceCommand.Exit val work = (fragment.duration / 1000) * fragment.usage diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index 52251bff..431ca625 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -49,12 +49,9 @@ public abstract class SimAbstractResourceContext( /** * This method is invoked when the resource consumer has finished. */ - public abstract fun onFinish() - - /** - * This method is invoked when the resource consumer throws an exception. - */ - public abstract fun onFailure(cause: Throwable) + public open fun onFinish(cause: Throwable?) { + consumer.onFinish(this, cause) + } /** * Compute the duration that a resource consumption will take with the specified [speed]. @@ -67,7 +64,14 @@ public abstract class SimAbstractResourceContext( * Compute the speed at which the resource may be consumed. */ protected open fun getSpeed(limit: Double): Double { - return min(limit, resource.capacity) + return min(limit, getCapacity()) + } + + /** + * Return the capacity available for the resource consumer. + */ + protected open fun getCapacity(): Double { + return resource.capacity } /** @@ -93,13 +97,17 @@ public abstract class SimAbstractResourceContext( * Start the consumer. */ public fun start() { - try { - isProcessing = true - latestFlush = clock.millis() + check(state == SimResourceState.Pending) { "Consumer is already started" } + + state = SimResourceState.Active + isProcessing = true + latestFlush = clock.millis() - interpret(consumer.onStart(this)) - } catch (e: Throwable) { - onFailure(e) + try { + consumer.onStart(this) + interpret(consumer.onNext(this, getCapacity(), 0.0)) + } catch (cause: Throwable) { + doStop(cause) } finally { isProcessing = false } @@ -114,9 +122,9 @@ public abstract class SimAbstractResourceContext( latestFlush = clock.millis() flush(isIntermediate = true) - onFinish() - } catch (e: Throwable) { - onFailure(e) + doStop(null) + } catch (cause: Throwable) { + doStop(cause) } finally { isProcessing = false } @@ -129,7 +137,12 @@ public abstract class SimAbstractResourceContext( * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer * will be asked to deliver a new command and is essentially interrupted. */ - public open fun flush(isIntermediate: Boolean = false) { + public fun flush(isIntermediate: Boolean = false) { + // Flush is no-op when the consumer is finished or not yet started + if (state != SimResourceState.Active) { + return + } + val now = clock.millis() // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it. @@ -177,8 +190,8 @@ public abstract class SimAbstractResourceContext( // Flush may not be called when the resource consumer has finished throw IllegalStateException() } - } catch (e: Throwable) { - onFailure(e) + } catch (cause: Throwable) { + doStop(cause) } finally { latestFlush = now isProcessing = false @@ -202,6 +215,11 @@ public abstract class SimAbstractResourceContext( */ protected var isProcessing: Boolean = false + /** + * A flag to indicate the state of the context. + */ + private var state: SimResourceState = SimResourceState.Pending + /** * The current command that is being processed. */ @@ -212,6 +230,18 @@ public abstract class SimAbstractResourceContext( */ private var latestFlush: Long = Long.MIN_VALUE + /** + * Finish the consumer and resource provider. + */ + private fun doStop(cause: Throwable?) { + val state = state + this.state = SimResourceState.Stopped + + if (state == SimResourceState.Active) { + onFinish(cause) + } + } + /** * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. */ @@ -235,9 +265,7 @@ public abstract class SimAbstractResourceContext( onConsume(work, limit, deadline) } - is SimResourceCommand.Exit -> { - onFinish() - } + is SimResourceCommand.Exit -> doStop(null) } assert(activeCommand == null) { "Concurrent access to current command" } @@ -248,7 +276,7 @@ public abstract class SimAbstractResourceContext( * Request the workload for more work. */ private fun next(remainingWork: Double) { - interpret(consumer.onNext(this, remainingWork)) + interpret(consumer.onNext(this, getCapacity(), remainingWork)) } /** diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt index 77c0a7a9..21f56f9b 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt @@ -29,7 +29,7 @@ public sealed class SimResourceCommand { /** * A request to the resource to perform the specified amount of work before the given [deadline]. * - * @param work The amount of work to process on the CPU. + * @param work The amount of work to process. * @param limit The maximum amount of work to be processed per second. * @param deadline The instant at which the work needs to be fulfilled. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index f516faa6..7637ee69 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -23,23 +23,38 @@ package org.opendc.simulator.resources /** - * A SimResourceConsumer characterizes how a [SimResource] is consumed. + * A [SimResourceConsumer] characterizes how a [SimResource] is consumed. + * + * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently) + * for multiple resource providers, unless explicitly said otherwise. */ public interface SimResourceConsumer { /** - * This method is invoked when the consumer is started for a resource. + * This method is invoked when the consumer is started for some resource. * * @param ctx The execution context in which the consumer runs. - * @return The next command that the resource should perform. */ - public fun onStart(ctx: SimResourceContext): SimResourceCommand + public fun onStart(ctx: SimResourceContext) {} /** - * This method is invoked when a resource was either interrupted or reached its deadline. + * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because + * the resource finished processing, reached its deadline or was interrupted. * * @param ctx The execution context in which the consumer runs. - * @param remainingWork The remaining work that was not yet completed. - * @return The next command that the resource should perform. + * @param capacity The capacity that is available for the consumer. In case no capacity is available, zero is given. + * @param remainingWork The work of the previous command that was not yet completed due to interruption or deadline. + * @return The next command that the resource should execute. + */ + public fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand + + /** + * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], + * the resource finished itself, or a failure occurred at the resource. + * + * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider. + * + * @param ctx The execution context in which the consumer ran. + * @param cause The cause of the finish in case the resource finished exceptionally. */ - public fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand + public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {} } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt index ca23557c..23bf8739 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -22,10 +22,6 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume - /** * A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer]. */ @@ -36,16 +32,6 @@ public class SimResourceForwarder(override val resource: R) : */ private var ctx: SimResourceContext? = null - /** - * A flag to indicate that the forwarder is closed. - */ - private var isClosed: Boolean = false - - /** - * The continuation to resume after consumption. - */ - private var cont: Continuation? = null - /** * The delegate [SimResourceConsumer]. */ @@ -61,95 +47,111 @@ public class SimResourceForwarder(override val resource: R) : */ private var remainingWork: Double = 0.0 - override suspend fun consume(consumer: SimResourceConsumer) { - check(!isClosed) { "Lifecycle of forwarder has ended" } - check(cont == null) { "Run should not be called concurrently" } - - return suspendCancellableCoroutine { cont -> - this.cont = cont - this.delegate = consumer + /** + * The state of the forwarder. + */ + override var state: SimResourceState = SimResourceState.Pending + private set - cont.invokeOnCancellation { reset() } + override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource is in invalid state" } - ctx?.interrupt() - } + state = SimResourceState.Active + delegate = consumer + interrupt() } override fun interrupt() { ctx?.interrupt() } + override fun cancel() { + val delegate = delegate + val ctx = ctx + + state = SimResourceState.Pending + + if (delegate != null && ctx != null) { + this.delegate = null + delegate.onFinish(ctx) + } + } + override fun close() { - isClosed = true - interrupt() - ctx = null + val ctx = ctx + + state = SimResourceState.Stopped + + if (ctx != null) { + this.ctx = null + ctx.interrupt() + } } - override fun onStart(ctx: SimResourceContext): SimResourceCommand { + override fun onStart(ctx: SimResourceContext) { this.ctx = ctx - - return onNext(ctx, 0.0) } - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { + val delegate = delegate this.remainingWork = remainingWork - return if (isClosed) { - SimResourceCommand.Exit - } else if (!hasDelegateStarted) { + if (!hasDelegateStarted) { start() + } + + return if (state == SimResourceState.Stopped) { + SimResourceCommand.Exit + } else if (delegate != null) { + val command = delegate.onNext(ctx, capacity, remainingWork) + if (command == SimResourceCommand.Exit) { + // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we + // reset beforehand the existing state and check whether it has been updated afterwards + reset() + + delegate.onFinish(ctx) + + if (state == SimResourceState.Stopped) + SimResourceCommand.Exit + else + onNext(ctx, capacity, 0.0) + } else { + command + } } else { - next() + SimResourceCommand.Idle() } } - /** - * Start the delegate. - */ - private fun start(): SimResourceCommand { - val delegate = delegate ?: return SimResourceCommand.Idle() - val command = delegate.onStart(checkNotNull(ctx)) - - hasDelegateStarted = true - - return forward(command) - } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + this.ctx = null - /** - * Obtain the next command to process. - */ - private fun next(): SimResourceCommand { val delegate = delegate - return forward(delegate?.onNext(checkNotNull(ctx), remainingWork) ?: SimResourceCommand.Idle()) + if (delegate != null) { + reset() + delegate.onFinish(ctx, cause) + } } /** - * Forward the specified [command]. + * Start the delegate. */ - private fun forward(command: SimResourceCommand): SimResourceCommand { - return if (command == SimResourceCommand.Exit) { - val cont = checkNotNull(cont) - - // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we - // reset beforehand the existing state and check whether it has been updated afterwards - reset() - cont.resume(Unit) + private fun start() { + val delegate = delegate ?: return + delegate.onStart(checkNotNull(ctx)) - if (isClosed) - SimResourceCommand.Exit - else - start() - } else { - command - } + hasDelegateStarted = true } /** * Reset the delegate. */ private fun reset() { - cont = null delegate = null hasDelegateStarted = false + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index e35aa683..1593281b 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import kotlinx.coroutines.suspendCancellableCoroutine + /** * A [SimResourceProvider] provides some resource of type [R]. */ @@ -32,15 +34,27 @@ public interface SimResourceProvider : AutoCloseable { public val resource: R /** - * Consume the resource provided by this provider using the specified [consumer]. + * The state of the resource. + */ + public val state: SimResourceState + + /** + * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously. + * + * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended. */ - public suspend fun consume(consumer: SimResourceConsumer) + public fun startConsumer(consumer: SimResourceConsumer) /** - * Interrupt the resource. + * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op. */ public fun interrupt() + /** + * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op. + */ + public fun cancel() + /** * End the lifetime of the resource. * @@ -48,3 +62,23 @@ public interface SimResourceProvider : AutoCloseable { */ public override fun close() } + +/** + * Consume the resource provided by this provider using the specified [consumer] and suspend execution until + * the consumer has finished. + */ +public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { + return suspendCancellableCoroutine { cont -> + startConsumer(object : SimResourceConsumer by consumer { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + assert(!cont.isCompleted) { "Coroutine already completed" } + + cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit)) + + consumer.onFinish(ctx, cause) + } + + override fun toString(): String = "SimSuspendingResourceConsumer" + }) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 540a17c9..a3b16177 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -27,9 +27,6 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import org.opendc.utils.TimerScheduler import java.time.Clock -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException import kotlin.math.min /** @@ -37,6 +34,7 @@ import kotlin.math.min * * @param resource The resource to provide. * @param clock The virtual clock to track simulation time. + * @param scheduler The scheduler to schedule the interrupts. */ public class SimResourceSource( override val resource: R, @@ -50,61 +48,49 @@ public class SimResourceSource( get() = _speed private val _speed = MutableStateFlow(0.0) - /** - * A flag to indicate that the resource was closed. - */ - private var isClosed: Boolean = false - - /** - * The current active consumer. - */ - private var cont: CancellableContinuation? = null - /** * The [Context] that is currently running. */ private var ctx: Context? = null - override suspend fun consume(consumer: SimResourceConsumer) { - check(!isClosed) { "Lifetime of resource has ended." } - check(cont == null) { "Run should not be called concurrently" } + override var state: SimResourceState = SimResourceState.Pending + private set - try { - return suspendCancellableCoroutine { cont -> - val ctx = Context(consumer, cont) + override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource is in invalid state" } + val ctx = Context(consumer) - this.cont = cont - this.ctx = ctx + this.ctx = ctx + this.state = SimResourceState.Active - ctx.start() - cont.invokeOnCancellation { - ctx.stop() - } - } - } finally { - cont = null - ctx = null - } + ctx.start() } override fun close() { - isClosed = true - cont?.cancel() - cont = null - ctx = null + cancel() + state = SimResourceState.Stopped } override fun interrupt() { ctx?.interrupt() } + override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.stop() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } + /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context( - consumer: SimResourceConsumer, - val cont: Continuation - ) : SimAbstractResourceContext(resource, clock, consumer) { + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(resource, clock, consumer) { /** * The processing speed of the resource. */ @@ -130,16 +116,12 @@ public class SimResourceSource( scheduler.startSingleTimerTo(this, until, ::flush) } - override fun onFinish() { + override fun onFinish(cause: Throwable?) { speed = 0.0 scheduler.cancel(this) - cont.resume(Unit) - } + cancel() - override fun onFailure(cause: Throwable) { - speed = 0.0 - scheduler.cancel(this) - cont.resumeWithException(cause) + super.onFinish(cause) } override fun toString(): String = "SimResourceSource.Context[resource=$resource]" diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt new file mode 100644 index 00000000..c72951d0 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * The state of a resource provider. + */ +public enum class SimResourceState { + /** + * The resource provider is pending and the resource is waiting to be consumed. + */ + Pending, + + /** + * The resource provider is active and the resource is currently being consumed. + */ + Active, + + /** + * The resource provider is stopped and the resource cannot be consumed anymore. + */ + Stopped +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 060d0ea2..5eea78f6 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -22,33 +22,30 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel -import kotlinx.coroutines.launch import java.util.ArrayDeque -import kotlin.coroutines.CoroutineContext /** * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that * a single output is directly connected to an input and that the switch can only support as much outputs as inputs. */ -public class SimResourceSwitchExclusive(context: CoroutineContext) : SimResourceSwitch { +public class SimResourceSwitchExclusive : SimResourceSwitch { /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. + * A flag to indicate that the switch is closed. */ - private val scope = CoroutineScope(context + Job()) + private var isClosed: Boolean = false - private val _outputs = mutableSetOf>() + private val _outputs = mutableSetOf() override val outputs: Set> get() = _outputs private val availableResources = ArrayDeque>() + private val _inputs = mutableSetOf>() override val inputs: Set> get() = _inputs override fun addOutput(resource: R): SimResourceProvider { + check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() val output = Provider(resource, forwarder) @@ -57,33 +54,37 @@ public class SimResourceSwitchExclusive(context: CoroutineConte } override fun addInput(input: SimResourceProvider) { + check(!isClosed) { "Switch has been closed" } + if (input in inputs) { return } val forwarder = SimResourceForwarder(input.resource) - scope.launch { input.consume(forwarder) } - _inputs += input availableResources += forwarder + + input.startConsumer(object : SimResourceConsumer by forwarder { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + // De-register the input after it has finished + _inputs -= input + forwarder.onFinish(ctx, cause) + } + }) } override fun close() { - scope.cancel() + isClosed = true + + // Cancel all upstream subscriptions + _inputs.forEach(SimResourceProvider::cancel) } private inner class Provider( override val resource: R, private val forwarder: SimResourceForwarder - ) : SimResourceProvider { - - override suspend fun consume(consumer: SimResourceConsumer) = forwarder.consume(consumer) - - override fun interrupt() { - forwarder.interrupt() - } - + ) : SimResourceProvider by forwarder { override fun close() { _outputs -= this availableResources += forwarder diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index bcf76d3c..6b919a77 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -25,10 +25,6 @@ package org.opendc.simulator.resources import kotlinx.coroutines.* import org.opendc.simulator.resources.consumer.SimConsumerBarrier import java.time.Clock -import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -39,14 +35,8 @@ import kotlin.math.min */ public class SimResourceSwitchMaxMin( private val clock: Clock, - context: CoroutineContext, private val listener: Listener? = null ) : SimResourceSwitch { - /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. - */ - private val scope = CoroutineScope(context + Job()) - private val inputConsumers = mutableSetOf() private val _outputs = mutableSetOf() override val outputs: Set> @@ -111,10 +101,17 @@ public class SimResourceSwitchMaxMin( */ private var barrier: SimConsumerBarrier = SimConsumerBarrier(0) + /** + * A flag to indicate that the switch is closed. + */ + private var isClosed: Boolean = false + /** * Add an output to the switch represented by [resource]. */ override fun addOutput(resource: R): SimResourceProvider { + check(!isClosed) { "Switch has been closed" } + val provider = OutputProvider(resource) _outputs.add(provider) return provider @@ -124,13 +121,15 @@ public class SimResourceSwitchMaxMin( * Add the specified [input] to the switch. */ override fun addInput(input: SimResourceProvider) { + check(!isClosed) { "Switch has been closed" } + val consumer = InputConsumer(input) _inputs.add(input) inputConsumers += consumer } override fun close() { - scope.cancel() + isClosed = true } /** @@ -296,65 +295,57 @@ public class SimResourceSwitchMaxMin( * An internal [SimResourceProvider] implementation for switch outputs. */ private inner class OutputProvider(override val resource: R) : SimResourceProvider { - /** - * A flag to indicate that the resource was closed. - */ - private var isClosed: Boolean = false - - /** - * The current active consumer. - */ - private var cont: CancellableContinuation? = null - /** * The [OutputContext] that is currently running. */ private var ctx: OutputContext? = null - override suspend fun consume(consumer: SimResourceConsumer) { - check(!isClosed) { "Lifetime of resource has ended." } - check(cont == null) { "Run should not be called concurrently" } + override var state: SimResourceState = SimResourceState.Pending + internal set - try { - return suspendCancellableCoroutine { cont -> - val ctx = OutputContext(resource, consumer, cont) - ctx.start() - cont.invokeOnCancellation { - ctx.stop() - } + override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource cannot be consumed" } - this.cont = cont - this.ctx = ctx + val ctx = OutputContext(this, resource, consumer) + this.ctx = ctx + this.state = SimResourceState.Active + outputContexts += ctx - outputContexts += ctx - schedule() - } - } finally { - cont = null - ctx = null - } + ctx.start() + schedule() } override fun close() { - isClosed = true - cont?.cancel() - cont = null - ctx = null + cancel() + + state = SimResourceState.Stopped _outputs.remove(this) } override fun interrupt() { ctx?.interrupt() } + + override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.stop() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } } /** * A [SimAbstractResourceContext] for the output resources. */ private inner class OutputContext( + private val provider: OutputProvider, resource: R, - consumer: SimResourceConsumer, - private val cont: Continuation + consumer: SimResourceConsumer ) : SimAbstractResourceContext(resource, clock, consumer), Comparable { /** * The current command that is processed by the vCPU. @@ -371,11 +362,6 @@ public class SimResourceSwitchMaxMin( */ var actualSpeed: Double = 0.0 - /** - * A flag to indicate that the CPU has exited. - */ - var hasExited: Boolean = false - override fun onIdle(deadline: Long) { allowedSpeed = 0.0 activeCommand = SimResourceCommand.Idle(deadline) @@ -386,16 +372,11 @@ public class SimResourceSwitchMaxMin( activeCommand = SimResourceCommand.Consume(work, limit, deadline) } - override fun onFinish() { - hasExited = true + override fun onFinish(cause: Throwable?) { activeCommand = SimResourceCommand.Exit - cont.resume(Unit) - } + provider.cancel() - override fun onFailure(cause: Throwable) { - hasExited = true - activeCommand = SimResourceCommand.Exit - cont.resumeWithException(cause) + super.onFinish(cause) } override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { @@ -453,21 +434,8 @@ public class SimResourceSwitchMaxMin( private lateinit var ctx: SimResourceContext init { - scope.launch { - try { - barrier = SimConsumerBarrier(barrier.parties + 1) - input.consume(this@InputConsumer) - } catch (e: CancellationException) { - // Cancel gracefully - throw e - } catch (e: Throwable) { - e.printStackTrace() - } finally { - barrier = SimConsumerBarrier(barrier.parties - 1) - inputConsumers -= this@InputConsumer - _inputs -= input - } - } + barrier = SimConsumerBarrier(barrier.parties + 1) + input.startConsumer(this@InputConsumer) } /** @@ -477,12 +445,11 @@ public class SimResourceSwitchMaxMin( ctx.interrupt() } - override fun onStart(ctx: SimResourceContext): SimResourceCommand { + override fun onStart(ctx: SimResourceContext) { this.ctx = ctx - return commands[ctx.resource] ?: SimResourceCommand.Idle() } - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { totalRemainingWork += remainingWork val isLast = barrier.enter() @@ -504,5 +471,13 @@ public class SimResourceSwitchMaxMin( commands[ctx.resource] ?: SimResourceCommand.Idle() } } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + barrier = SimConsumerBarrier(barrier.parties - 1) + inputConsumers -= this@InputConsumer + _inputs -= input + + super.onFinish(ctx, cause) + } } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index 03a3cebd..a00ee575 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -31,14 +31,16 @@ import org.opendc.simulator.resources.SimResourceContext * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource * consumption for some period of time. */ -public class SimTraceConsumer(trace: Sequence) : SimResourceConsumer { - private val iterator = trace.iterator() +public class SimTraceConsumer(private val trace: Sequence) : SimResourceConsumer { + private var iterator: Iterator? = null - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return onNext(ctx, 0.0) + override fun onStart(ctx: SimResourceContext) { + check(iterator == null) { "Consumer already running" } + iterator = trace.iterator() } - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { + val iterator = checkNotNull(iterator) return if (iterator.hasNext()) { val now = ctx.clock.millis() val fragment = iterator.next() @@ -56,6 +58,10 @@ public class SimTraceConsumer(trace: Sequence) : SimResourceConsumer, cause: Throwable?) { + iterator = null + } + /** * A fragment of the workload. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt new file mode 100644 index 00000000..bad2f403 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.consumer + +import org.opendc.simulator.resources.SimResource +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext + +/** + * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization. + */ +public class SimWorkConsumer( + private val work: Double, + private val utilization: Double +) : SimResourceConsumer { + + init { + require(work >= 0.0) { "Work must be positive" } + require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } + } + + private var limit = 0.0 + private var remainingWork: Double = 0.0 + + override fun onStart(ctx: SimResourceContext) { + limit = ctx.resource.capacity * utilization + remainingWork = work + } + + override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { + val work = this.remainingWork + remainingWork + this.remainingWork -= work + return if (work > 0.0) { + SimResourceCommand.Consume(work, limit) + } else { + SimResourceCommand.Exit + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index e7642dc1..0bc87473 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -22,11 +22,10 @@ package org.opendc.simulator.resources +import io.mockk.* import kotlinx.coroutines.* import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue import org.opendc.simulator.utils.DelayControllerClockAdapter /** @@ -45,28 +44,15 @@ class SimResourceContextTest { val resource = SimCpu(4200.0) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Consume(10.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit val context = object : SimAbstractResourceContext(resource, clock, consumer) { - override fun onIdle(deadline: Long) { - } + override fun onIdle(deadline: Long) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) { - } + override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish() { - } - - override fun onFailure(cause: Throwable) { - } + override fun onFinish(cause: Throwable?) {} } context.flush() @@ -77,36 +63,20 @@ class SimResourceContextTest { val clock = DelayControllerClockAdapter(this) val resource = SimCpu(4200.0) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Consume(10.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } - - var counter = 0 - val context = object : SimAbstractResourceContext(resource, clock, consumer) { - override fun onIdle(deadline: Long) { - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - counter++ - } + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - override fun onFinish() { - } - - override fun onFailure(cause: Throwable) { - } - } + val context = spyk(object : SimAbstractResourceContext(resource, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onFinish(cause: Throwable?) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + }) context.start() delay(1) // Delay 1 ms to prevent hitting the fast path context.flush(isIntermediate = true) - assertEquals(2, counter) + + verify(exactly = 2) { context.onConsume(any(), any(), any()) } } @Test @@ -114,33 +84,14 @@ class SimResourceContextTest { val clock = DelayControllerClockAdapter(this) val resource = SimCpu(4200.0) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Idle(10) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } - - var counter = 0 - var isFinished = false - val context = object : SimAbstractResourceContext(resource, clock, consumer) { - override fun onIdle(deadline: Long) { - counter++ - } + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - override fun onConsume(work: Double, limit: Double, deadline: Long) { - } - - override fun onFinish() { - isFinished = true - } - - override fun onFailure(cause: Throwable) { - } - } + val context = spyk(object : SimAbstractResourceContext(resource, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onFinish(cause: Throwable?) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + }) context.start() delay(5) @@ -149,8 +100,26 @@ class SimResourceContextTest { context.flush(isIntermediate = true) assertAll( - { assertEquals(1, counter) }, - { assertTrue(isFinished) } + { verify(exactly = 1) { context.onIdle(any()) } }, + { verify(exactly = 1) { context.onFinish(null) } } ) } + + @Test + fun testDoubleStart() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val resource = SimCpu(4200.0) + + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit + + val context = object : SimAbstractResourceContext(resource, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onFinish(cause: Throwable?) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + } + + context.start() + assertThrows { context.start() } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt index ced1bd98..b1b959ba 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt @@ -22,10 +22,16 @@ package org.opendc.simulator.resources +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import kotlinx.coroutines.test.runBlockingTest +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -53,14 +59,15 @@ internal class SimResourceForwarderTest { } forwarder.consume(object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Exit - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + override fun onNext( + ctx: SimResourceContext, + capacity: Double, + remainingWork: Double + ): SimResourceCommand { return SimResourceCommand.Exit } }) + forwarder.close() scheduler.close() } @@ -78,15 +85,100 @@ internal class SimResourceForwarderTest { } forwarder.consume(object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } + var isFirst = true - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit + override fun onNext( + ctx: SimResourceContext, + capacity: Double, + remainingWork: Double + ): SimResourceCommand { + return if (isFirst) { + isFirst = false + SimResourceCommand.Consume(10.0, 1.0) + } else { + SimResourceCommand.Exit + } } }) forwarder.close() } + + @Test + fun testState() = runBlockingTest { + val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val consumer = object : SimResourceConsumer { + override fun onNext( + ctx: SimResourceContext, + capacity: Double, + remainingWork: Double + ): SimResourceCommand = SimResourceCommand.Exit + } + + assertEquals(SimResourceState.Pending, forwarder.state) + + forwarder.startConsumer(consumer) + assertEquals(SimResourceState.Active, forwarder.state) + + assertThrows { forwarder.startConsumer(consumer) } + + forwarder.cancel() + assertEquals(SimResourceState.Pending, forwarder.state) + + forwarder.close() + assertEquals(SimResourceState.Stopped, forwarder.state) + } + + @Test + fun testCancelPendingDelegate() = runBlockingTest { + val forwarder = SimResourceForwarder(SimCpu(1000.0)) + + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit + + forwarder.startConsumer(consumer) + forwarder.cancel() + + verify(exactly = 0) { consumer.onFinish(any(), null) } + } + + @Test + fun testCancelStartedDelegate() = runBlockingTest { + val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler(coroutineContext, clock) + val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + forwarder.cancel() + + verify(exactly = 1) { consumer.onStart(any()) } + verify(exactly = 1) { consumer.onFinish(any(), null) } + } + + @Test + fun testCancelPropagation() = runBlockingTest { + val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler(coroutineContext, clock) + val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + source.cancel() + + verify(exactly = 1) { consumer.onStart(any()) } + verify(exactly = 1) { consumer.onFinish(any(), null) } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 4f7825fc..18f18ded 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import io.mockk.every +import io.mockk.mockk import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest @@ -46,15 +48,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Consume(1000 * ctx.resource.speed, ctx.resource.speed) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, provider.resource.speed)) + .andThen(SimResourceCommand.Exit) try { val res = mutableListOf() @@ -76,15 +73,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Consume(1000 * ctx.resource.speed, 2 * ctx.resource.speed) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, 2 * provider.resource.speed)) + .andThen(SimResourceCommand.Exit) try { val res = mutableListOf() @@ -111,13 +103,12 @@ class SimResourceSourceTest { val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { + override fun onStart(ctx: SimResourceContext) { ctx.interrupt() - return SimResourceCommand.Exit } - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() + override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit } } @@ -137,14 +128,19 @@ class SimResourceSourceTest { lateinit var resCtx: SimResourceContext val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { + var isFirst = true + override fun onStart(ctx: SimResourceContext) { resCtx = ctx - return SimResourceCommand.Consume(4.0, 1.0) } - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { assertEquals(0.0, remainingWork) - return SimResourceCommand.Exit + return if (isFirst) { + isFirst = false + SimResourceCommand.Consume(4.0, 1.0) + } else { + SimResourceCommand.Exit + } } } @@ -168,15 +164,9 @@ class SimResourceSourceTest { val scheduler = TimerScheduler(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - throw IllegalStateException() - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onStart(any()) } + .throws(IllegalStateException()) try { assertThrows { @@ -194,15 +184,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) try { assertThrows { @@ -220,15 +205,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) try { assertThrows { @@ -249,15 +229,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) try { assertThrows { @@ -276,15 +251,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) try { launch { provider.consume(consumer) } @@ -304,15 +274,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Idle(ctx.clock.millis() + 500) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Idle(clock.millis() + 500)) + .andThen(SimResourceCommand.Exit) try { provider.consume(consumer) @@ -332,15 +297,10 @@ class SimResourceSourceTest { val scheduler = TimerScheduler(coroutineContext, clock) val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Idle() - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Idle()) + .andThenThrows(IllegalStateException()) try { provider.consume(consumer) @@ -351,4 +311,25 @@ class SimResourceSourceTest { } } } + + @Test + fun testIncorrectDeadline() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } + .returns(SimResourceCommand.Idle(2)) + .andThen(SimResourceCommand.Exit) + + try { + delay(10) + + assertThrows { provider.consume(consumer) } + } finally { + scheduler.close() + provider.close() + } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index ca6558bf..354dab93 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import io.mockk.every +import io.mockk.mockk import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch @@ -34,7 +36,6 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.resources.consumer.SimTraceConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler -import java.lang.IllegalStateException /** * Test suite for the [SimResourceSwitchExclusive] class. @@ -67,7 +68,7 @@ internal class SimResourceSwitchExclusiveTest { ), ) - val switch = SimResourceSwitchExclusive(coroutineContext) + val switch = SimResourceSwitchExclusive() val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) switch.addInput(source) @@ -98,17 +99,10 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = TimerScheduler(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Consume(duration / 1000.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val workload = mockk>(relaxUnitFun = true) + every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit - val switch = SimResourceSwitchExclusive(coroutineContext) + val switch = SimResourceSwitchExclusive() val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) switch.addInput(source) @@ -134,16 +128,27 @@ internal class SimResourceSwitchExclusiveTest { val duration = 5 * 60L * 1000 val workload = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Consume(duration / 1000.0, 1.0) + var isFirst = true + + override fun onStart(ctx: SimResourceContext) { + isFirst = true } - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit + override fun onNext( + ctx: SimResourceContext, + capacity: Double, + remainingWork: Double + ): SimResourceCommand { + return if (isFirst) { + isFirst = false + SimResourceCommand.Consume(duration / 1000.0, 1.0) + } else { + SimResourceCommand.Exit + } } } - val switch = SimResourceSwitchExclusive(coroutineContext) + val switch = SimResourceSwitchExclusive() val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) switch.addInput(source) @@ -169,17 +174,10 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = TimerScheduler(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Consume(duration.toDouble(), 1.0) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val workload = mockk>(relaxUnitFun = true) + every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit - val switch = SimResourceSwitchExclusive(coroutineContext) + val switch = SimResourceSwitchExclusive() val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) switch.addInput(source) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index 698c1700..e8f5a13c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import io.mockk.every +import io.mockk.mockk import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -47,22 +49,15 @@ internal class SimResourceSwitchMaxMinTest { fun testSmoke() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val switch = SimResourceSwitchMaxMin(clock, coroutineContext) + val switch = SimResourceSwitchMaxMin(clock) val sources = List(2) { SimResourceSource(SimCpu(2000.0), clock, scheduler) } sources.forEach { switch.addInput(it) } val provider = switch.addOutput(SimCpu(1000.0)) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk>(relaxUnitFun = true) + every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit try { provider.consume(consumer) @@ -112,7 +107,7 @@ internal class SimResourceSwitchMaxMinTest { ), ) - val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener) + val switch = SimResourceSwitchMaxMin(clock, listener) val provider = switch.addOutput(SimCpu(3200.0)) try { @@ -180,7 +175,7 @@ internal class SimResourceSwitchMaxMinTest { ) ) - val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener) + val switch = SimResourceSwitchMaxMin(clock, listener) val providerA = switch.addOutput(SimCpu(3200.0)) val providerB = switch.addOutput(SimCpu(3200.0)) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt new file mode 100644 index 00000000..b05195f7 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler + +/** + * A test suite for the [SimWorkConsumer] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimWorkConsumerTest { + data class SimCpu(val speed: Double) : SimResource { + override val capacity: Double + get() = speed + } + + @Test + fun testSmoke() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(1.0), clock, scheduler) + + val consumer = SimWorkConsumer(1.0, 1.0) + + try { + provider.consume(consumer) + assertEquals(1000, currentTime) + } finally { + provider.close() + } + } + + @Test + fun testUtilization() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(1.0), clock, scheduler) + + val consumer = SimWorkConsumer(1.0, 0.5) + + try { + provider.consume(consumer) + assertEquals(2000, currentTime) + } finally { + provider.close() + } + } +} diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt index 49964938..d4bc7b5c 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt @@ -93,7 +93,7 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo try { timer() } catch (e: Throwable) { - Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e) + coroutineContext[CoroutineExceptionHandler]?.handleException(coroutineContext, e) } } } -- cgit v1.2.3 From f1aa2632804916fb364f4fa207ac8ab97479f711 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 19 Mar 2021 13:24:36 +0100 Subject: simulator: Add benchmarks for resource consumption framework This change adds an initial set of benchmarks for the resource consumption framework in order to measure the effect of changes on the performance of the simulator. --- simulator/buildSrc/build.gradle.kts | 2 + .../main/kotlin/benchmark-conventions.gradle.kts | 60 +++++++++++++++++ .../opendc-simulator-resources/build.gradle.kts | 2 + .../resources/SimResourceSourceBenchmark.kt | 75 +++++++++++++++++++++ .../resources/SimResourceSwitchBenchmark.kt | 76 ++++++++++++++++++++++ 5 files changed, 215 insertions(+) create mode 100644 simulator/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSourceBenchmark.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSwitchBenchmark.kt diff --git a/simulator/buildSrc/build.gradle.kts b/simulator/buildSrc/build.gradle.kts index be071d0c..a71e18cf 100644 --- a/simulator/buildSrc/build.gradle.kts +++ b/simulator/buildSrc/build.gradle.kts @@ -36,6 +36,8 @@ dependencies { implementation(kotlin("gradle-plugin", version = "1.4.31")) implementation("org.jlleitschuh.gradle:ktlint-gradle:10.0.0") implementation("org.jetbrains.dokka:dokka-gradle-plugin:0.10.1") + implementation("org.jetbrains.kotlin:kotlin-allopen:1.4.30") + implementation("org.jetbrains.kotlinx:kotlinx-benchmark-plugin:0.3.0") } kotlinDslPluginOptions { diff --git a/simulator/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts b/simulator/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts new file mode 100644 index 00000000..d3bb886d --- /dev/null +++ b/simulator/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +import kotlinx.benchmark.gradle.* +import org.jetbrains.kotlin.allopen.gradle.* + +plugins { + id("org.jetbrains.kotlinx.benchmark") + `java-library` + kotlin("plugin.allopen") +} + +sourceSets { + register("jmh") { + compileClasspath += sourceSets["main"].output + runtimeClasspath += sourceSets["main"].output + } +} + +configurations { + named("jmhImplementation") { + extendsFrom(configurations["implementation"]) + } +} + +configure { + annotation("org.openjdk.jmh.annotations.State") +} + +benchmark { + targets { + register("jmh") { + this as JvmBenchmarkTarget + jmhVersion = "1.21" + } + } +} + +dependencies { + implementation("org.jetbrains.kotlinx:kotlinx-benchmark-runtime-jvm:0.3.0") +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts index 831ca3db..3b0a197c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts +++ b/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts @@ -26,6 +26,7 @@ plugins { `kotlin-library-conventions` `testing-conventions` `jacoco-conventions` + `benchmark-conventions` } dependencies { @@ -33,5 +34,6 @@ dependencies { api("org.jetbrains.kotlinx:kotlinx-coroutines-core") implementation(project(":opendc-utils")) + jmhImplementation(project(":opendc-simulator:opendc-simulator-core")) testImplementation(project(":opendc-simulator:opendc-simulator-core")) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSourceBenchmark.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSourceBenchmark.kt new file mode 100644 index 00000000..09246fe4 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSourceBenchmark.kt @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler +import org.openjdk.jmh.annotations.* +import java.time.Clock +import java.util.concurrent.TimeUnit + +@State(Scope.Benchmark) +@Fork(1) +@Warmup(iterations = 0) +@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +@OptIn(ExperimentalCoroutinesApi::class) +class SimResourceSourceBenchmark { + private lateinit var scope: TestCoroutineScope + private lateinit var clock: Clock + private lateinit var scheduler: TimerScheduler + private lateinit var consumer: SimResourceConsumer + + @Setup + fun setUp() { + scope = TestCoroutineScope() + clock = DelayControllerClockAdapter(scope) + scheduler = TimerScheduler(scope.coroutineContext, clock) + consumer = + SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(1000, 28.0), + SimTraceConsumer.Fragment(1000, 3500.0), + SimTraceConsumer.Fragment(1000, 0.0), + SimTraceConsumer.Fragment(1000, 183.0), + SimTraceConsumer.Fragment(1000, 400.0), + SimTraceConsumer.Fragment(1000, 100.0), + SimTraceConsumer.Fragment(1000, 3000.0), + SimTraceConsumer.Fragment(1000, 4500.0), + ), + ) + } + + @Benchmark + fun benchmarkSource() { + return scope.runBlockingTest { + val provider = SimResourceSource(SimGenericResource(4200.0), clock, scheduler) + return@runBlockingTest provider.consume(consumer) + } + } + + data class SimGenericResource(override val capacity: Double) : SimResource +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSwitchBenchmark.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSwitchBenchmark.kt new file mode 100644 index 00000000..be31e86d --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSwitchBenchmark.kt @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler +import org.openjdk.jmh.annotations.* +import java.time.Clock +import java.util.concurrent.TimeUnit + +@State(Scope.Benchmark) +@Fork(1) +@Warmup(iterations = 0) +@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) +@OptIn(ExperimentalCoroutinesApi::class) +class SimResourceSwitchBenchmark { + private lateinit var scope: TestCoroutineScope + private lateinit var clock: Clock + private lateinit var scheduler: TimerScheduler + private lateinit var consumer: SimResourceConsumer + + @Setup + fun setUp() { + scope = TestCoroutineScope() + clock = DelayControllerClockAdapter(scope) + scheduler = TimerScheduler(scope.coroutineContext, clock) + consumer = + SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(1000, 28.0), + SimTraceConsumer.Fragment(1000, 3500.0), + SimTraceConsumer.Fragment(1000, 0.0), + SimTraceConsumer.Fragment(1000, 183.0) + ), + ) + } + + @Benchmark + fun benchmarkSwitch() { + return scope.runBlockingTest { + val switch = SimResourceSwitchMaxMin(clock) + + switch.addInput(SimResourceSource(SimGenericResource(3000.0), clock, scheduler)) + switch.addInput(SimResourceSource(SimGenericResource(3000.0), clock, scheduler)) + + val provider = switch.addOutput(SimGenericResource(3500.0)) + return@runBlockingTest provider.consume(consumer) + } + } + + data class SimGenericResource(override val capacity: Double) : SimResource +} -- cgit v1.2.3 From f616b720406250b1415593ff04c9d910b1fda54c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 22 Mar 2021 12:13:23 +0100 Subject: simulator: Expose capacity and remaining work outside consumer callback This change changes the consumer and context interfaces to expose the provider capacity and remaining work via the context instance as opposed to only via the callback. This simplifies aggregation of resources. --- .../org/opendc/compute/simulator/SimHostTest.kt | 4 +- .../experiments/capelin/CapelinIntegrationTest.kt | 8 +- .../simulator/compute/workload/SimTraceWorkload.kt | 2 +- .../resources/SimAbstractResourceContext.kt | 85 ++++++++++++++-------- .../simulator/resources/SimResourceConsumer.kt | 4 +- .../simulator/resources/SimResourceContext.kt | 10 +++ .../simulator/resources/SimResourceForwarder.kt | 12 +-- .../simulator/resources/SimResourceSource.kt | 1 - .../simulator/resources/SimResourceSwitchMaxMin.kt | 37 ++++++---- .../resources/consumer/SimTraceConsumer.kt | 2 +- .../resources/consumer/SimWorkConsumer.kt | 4 +- .../simulator/resources/SimResourceContextTest.kt | 8 +- .../resources/SimResourceForwarderTest.kt | 18 ++--- .../simulator/resources/SimResourceSourceTest.kt | 24 +++--- .../resources/SimResourceSwitchExclusiveTest.kt | 8 +- .../resources/SimResourceSwitchMaxMinTest.kt | 2 +- 16 files changed, 127 insertions(+), 102 deletions(-) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index c4bd0cb4..d7a3b744 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -137,8 +137,8 @@ internal class SimHostTest { assertAll( { assertEquals(emptyList(), scope.uncaughtExceptions, "No errors") }, { assertEquals(4281600, requestedWork, "Requested work does not match") }, - { assertEquals(3141600, grantedWork, "Granted work does not match") }, - { assertEquals(1140000, overcommittedWork, "Overcommitted work does not match") }, + { assertEquals(2241600, grantedWork, "Granted work does not match") }, + { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, { assertEquals(1200006, scope.currentTime) } ) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 7dae53be..612509de 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -144,8 +144,8 @@ class CapelinIntegrationTest { { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, { assertEquals(1707144601723, monitor.totalRequestedBurst) }, - { assertEquals(457893364971, monitor.totalGrantedBurst) }, - { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) }, + { assertEquals(457798297997, monitor.totalGrantedBurst) }, + { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) } @@ -190,8 +190,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( { assertEquals(711464339925, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(175226293948, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(175226294127, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(528959213229, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index cc4f3136..e8050263 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -50,7 +50,7 @@ public class SimTraceWorkload(public val trace: Sequence) : SimWorkloa } private inner class CpuConsumer : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { val now = ctx.clock.millis() val fragment = fragment ?: return SimResourceCommand.Exit val work = (fragment.duration / 1000) * fragment.usage diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index 431ca625..dba334a2 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -35,6 +35,21 @@ public abstract class SimAbstractResourceContext( override val clock: Clock, private val consumer: SimResourceConsumer ) : SimResourceContext { + /** + * The capacity of the resource. + */ + override val capacity: Double + get() = resource.capacity + + /** + * The amount of work still remaining at this instant. + */ + override val remainingWork: Double + get() { + val activeCommand = activeCommand ?: return 0.0 + return computeRemainingWork(activeCommand, clock.millis()) + } + /** * This method is invoked when the resource will idle until the specified [deadline]. */ @@ -64,27 +79,18 @@ public abstract class SimAbstractResourceContext( * Compute the speed at which the resource may be consumed. */ protected open fun getSpeed(limit: Double): Double { - return min(limit, getCapacity()) - } - - /** - * Return the capacity available for the resource consumer. - */ - protected open fun getCapacity(): Double { - return resource.capacity + return min(limit, capacity) } /** - * Get the remaining work to process after a resource consumption was flushed. + * Get the remaining work to process after a resource consumption. * * @param work The size of the resource consumption. * @param speed The speed of consumption. * @param duration The duration from the start of the consumption until now. - * @param isInterrupted A flag to indicate that the resource consumption could not be fully processed due to - * it being interrupted before it could finish or reach its deadline. * @return The amount of work remaining. */ - protected open fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { + protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { return if (duration > 0L) { val processed = duration / 1000.0 * speed max(0.0, work - processed) @@ -99,13 +105,15 @@ public abstract class SimAbstractResourceContext( public fun start() { check(state == SimResourceState.Pending) { "Consumer is already started" } + val now = clock.millis() + state = SimResourceState.Active isProcessing = true - latestFlush = clock.millis() + latestFlush = now try { consumer.onStart(this) - interpret(consumer.onNext(this, getCapacity(), 0.0)) + activeCommand = interpret(consumer.onNext(this), now) } catch (cause: Throwable) { doStop(cause) } finally { @@ -154,36 +162,34 @@ public abstract class SimAbstractResourceContext( val activeCommand = activeCommand ?: return val (timestamp, command) = activeCommand + // Note: accessor is reliant on activeCommand being set + val remainingWork = remainingWork + isProcessing = true - this.activeCommand = null val duration = now - timestamp assert(duration >= 0) { "Flush in the past" } - when (command) { + this.activeCommand = when (command) { is SimResourceCommand.Idle -> { // We should only continue processing the next command if: // 1. The resource consumer reached its deadline. // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) if (command.deadline <= now || !isIntermediate) { - next(remainingWork = 0.0) + next(now) } else { - this.activeCommand = activeCommand + activeCommand } } is SimResourceCommand.Consume -> { - val speed = min(resource.capacity, command.limit) - val isInterrupted = !isIntermediate && duration < getDuration(command.work, speed) - val remainingWork = getRemainingWork(command.work, speed, duration, isInterrupted) - // We should only continue processing the next command if: // 1. The resource consumption was finished. // 2. The resource consumer reached its deadline. // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { - next(remainingWork) + next(now) } else { - interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline)) + interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now) } } SimResourceCommand.Exit -> @@ -238,6 +244,7 @@ public abstract class SimAbstractResourceContext( this.state = SimResourceState.Stopped if (state == SimResourceState.Active) { + activeCommand = null onFinish(cause) } } @@ -245,9 +252,7 @@ public abstract class SimAbstractResourceContext( /** * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. */ - private fun interpret(command: SimResourceCommand) { - val now = clock.millis() - + private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? { when (command) { is SimResourceCommand.Idle -> { val deadline = command.deadline @@ -265,18 +270,34 @@ public abstract class SimAbstractResourceContext( onConsume(work, limit, deadline) } - is SimResourceCommand.Exit -> doStop(null) + is SimResourceCommand.Exit -> { + doStop(null) + // No need to set the next active command + return null + } } - assert(activeCommand == null) { "Concurrent access to current command" } - activeCommand = CommandWrapper(now, command) + return CommandWrapper(now, command) } /** * Request the workload for more work. */ - private fun next(remainingWork: Double) { - interpret(consumer.onNext(this, getCapacity(), remainingWork)) + private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now) + + /** + * Compute the remaining work based on the specified [wrapper] and [timestamp][now]. + */ + private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double { + val (timestamp, command) = wrapper + val duration = now - timestamp + return when (command) { + is SimResourceCommand.Consume -> { + val speed = getSpeed(command.limit) + getRemainingWork(command.work, speed, duration) + } + is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0 + } } /** diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 7637ee69..01b56488 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -41,11 +41,9 @@ public interface SimResourceConsumer { * the resource finished processing, reached its deadline or was interrupted. * * @param ctx The execution context in which the consumer runs. - * @param capacity The capacity that is available for the consumer. In case no capacity is available, zero is given. - * @param remainingWork The work of the previous command that was not yet completed due to interruption or deadline. * @return The next command that the resource should execute. */ - public fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand + public fun onNext(ctx: SimResourceContext): SimResourceCommand /** * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index dfb5e9ce..f13764fb 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -39,6 +39,16 @@ public interface SimResourceContext { */ public val clock: Clock + /** + * The resource capacity available at this instant. + */ + public val capacity: Double + + /** + * The amount of work still remaining at this instant. + */ + public val remainingWork: Double + /** * Ask the resource provider to interrupt its resource. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt index 23bf8739..732e709a 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -42,11 +42,6 @@ public class SimResourceForwarder(override val resource: R) : */ private var hasDelegateStarted: Boolean = false - /** - * The remaining amount of work last cycle. - */ - private var remainingWork: Double = 0.0 - /** * The state of the forwarder. */ @@ -92,9 +87,8 @@ public class SimResourceForwarder(override val resource: R) : this.ctx = ctx } - override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { val delegate = delegate - this.remainingWork = remainingWork if (!hasDelegateStarted) { start() @@ -103,7 +97,7 @@ public class SimResourceForwarder(override val resource: R) : return if (state == SimResourceState.Stopped) { SimResourceCommand.Exit } else if (delegate != null) { - val command = delegate.onNext(ctx, capacity, remainingWork) + val command = delegate.onNext(ctx) if (command == SimResourceCommand.Exit) { // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we // reset beforehand the existing state and check whether it has been updated afterwards @@ -114,7 +108,7 @@ public class SimResourceForwarder(override val resource: R) : if (state == SimResourceState.Stopped) SimResourceCommand.Exit else - onNext(ctx, capacity, 0.0) + onNext(ctx) } else { command } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index a3b16177..99545c4c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import org.opendc.utils.TimerScheduler diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index 6b919a77..ee8edfcd 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -57,9 +57,10 @@ public class SimResourceSwitchMaxMin( private val outputContexts: MutableList = mutableListOf() /** - * The total amount of remaining work (of all pCPUs). + * The remaining work of all inputs. */ - private var totalRemainingWork: Double = 0.0 + private val totalRemainingWork: Double + get() = inputConsumers.sumByDouble { it.remainingWork } /** * The total speed requested by the vCPUs. @@ -241,6 +242,8 @@ public class SimResourceSwitchMaxMin( * Flush the progress of the vCPUs. */ private fun flushGuests() { + val totalRemainingWork = totalRemainingWork + // Flush all the outputs work for (output in outputContexts) { output.flush(isIntermediate = true) @@ -256,7 +259,6 @@ public class SimResourceSwitchMaxMin( totalRequestedSpeed, totalAllocatedSpeed ) - totalRemainingWork = 0.0 totalInterferedWork = 0.0 totalOvercommittedWork = 0.0 @@ -362,29 +364,39 @@ public class SimResourceSwitchMaxMin( */ var actualSpeed: Double = 0.0 + private fun reportOvercommit() { + totalOvercommittedWork += remainingWork + } + override fun onIdle(deadline: Long) { + reportOvercommit() + allowedSpeed = 0.0 activeCommand = SimResourceCommand.Idle(deadline) } override fun onConsume(work: Double, limit: Double, deadline: Long) { + reportOvercommit() + allowedSpeed = getSpeed(limit) activeCommand = SimResourceCommand.Consume(work, limit, deadline) } override fun onFinish(cause: Throwable?) { + reportOvercommit() + activeCommand = SimResourceCommand.Exit provider.cancel() super.onFinish(cause) } - override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { + override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { // Apply performance interference model val performanceScore = 1.0 // Compute the remaining amount of work - val remainingWork = if (work > 0.0) { + return if (work > 0.0) { // Compute the fraction of compute time allocated to the VM val fraction = actualSpeed / totalAllocatedSpeed @@ -400,12 +412,6 @@ public class SimResourceSwitchMaxMin( } else { 0.0 } - - if (!isInterrupted) { - totalOvercommittedWork += remainingWork - } - - return remainingWork } override fun interrupt() { @@ -433,6 +439,12 @@ public class SimResourceSwitchMaxMin( */ private lateinit var ctx: SimResourceContext + /** + * The remaining work of this consumer. + */ + val remainingWork: Double + get() = ctx.remainingWork + init { barrier = SimConsumerBarrier(barrier.parties + 1) input.startConsumer(this@InputConsumer) @@ -449,8 +461,7 @@ public class SimResourceSwitchMaxMin( this.ctx = ctx } - override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { - totalRemainingWork += remainingWork + override fun onNext(ctx: SimResourceContext): SimResourceCommand { val isLast = barrier.enter() // Flush the progress of the guest after the barrier has been reached. diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index a00ee575..0189fe4c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -39,7 +39,7 @@ public class SimTraceConsumer(private val trace: Sequence) : SimResour iterator = trace.iterator() } - override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { val iterator = checkNotNull(iterator) return if (iterator.hasNext()) { val now = ctx.clock.millis() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt index bad2f403..62425583 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -48,8 +48,8 @@ public class SimWorkConsumer( remainingWork = work } - override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { - val work = this.remainingWork + remainingWork + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val work = this.remainingWork + ctx.remainingWork this.remainingWork -= work return if (work > 0.0) { SimResourceCommand.Consume(work, limit) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index 0bc87473..5d4eb46d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -45,7 +45,7 @@ class SimResourceContextTest { val resource = SimCpu(4200.0) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit + every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit val context = object : SimAbstractResourceContext(resource, clock, consumer) { override fun onIdle(deadline: Long) {} @@ -64,7 +64,7 @@ class SimResourceContextTest { val resource = SimCpu(4200.0) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit + every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit val context = spyk(object : SimAbstractResourceContext(resource, clock, consumer) { override fun onIdle(deadline: Long) {} @@ -85,7 +85,7 @@ class SimResourceContextTest { val resource = SimCpu(4200.0) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit + every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit val context = spyk(object : SimAbstractResourceContext(resource, clock, consumer) { override fun onIdle(deadline: Long) {} @@ -111,7 +111,7 @@ class SimResourceContextTest { val resource = SimCpu(4200.0) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit + every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit val context = object : SimAbstractResourceContext(resource, clock, consumer) { override fun onIdle(deadline: Long) {} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt index b1b959ba..47794bdd 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt @@ -60,9 +60,7 @@ internal class SimResourceForwarderTest { forwarder.consume(object : SimResourceConsumer { override fun onNext( - ctx: SimResourceContext, - capacity: Double, - remainingWork: Double + ctx: SimResourceContext ): SimResourceCommand { return SimResourceCommand.Exit } @@ -88,9 +86,7 @@ internal class SimResourceForwarderTest { var isFirst = true override fun onNext( - ctx: SimResourceContext, - capacity: Double, - remainingWork: Double + ctx: SimResourceContext ): SimResourceCommand { return if (isFirst) { isFirst = false @@ -109,9 +105,7 @@ internal class SimResourceForwarderTest { val forwarder = SimResourceForwarder(SimCpu(1000.0)) val consumer = object : SimResourceConsumer { override fun onNext( - ctx: SimResourceContext, - capacity: Double, - remainingWork: Double + ctx: SimResourceContext ): SimResourceCommand = SimResourceCommand.Exit } @@ -134,7 +128,7 @@ internal class SimResourceForwarderTest { val forwarder = SimResourceForwarder(SimCpu(1000.0)) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit + every { consumer.onNext(any()) } returns SimResourceCommand.Exit forwarder.startConsumer(consumer) forwarder.cancel() @@ -150,7 +144,7 @@ internal class SimResourceForwarderTest { val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) + every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) source.startConsumer(forwarder) yield() @@ -170,7 +164,7 @@ internal class SimResourceForwarderTest { val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) + every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) source.startConsumer(forwarder) yield() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 18f18ded..c0ed8c9e 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -49,7 +49,7 @@ class SimResourceSourceTest { val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } + every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, provider.resource.speed)) .andThen(SimResourceCommand.Exit) @@ -74,7 +74,7 @@ class SimResourceSourceTest { val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } + every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, 2 * provider.resource.speed)) .andThen(SimResourceCommand.Exit) @@ -107,7 +107,7 @@ class SimResourceSourceTest { ctx.interrupt() } - override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { return SimResourceCommand.Exit } } @@ -133,8 +133,8 @@ class SimResourceSourceTest { resCtx = ctx } - override fun onNext(ctx: SimResourceContext, capacity: Double, remainingWork: Double): SimResourceCommand { - assertEquals(0.0, remainingWork) + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + assertEquals(0.0, ctx.remainingWork) return if (isFirst) { isFirst = false SimResourceCommand.Consume(4.0, 1.0) @@ -185,7 +185,7 @@ class SimResourceSourceTest { val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } + every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) @@ -206,7 +206,7 @@ class SimResourceSourceTest { val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } + every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) @@ -230,7 +230,7 @@ class SimResourceSourceTest { val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } + every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) @@ -252,7 +252,7 @@ class SimResourceSourceTest { val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } + every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) @@ -275,7 +275,7 @@ class SimResourceSourceTest { val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } + every { consumer.onNext(any()) } .returns(SimResourceCommand.Idle(clock.millis() + 500)) .andThen(SimResourceCommand.Exit) @@ -298,7 +298,7 @@ class SimResourceSourceTest { val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } + every { consumer.onNext(any()) } .returns(SimResourceCommand.Idle()) .andThenThrows(IllegalStateException()) @@ -319,7 +319,7 @@ class SimResourceSourceTest { val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } + every { consumer.onNext(any()) } .returns(SimResourceCommand.Idle(2)) .andThen(SimResourceCommand.Exit) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index 354dab93..dc90a43e 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -100,7 +100,7 @@ internal class SimResourceSwitchExclusiveTest { val duration = 5 * 60L * 1000 val workload = mockk>(relaxUnitFun = true) - every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit + every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit val switch = SimResourceSwitchExclusive() val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) @@ -135,9 +135,7 @@ internal class SimResourceSwitchExclusiveTest { } override fun onNext( - ctx: SimResourceContext, - capacity: Double, - remainingWork: Double + ctx: SimResourceContext ): SimResourceCommand { return if (isFirst) { isFirst = false @@ -175,7 +173,7 @@ internal class SimResourceSwitchExclusiveTest { val duration = 5 * 60L * 1000 val workload = mockk>(relaxUnitFun = true) - every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit + every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit val switch = SimResourceSwitchExclusive() val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index e8f5a13c..8b989334 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -57,7 +57,7 @@ internal class SimResourceSwitchMaxMinTest { val provider = switch.addOutput(SimCpu(1000.0)) val consumer = mockk>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit + every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit try { provider.consume(consumer) -- cgit v1.2.3 From 3718c385f84b463ac799080bb5603e0011adcd7d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 22 Mar 2021 16:45:13 +0100 Subject: simulator: Remove generic resource constraint from resource model This change removes the generic resource constraint (e.g., SimResource) and replaces it by a simple capacity property. In the future, users should handle the resource properties on a higher level. This change simplifies compositions of consumers and providers by not requiring a translation from resource to capacity. --- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 4 +- .../org/opendc/compute/simulator/SimHostTest.kt | 16 +- .../experiments/capelin/CapelinIntegrationTest.kt | 12 +- .../environment/sc18/Sc18EnvironmentReader.kt | 16 +- .../sc20/Sc20ClusterEnvironmentReader.kt | 12 +- .../environment/sc20/Sc20EnvironmentReader.kt | 16 +- .../kotlin/org/opendc/runner/web/TopologyParser.kt | 12 +- .../simulator/SimServerlessServiceTest.kt | 12 +- .../simulator/compute/SimAbstractHypervisor.kt | 24 +- .../opendc/simulator/compute/SimAbstractMachine.kt | 17 +- .../simulator/compute/SimBareMetalMachine.kt | 6 +- .../simulator/compute/SimFairShareHypervisor.kt | 9 +- .../opendc/simulator/compute/SimMachineContext.kt | 13 +- .../opendc/simulator/compute/SimMachineModel.kt | 6 +- .../simulator/compute/SimSpaceSharedHypervisor.kt | 5 +- .../opendc/simulator/compute/model/MemoryUnit.kt | 38 ++ .../simulator/compute/model/ProcessingNode.kt | 38 ++ .../simulator/compute/model/ProcessingUnit.kt | 36 ++ .../simulator/compute/model/SimMemoryUnit.kt | 43 -- .../simulator/compute/model/SimProcessingNode.kt | 38 -- .../simulator/compute/model/SimProcessingUnit.kt | 41 -- .../simulator/compute/workload/SimFlopsWorkload.kt | 4 +- .../compute/workload/SimRuntimeWorkload.kt | 4 +- .../simulator/compute/workload/SimTraceWorkload.kt | 40 +- .../simulator/compute/workload/SimWorkload.kt | 4 +- .../opendc/simulator/compute/SimHypervisorTest.kt | 12 +- .../org/opendc/simulator/compute/SimMachineTest.kt | 12 +- .../compute/SimSpaceSharedHypervisorTest.kt | 12 +- .../opendc/simulator/resources/BenchmarkHelpers.kt | 43 ++ .../simulator/resources/SimResourceBenchmarks.kt | 139 +++++++ .../resources/SimResourceSourceBenchmark.kt | 75 ---- .../resources/SimResourceSwitchBenchmark.kt | 76 ---- .../resources/SimAbstractResourceAggregator.kt | 198 +++++++++ .../resources/SimAbstractResourceContext.kt | 28 +- .../org/opendc/simulator/resources/SimResource.kt | 33 -- .../simulator/resources/SimResourceAggregator.kt | 48 +++ .../resources/SimResourceAggregatorMaxMin.kt | 63 +++ .../simulator/resources/SimResourceConsumer.kt | 8 +- .../simulator/resources/SimResourceContext.kt | 7 +- .../simulator/resources/SimResourceDistributor.kt | 43 ++ .../resources/SimResourceDistributorMaxMin.kt | 423 +++++++++++++++++++ .../simulator/resources/SimResourceForwarder.kt | 17 +- .../simulator/resources/SimResourceProvider.kt | 19 +- .../simulator/resources/SimResourceSource.kt | 16 +- .../simulator/resources/SimResourceSwitch.kt | 14 +- .../resources/SimResourceSwitchExclusive.kt | 30 +- .../simulator/resources/SimResourceSwitchMaxMin.kt | 453 ++------------------- .../resources/consumer/SimConsumerBarrier.kt | 7 + .../resources/consumer/SimTraceConsumer.kt | 9 +- .../resources/consumer/SimWorkConsumer.kt | 11 +- .../resources/SimResourceAggregatorMaxMinTest.kt | 158 +++++++ .../simulator/resources/SimResourceContextTest.kt | 38 +- .../resources/SimResourceForwarderTest.kt | 50 +-- .../simulator/resources/SimResourceSourceTest.kt | 83 ++-- .../resources/SimResourceSwitchExclusiveTest.kt | 43 +- .../resources/SimResourceSwitchMaxMinTest.kt | 31 +- .../simulator/resources/SimWorkConsumerTest.kt | 13 +- 57 files changed, 1592 insertions(+), 1086 deletions(-) create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt delete mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt delete mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt delete mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt delete mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSourceBenchmark.kt delete mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSwitchBenchmark.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt delete mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 6e9b8151..694676bc 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -35,7 +35,7 @@ import org.opendc.compute.simulator.power.models.ConstantPowerModel import org.opendc.simulator.compute.* import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.failures.FailureDomain import org.opendc.utils.flow.EventFlow import java.time.Clock @@ -217,7 +217,7 @@ public class SimHost( val originalCpu = machine.model.cpus[0] val processingNode = originalCpu.node.copy(coreCount = cpuCount) val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } - val memoryUnits = listOf(SimMemoryUnit("Generic", "Generic", 3200.0, memorySize)) + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) return SimMachineModel(processingUnits, memoryUnits) } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index d7a3b744..e311cd21 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -40,9 +40,9 @@ import org.opendc.compute.api.ServerWatcher import org.opendc.compute.service.driver.HostEvent import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter import java.time.Clock @@ -62,11 +62,11 @@ internal class SimHostTest { scope = TestCoroutineScope() clock = DelayControllerClockAdapter(scope) - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } @@ -136,8 +136,8 @@ internal class SimHostTest { assertAll( { assertEquals(emptyList(), scope.uncaughtExceptions, "No errors") }, - { assertEquals(4281600, requestedWork, "Requested work does not match") }, - { assertEquals(2241600, grantedWork, "Granted work does not match") }, + { assertEquals(4197600, requestedWork, "Requested work does not match") }, + { assertEquals(2157600, grantedWork, "Granted work does not match") }, { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, { assertEquals(1200006, scope.currentTime) } ) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 612509de..a812490a 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -143,10 +143,10 @@ class CapelinIntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1707144601723, monitor.totalRequestedBurst) }, - { assertEquals(457798297997, monitor.totalGrantedBurst) }, - { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) }, - { assertEquals(0, monitor.totalInterferedBurst) } + { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } @@ -189,8 +189,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(711464339925, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(175226294127, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(702636229989, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(172807361391, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, { assertEquals(528959213229, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 85a2e413..3da8d0b3 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -29,9 +29,9 @@ import org.opendc.compute.simulator.power.models.ConstantPowerModel import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import java.io.InputStream import java.util.* @@ -61,12 +61,12 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja val cores = machine.cpus.flatMap { id -> when (id) { 1 -> { - val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { SimProcessingUnit(node, it, 4100.0) } + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) + List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } } 2 -> { - val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { SimProcessingUnit(node, it, 3500.0) } + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) + List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } } else -> throw IllegalArgumentException("The cpu id $id is not recognized") } @@ -75,7 +75,7 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja UUID(0L, counter++.toLong()), "node-$counter", emptyMap(), - SimMachineModel(cores, listOf(SimMemoryUnit("", "", 2300.0, 16000))), + SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))), ConstantPowerModel(0.0) ) } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 094bc975..9a06a40f 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -26,9 +26,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import java.io.File import java.io.FileInputStream import java.io.InputStream @@ -88,8 +88,8 @@ public class Sc20ClusterEnvironmentReader( memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L coresPerHost = values[coresPerHostCol].trim().toInt() - val unknownProcessingNode = SimProcessingNode("unknown", "unknown", "unknown", coresPerHost) - val unknownMemoryUnit = SimMemoryUnit("unknown", "unknown", -1.0, memoryPerHost) + val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost) + val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) repeat(numberOfHosts) { nodes.add( @@ -99,7 +99,7 @@ public class Sc20ClusterEnvironmentReader( mapOf("cluster" to clusterId), SimMachineModel( List(coresPerHost) { coreId -> - SimProcessingUnit(unknownProcessingNode, coreId, speed) + ProcessingUnit(unknownProcessingNode, coreId, speed) }, listOf(unknownMemoryUnit) ), diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index 87a49f49..effd0286 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -29,9 +29,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import java.io.InputStream import java.util.* @@ -60,19 +60,19 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja val cores = machine.cpus.flatMap { id -> when (id) { 1 -> { - val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { SimProcessingUnit(node, it, 4100.0) } + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) + List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } } 2 -> { - val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { SimProcessingUnit(node, it, 3500.0) } + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) + List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } } else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } val memories = machine.memories.map { id -> when (id) { - 1 -> SimMemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L) + 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L) else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt index 0ff40a28..e7e99a3d 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt @@ -34,9 +34,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import java.util.* /** @@ -56,13 +56,13 @@ public class TopologyParser(private val collection: MongoCollection, p val cores = cpu.getInteger("numberOfCores") val speed = cpu.get("clockRateMhz", Number::class.java).toDouble() // TODO Remove hardcoding of vendor - val node = SimProcessingNode("Intel", "amd64", cpu.getString("name"), cores) + val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores) List(cores) { coreId -> - SimProcessingUnit(node, coreId, speed) + ProcessingUnit(node, coreId, speed) } } val memoryUnits = machine.getList("memories", Document::class.java).map { memory -> - SimMemoryUnit( + MemoryUnit( "Samsung", memory.getString("name"), memory.get("speedMbPerS", Number::class.java).toDouble(), diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt index a80365de..f68e206a 100644 --- a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt @@ -35,9 +35,9 @@ import org.opendc.serverless.service.ServerlessService import org.opendc.serverless.service.router.RandomRoutingPolicy import org.opendc.serverless.simulator.workload.SimServerlessWorkload import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter @@ -52,11 +52,11 @@ internal class SimServerlessServiceTest { @BeforeEach fun setUp() { - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 1000.0) }, - memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt index 281d43ae..81d09f12 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -27,8 +27,8 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.launch import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* import java.time.Clock @@ -45,7 +45,7 @@ public abstract class SimAbstractHypervisor : SimHypervisor { /** * The resource switch to use. */ - private lateinit var switch: SimResourceSwitch + private lateinit var switch: SimResourceSwitch /** * The virtual machines running on this hypervisor. @@ -57,12 +57,12 @@ public abstract class SimAbstractHypervisor : SimHypervisor { /** * Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs. */ - public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch + public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch /** * Check whether the specified machine model fits on this hypervisor. */ - public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean + public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean override fun canFit(model: SimMachineModel): Boolean { return canFit(model, switch) @@ -101,7 +101,7 @@ public abstract class SimAbstractHypervisor : SimHypervisor { /** * The vCPUs of the machine. */ - private val cpus: Map> = model.cpus.associateWith { switch.addOutput(it) } + private val cpus: Map = model.cpus.associateWith { switch.addOutput(it.frequency) } /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. @@ -111,10 +111,10 @@ public abstract class SimAbstractHypervisor : SimHypervisor { require(!isTerminated) { "Machine is terminated" } val ctx = object : SimMachineContext { - override val cpus: List + override val cpus: List get() = model.cpus - override val memory: List + override val memory: List get() = model.memory override val clock: Clock @@ -122,8 +122,8 @@ public abstract class SimAbstractHypervisor : SimHypervisor { override val meta: Map = meta - override fun interrupt(resource: SimResource) { - requireNotNull(this@VirtualMachine.cpus[resource]).interrupt() + override fun interrupt(cpu: ProcessingUnit) { + requireNotNull(this@VirtualMachine.cpus[cpu]).interrupt() } } @@ -155,8 +155,8 @@ public abstract class SimAbstractHypervisor : SimHypervisor { switch = createSwitch(ctx) } - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer { - val forwarder = SimResourceForwarder(cpu) + override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { + val forwarder = SimResourceForwarder() switch.addInput(forwarder) return forwarder } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 44906c2b..52945354 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -27,10 +27,9 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResource import org.opendc.simulator.resources.SimResourceProvider import org.opendc.simulator.resources.SimResourceSource import org.opendc.simulator.resources.consume @@ -65,24 +64,24 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine /** * The resources allocated for this machine. */ - protected abstract val resources: Map> + protected abstract val resources: Map /** * The execution context in which the workload runs. */ private inner class Context( - val sources: Map>, + val sources: Map, override val meta: Map ) : SimMachineContext { override val clock: Clock get() = this@SimAbstractMachine.clock - override val cpus: List = model.cpus + override val cpus: List = model.cpus - override val memory: List = model.memory + override val memory: List = model.memory - override fun interrupt(resource: SimResource) { - checkNotNull(sources[resource]) { "Invalid resource" }.interrupt() + override fun interrupt(cpu: ProcessingUnit) { + checkNotNull(sources[cpu]) { "Invalid resource" }.interrupt() } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 79982ea8..19479719 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.* import org.opendc.utils.TimerScheduler import java.time.Clock @@ -57,8 +57,8 @@ public class SimBareMetalMachine( */ private val scheduler = TimerScheduler(this.context, clock) - override val resources: Map> = - model.cpus.associateWith { SimResourceSource(it, clock, scheduler) } + override val resources: Map = + model.cpus.associateWith { SimResourceSource(it.frequency, clock, scheduler) } override fun close() { super.close() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index c629fbd9..fa677de9 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* @@ -34,14 +33,14 @@ import org.opendc.simulator.resources.* */ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() { - override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true + override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true - override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { + override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { return SimResourceSwitchMaxMin( ctx.clock, - object : SimResourceSwitchMaxMin.Listener { + object : SimResourceSwitchMaxMin.Listener { override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, + switch: SimResourceSwitchMaxMin, requestedWork: Long, grantedWork: Long, overcommittedWork: Long, diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt index cff70826..85404e6e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt @@ -22,9 +22,8 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingUnit -import org.opendc.simulator.resources.SimResource +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingUnit import java.time.Clock /** @@ -46,17 +45,17 @@ public interface SimMachineContext { /** * The CPUs available on the machine. */ - public val cpus: List + public val cpus: List /** * The memory available on the machine */ - public val memory: List + public val memory: List /** - * Interrupt the specified [resource]. + * Interrupt the specified [cpu]. * * @throws IllegalArgumentException if the resource does not belong to this execution context. */ - public fun interrupt(resource: SimResource) + public fun interrupt(cpu: ProcessingUnit) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt index d6bf0e99..2b414540 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt @@ -22,8 +22,8 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingUnit /** * A description of the physical or virtual machine on which a bootable image runs. @@ -31,4 +31,4 @@ import org.opendc.simulator.compute.model.SimProcessingUnit * @property cpus The list of processing units available to the image. * @property memory The list of memory units available to the image. */ -public data class SimMachineModel(public val cpus: List, public val memory: List) +public data class SimMachineModel(public val cpus: List, public val memory: List) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index 5de69884..fd8e546f 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -22,18 +22,17 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.resources.* /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. */ public class SimSpaceSharedHypervisor : SimAbstractHypervisor() { - override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean { + override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean { return switch.inputs.size - switch.outputs.size >= model.cpus.size } - override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { + override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { return SimResourceSwitchExclusive() } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt new file mode 100644 index 00000000..bcbde5b1 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.model + +/** + * A memory unit of a compute resource, either virtual or physical. + * + * @property vendor The vendor string of the memory. + * @property modelName The name of the memory model. + * @property speed The access speed of the memory in MHz. + * @property size The size of the memory unit in MBs. + */ +public data class MemoryUnit( + public val vendor: String, + public val modelName: String, + public val speed: Double, + public val size: Long +) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt new file mode 100644 index 00000000..58ed816c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.model + +/** + * A processing node/package/socket containing possibly several CPU cores. + * + * @property vendor The vendor string of the processor node. + * @property modelName The name of the processor node. + * @property arch The micro-architecture of the processor node. + * @property coreCount The number of logical CPUs in the processor node. + */ +public data class ProcessingNode( + public val vendor: String, + public val arch: String, + public val modelName: String, + public val coreCount: Int +) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt new file mode 100644 index 00000000..415e95e6 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.model + +/** + * A single logical compute unit of processor node, either virtual or physical. + * + * @property node The processing node containing the CPU core. + * @property id The identifier of the CPU core within the processing node. + * @property frequency The clock rate of the CPU in MHz. + */ +public data class ProcessingUnit( + public val node: ProcessingNode, + public val id: Int, + public val frequency: Double +) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt deleted file mode 100644 index 49745868..00000000 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.model - -import org.opendc.simulator.resources.SimResource - -/** - * A memory unit of a compute resource, either virtual or physical. - * - * @property vendor The vendor string of the memory. - * @property modelName The name of the memory model. - * @property speed The access speed of the memory in MHz. - * @property size The size of the memory unit in MBs. - */ -public data class SimMemoryUnit( - public val vendor: String, - public val modelName: String, - public val speed: Double, - public val size: Long -) : SimResource { - override val capacity: Double - get() = speed -} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt deleted file mode 100644 index 4022ecb3..00000000 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.model - -/** - * A processing node/package/socket containing possibly several CPU cores. - * - * @property vendor The vendor string of the processor node. - * @property modelName The name of the processor node. - * @property arch The micro-architecture of the processor node. - * @property coreCount The number of logical CPUs in the processor node. - */ -public data class SimProcessingNode( - public val vendor: String, - public val arch: String, - public val modelName: String, - public val coreCount: Int -) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt deleted file mode 100644 index 1c989254..00000000 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.model - -import org.opendc.simulator.resources.SimResource - -/** - * A single logical compute unit of processor node, either virtual or physical. - * - * @property node The processing node containing the CPU core. - * @property id The identifier of the CPU core within the processing node. - * @property frequency The clock rate of the CPU in MHz. - */ -public data class SimProcessingUnit( - public val node: SimProcessingNode, - public val id: Int, - public val frequency: Double -) : SimResource { - override val capacity: Double - get() = frequency -} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index f1079ee6..63c9d28c 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.consumer.SimWorkConsumer @@ -45,7 +45,7 @@ public class SimFlopsWorkload( override fun onStart(ctx: SimMachineContext) {} - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer { + override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { return SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index d7aa8f80..a3420e32 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.consumer.SimWorkConsumer @@ -44,7 +44,7 @@ public class SimRuntimeWorkload( override fun onStart(ctx: SimMachineContext) {} - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer { + override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { val limit = cpu.frequency * utilization return SimWorkConsumer((limit / 1000) * duration, utilization) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index e8050263..2442d748 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -45,31 +45,29 @@ public class SimTraceWorkload(public val trace: Sequence) : SimWorkloa offset = ctx.clock.millis() } - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer { - return CpuConsumer() - } + override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { + return object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val now = ctx.clock.millis() + val fragment = fragment ?: return SimResourceCommand.Exit + val work = (fragment.duration / 1000) * fragment.usage + val deadline = offset + fragment.duration - private inner class CpuConsumer : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val now = ctx.clock.millis() - val fragment = fragment ?: return SimResourceCommand.Exit - val work = (fragment.duration / 1000) * fragment.usage - val deadline = offset + fragment.duration + assert(deadline >= now) { "Deadline already passed" } - assert(deadline >= now) { "Deadline already passed" } + val cmd = + if (cpu.id < fragment.cores && work > 0.0) + SimResourceCommand.Consume(work, fragment.usage, deadline) + else + SimResourceCommand.Idle(deadline) - val cmd = - if (ctx.resource.id < fragment.cores && work > 0.0) - SimResourceCommand.Consume(work, fragment.usage, deadline) - else - SimResourceCommand.Idle(deadline) + if (barrier.enter()) { + this@SimTraceWorkload.fragment = nextFragment() + this@SimTraceWorkload.offset += fragment.duration + } - if (barrier.enter()) { - this@SimTraceWorkload.fragment = nextFragment() - this@SimTraceWorkload.offset += fragment.duration + return cmd } - - return cmd } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index 60661e23..bdc12bb5 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceConsumer /** @@ -41,5 +41,5 @@ public interface SimWorkload { /** * Obtain the resource consumer for the specified processing unit. */ - public fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer + public fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index 4ac8cf63..0149024f 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -31,9 +31,9 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter @@ -46,10 +46,10 @@ internal class SimHypervisorTest { @BeforeEach fun setUp() { - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) model = SimMachineModel( - cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 6adc41d0..7e014245 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -28,9 +28,9 @@ import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter @@ -43,11 +43,11 @@ class SimMachineTest { @BeforeEach fun setUp() { - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 1000.0) }, - memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt index 8428a0a7..fb0523af 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt @@ -31,9 +31,9 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTraceWorkload @@ -48,10 +48,10 @@ internal class SimSpaceSharedHypervisorTest { @BeforeEach fun setUp() { - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt new file mode 100644 index 00000000..8d2587b1 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.simulator.resources.consumer.SimTraceConsumer + +/** + * Helper function to create simple consumer workload. + */ +fun createSimpleConsumer(): SimResourceConsumer { + return SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(1000, 28.0), + SimTraceConsumer.Fragment(1000, 3500.0), + SimTraceConsumer.Fragment(1000, 0.0), + SimTraceConsumer.Fragment(1000, 183.0), + SimTraceConsumer.Fragment(1000, 400.0), + SimTraceConsumer.Fragment(1000, 100.0), + SimTraceConsumer.Fragment(1000, 3000.0), + SimTraceConsumer.Fragment(1000, 4500.0), + ), + ) +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt new file mode 100644 index 00000000..f2eea97c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler +import org.openjdk.jmh.annotations.* +import java.time.Clock +import java.util.concurrent.TimeUnit + +@State(Scope.Thread) +@Fork(1) +@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +@OptIn(ExperimentalCoroutinesApi::class) +class SimResourceBenchmarks { + private lateinit var scope: TestCoroutineScope + private lateinit var clock: Clock + private lateinit var scheduler: TimerScheduler + + @Setup + fun setUp() { + scope = TestCoroutineScope() + clock = DelayControllerClockAdapter(scope) + scheduler = TimerScheduler(scope.coroutineContext, clock) + } + + @State(Scope.Thread) + class Workload { + lateinit var consumers: Array + + @Setup + fun setUp() { + consumers = Array(3) { createSimpleConsumer() } + } + } + + @Benchmark + fun benchmarkSource(state: Workload) { + return scope.runBlockingTest { + val provider = SimResourceSource(4200.0, clock, scheduler) + return@runBlockingTest provider.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkForwardOverhead(state: Workload) { + return scope.runBlockingTest { + val provider = SimResourceSource(4200.0, clock, scheduler) + val forwarder = SimResourceForwarder() + provider.startConsumer(forwarder) + return@runBlockingTest forwarder.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) { + return scope.runBlockingTest { + val switch = SimResourceSwitchMaxMin(clock) + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + val provider = switch.addOutput(3500.0) + return@runBlockingTest provider.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) { + return scope.runBlockingTest { + val switch = SimResourceSwitchMaxMin(clock) + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + repeat(3) { i -> + launch { + val provider = switch.addOutput(3500.0) + provider.consume(state.consumers[i]) + } + } + } + } + + @Benchmark + fun benchmarkSwitchExclusiveSingleConsumer(state: Workload) { + return scope.runBlockingTest { + val switch = SimResourceSwitchExclusive() + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + val provider = switch.addOutput(3500.0) + return@runBlockingTest provider.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkSwitchExclusiveTripleConsumer(state: Workload) { + return scope.runBlockingTest { + val switch = SimResourceSwitchExclusive() + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + repeat(2) { i -> + launch { + val provider = switch.addOutput(3500.0) + provider.consume(state.consumers[i]) + } + } + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSourceBenchmark.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSourceBenchmark.kt deleted file mode 100644 index 09246fe4..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSourceBenchmark.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.test.runBlockingTest -import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.utils.TimerScheduler -import org.openjdk.jmh.annotations.* -import java.time.Clock -import java.util.concurrent.TimeUnit - -@State(Scope.Benchmark) -@Fork(1) -@Warmup(iterations = 0) -@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) -@OptIn(ExperimentalCoroutinesApi::class) -class SimResourceSourceBenchmark { - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock - private lateinit var scheduler: TimerScheduler - private lateinit var consumer: SimResourceConsumer - - @Setup - fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - scheduler = TimerScheduler(scope.coroutineContext, clock) - consumer = - SimTraceConsumer( - sequenceOf( - SimTraceConsumer.Fragment(1000, 28.0), - SimTraceConsumer.Fragment(1000, 3500.0), - SimTraceConsumer.Fragment(1000, 0.0), - SimTraceConsumer.Fragment(1000, 183.0), - SimTraceConsumer.Fragment(1000, 400.0), - SimTraceConsumer.Fragment(1000, 100.0), - SimTraceConsumer.Fragment(1000, 3000.0), - SimTraceConsumer.Fragment(1000, 4500.0), - ), - ) - } - - @Benchmark - fun benchmarkSource() { - return scope.runBlockingTest { - val provider = SimResourceSource(SimGenericResource(4200.0), clock, scheduler) - return@runBlockingTest provider.consume(consumer) - } - } - - data class SimGenericResource(override val capacity: Double) : SimResource -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSwitchBenchmark.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSwitchBenchmark.kt deleted file mode 100644 index be31e86d..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSwitchBenchmark.kt +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.test.runBlockingTest -import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.utils.TimerScheduler -import org.openjdk.jmh.annotations.* -import java.time.Clock -import java.util.concurrent.TimeUnit - -@State(Scope.Benchmark) -@Fork(1) -@Warmup(iterations = 0) -@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) -@OptIn(ExperimentalCoroutinesApi::class) -class SimResourceSwitchBenchmark { - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock - private lateinit var scheduler: TimerScheduler - private lateinit var consumer: SimResourceConsumer - - @Setup - fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - scheduler = TimerScheduler(scope.coroutineContext, clock) - consumer = - SimTraceConsumer( - sequenceOf( - SimTraceConsumer.Fragment(1000, 28.0), - SimTraceConsumer.Fragment(1000, 3500.0), - SimTraceConsumer.Fragment(1000, 0.0), - SimTraceConsumer.Fragment(1000, 183.0) - ), - ) - } - - @Benchmark - fun benchmarkSwitch() { - return scope.runBlockingTest { - val switch = SimResourceSwitchMaxMin(clock) - - switch.addInput(SimResourceSource(SimGenericResource(3000.0), clock, scheduler)) - switch.addInput(SimResourceSource(SimGenericResource(3000.0), clock, scheduler)) - - val provider = switch.addOutput(SimGenericResource(3500.0)) - return@runBlockingTest provider.consume(consumer) - } - } - - data class SimGenericResource(override val capacity: Double) : SimResource -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt new file mode 100644 index 00000000..18ac0cd8 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * Abstract implementation of [SimResourceAggregator]. + */ +public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator { + /** + * The available resource provider contexts. + */ + protected val inputContexts: Set + get() = _inputContexts + private val _inputContexts = mutableSetOf() + + /** + * The output context. + */ + protected val outputContext: SimResourceContext + get() = context + + /** + * The commands to submit to the underlying input resources. + */ + protected val commands: MutableMap = mutableMapOf() + + /** + * This method is invoked when the resource consumer consumes resources. + */ + protected abstract fun doConsume(work: Double, limit: Double, deadline: Long) + + /** + * This method is invoked when the resource consumer enters an idle state. + */ + protected open fun doIdle(deadline: Long) { + for (input in inputContexts) { + commands[input] = SimResourceCommand.Idle(deadline) + } + } + + /** + * This method is invoked when the resource consumer finishes processing. + */ + protected open fun doFinish(cause: Throwable?) { + for (input in inputContexts) { + commands[input] = SimResourceCommand.Exit + } + } + + /** + * This method is invoked when an input context is started. + */ + protected open fun onContextStarted(ctx: SimResourceContext) { + _inputContexts.add(ctx) + } + + protected open fun onContextFinished(ctx: SimResourceContext) { + assert(_inputContexts.remove(ctx)) { "Lost context" } + } + + override fun addInput(input: SimResourceProvider) { + check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" } + + val consumer = Consumer() + _inputs.add(input) + input.startConsumer(consumer) + } + + override fun close() { + output.close() + } + + override val output: SimResourceProvider + get() = _output + private val _output = SimResourceForwarder() + + override val inputs: Set + get() = _inputs + private val _inputs = mutableSetOf() + + private val context = object : SimAbstractResourceContext(clock, _output) { + override val capacity: Double + get() = inputContexts.sumByDouble { it.capacity } + + override val remainingWork: Double + get() = inputContexts.sumByDouble { it.remainingWork } + + override fun interrupt() { + super.interrupt() + + interruptAll() + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + doConsume(work, limit, deadline) + } + + override fun onIdle(deadline: Long) { + doIdle(deadline) + } + + override fun onFinish(cause: Throwable?) { + doFinish(cause) + + super.onFinish(cause) + + interruptAll() + } + } + + /** + * A flag to indicate that an interrupt is active. + */ + private var isInterrupting: Boolean = false + + /** + * Schedule the work over the input resources. + */ + private fun doSchedule() { + context.flush(isIntermediate = true) + interruptAll() + } + + /** + * Interrupt all inputs. + */ + private fun interruptAll() { + // Prevent users from interrupting the resource while they are constructing their next command, as this will + // only lead to infinite recursion. + if (isInterrupting) { + return + } + + try { + isInterrupting = true + + val iterator = _inputs.iterator() + while (iterator.hasNext()) { + val input = iterator.next() + input.interrupt() + + if (input.state != SimResourceState.Active) { + iterator.remove() + } + } + } finally { + isInterrupting = false + } + } + + /** + * An internal [SimResourceConsumer] implementation for aggregator inputs. + */ + private inner class Consumer : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext) { + onContextStarted(ctx) + + // Make sure we initialize the output if we have not done so yet + if (context.state == SimResourceState.Pending) { + context.start() + } + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + doSchedule() + + return commands[ctx] ?: SimResourceCommand.Idle() + } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + onContextFinished(ctx) + + super.onFinish(ctx, cause) + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index dba334a2..f65cbaf4 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -30,17 +30,10 @@ import kotlin.math.min /** * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. */ -public abstract class SimAbstractResourceContext( - override val resource: R, +public abstract class SimAbstractResourceContext( override val clock: Clock, - private val consumer: SimResourceConsumer -) : SimResourceContext { - /** - * The capacity of the resource. - */ - override val capacity: Double - get() = resource.capacity - + private val consumer: SimResourceConsumer +) : SimResourceContext { /** * The amount of work still remaining at this instant. */ @@ -50,6 +43,12 @@ public abstract class SimAbstractResourceContext( return computeRemainingWork(activeCommand, clock.millis()) } + /** + * A flag to indicate the state of the context. + */ + public var state: SimResourceState = SimResourceState.Pending + private set + /** * This method is invoked when the resource will idle until the specified [deadline]. */ @@ -178,7 +177,7 @@ public abstract class SimAbstractResourceContext( if (command.deadline <= now || !isIntermediate) { next(now) } else { - activeCommand + interpret(SimResourceCommand.Idle(command.deadline), now) } } is SimResourceCommand.Consume -> { @@ -214,18 +213,13 @@ public abstract class SimAbstractResourceContext( flush() } - override fun toString(): String = "SimAbstractResourceContext[resource=$resource]" + override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]" /** * A flag to indicate that the resource is currently processing a command. */ protected var isProcessing: Boolean = false - /** - * A flag to indicate the state of the context. - */ - private var state: SimResourceState = SimResourceState.Pending - /** * The current command that is being processed. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt deleted file mode 100644 index 31b0a175..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A generic representation of resource that may be consumed. - */ -public interface SimResource { - /** - * The capacity of the resource. - */ - public val capacity: Double -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt new file mode 100644 index 00000000..bb4e6a2c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource. + */ +public interface SimResourceAggregator : AutoCloseable { + /** + * The output resource provider to which resource consumers can be attached. + */ + public val output: SimResourceProvider + + /** + * The input resources that will be switched between the output providers. + */ + public val inputs: Set + + /** + * Add the specified [input] to the switch. + */ + public fun addInput(input: SimResourceProvider) + + /** + * End the lifecycle of the aggregator. + */ + public override fun close() +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt new file mode 100644 index 00000000..08bc064e --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * A [SimResourceAggregator] that distributes the load equally across the input resources. + */ +public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) { + private val consumers = mutableListOf() + + override fun doConsume(work: Double, limit: Double, deadline: Long) { + // Sort all consumers by their capacity + consumers.sortWith(compareBy { it.capacity }) + + // Divide the requests over the available capacity of the input resources fairly + for (input in consumers) { + val inputCapacity = input.capacity + val fraction = inputCapacity / outputContext.capacity + val grantedSpeed = limit * fraction + val grantedWork = fraction * work + + commands[input] = + if (grantedWork > 0.0 && grantedSpeed > 0.0) + SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + } + } + + override fun onContextStarted(ctx: SimResourceContext) { + super.onContextStarted(ctx) + + consumers.add(ctx) + } + + override fun onContextFinished(ctx: SimResourceContext) { + super.onContextFinished(ctx) + + consumers.remove(ctx) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 01b56488..04c7fcaf 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -28,13 +28,13 @@ package org.opendc.simulator.resources * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently) * for multiple resource providers, unless explicitly said otherwise. */ -public interface SimResourceConsumer { +public interface SimResourceConsumer { /** * This method is invoked when the consumer is started for some resource. * * @param ctx The execution context in which the consumer runs. */ - public fun onStart(ctx: SimResourceContext) {} + public fun onStart(ctx: SimResourceContext) {} /** * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because @@ -43,7 +43,7 @@ public interface SimResourceConsumer { * @param ctx The execution context in which the consumer runs. * @return The next command that the resource should execute. */ - public fun onNext(ctx: SimResourceContext): SimResourceCommand + public fun onNext(ctx: SimResourceContext): SimResourceCommand /** * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], @@ -54,5 +54,5 @@ public interface SimResourceConsumer { * @param ctx The execution context in which the consumer ran. * @param cause The cause of the finish in case the resource finished exceptionally. */ - public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {} + public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {} } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index f13764fb..11dbb09f 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -28,12 +28,7 @@ import java.time.Clock * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a * resource and a resource consumer. */ -public interface SimResourceContext { - /** - * The resource that is managed by this context. - */ - public val resource: R - +public interface SimResourceContext { /** * The virtual clock tracking simulation time. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt new file mode 100644 index 00000000..b2759b7f --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers. + */ +public interface SimResourceDistributor : AutoCloseable { + /** + * The output resource providers to which resource consumers can be attached. + */ + public val outputs: Set + + /** + * The input resource that will be distributed over the consumers. + */ + public val input: SimResourceProvider + + /** + * Add an output to the switch with the specified [capacity]. + */ + public fun addOutput(capacity: Double): SimResourceProvider +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt new file mode 100644 index 00000000..b0f27b9d --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -0,0 +1,423 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock +import kotlin.math.max +import kotlin.math.min + +/** + * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing. + */ +public class SimResourceDistributorMaxMin( + override val input: SimResourceProvider, + private val clock: Clock, + private val listener: Listener? = null +) : SimResourceDistributor { + override val outputs: Set + get() = _outputs + private val _outputs = mutableSetOf() + + /** + * The active output contexts. + */ + private val outputContexts: MutableList = mutableListOf() + + /** + * The total speed requested by the output resources. + */ + private var totalRequestedSpeed = 0.0 + + /** + * The total amount of work requested by the output resources. + */ + private var totalRequestedWork = 0.0 + + /** + * The total allocated speed for the output resources. + */ + private var totalAllocatedSpeed = 0.0 + + /** + * The total allocated work requested for the output resources. + */ + private var totalAllocatedWork = 0.0 + + /** + * The amount of work that could not be performed due to over-committing resources. + */ + private var totalOvercommittedWork = 0.0 + + /** + * The amount of work that was lost due to interference. + */ + private var totalInterferedWork = 0.0 + + /** + * A flag to indicate that the switch is closed. + */ + private var isClosed: Boolean = false + + /** + * An internal [SimResourceConsumer] implementation for switch inputs. + */ + private val consumer = object : SimResourceConsumer { + /** + * The resource context of the consumer. + */ + private lateinit var ctx: SimResourceContext + + val remainingWork: Double + get() = ctx.remainingWork + + override fun onStart(ctx: SimResourceContext) { + this.ctx = ctx + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return doNext(ctx.capacity) + } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + super.onFinish(ctx, cause) + + val iterator = _outputs.iterator() + while (iterator.hasNext()) { + val output = iterator.next() + + // Remove the output from the outputs to prevent ConcurrentModificationException when removing it + // during the call tou output.close() + iterator.remove() + + output.close() + } + } + } + + /** + * The total amount of remaining work. + */ + private val totalRemainingWork: Double + get() = consumer.remainingWork + + override fun addOutput(capacity: Double): SimResourceProvider { + check(!isClosed) { "Distributor has been closed" } + + val provider = OutputProvider(capacity) + _outputs.add(provider) + return provider + } + + override fun close() { + if (!isClosed) { + isClosed = true + input.cancel() + } + } + + init { + input.startConsumer(consumer) + } + + /** + * Indicate that the workloads should be re-scheduled. + */ + private fun schedule() { + input.interrupt() + } + + /** + * Schedule the work over the physical CPUs. + */ + private fun doSchedule(capacity: Double): SimResourceCommand { + // If there is no work yet, mark all inputs as idle. + if (outputContexts.isEmpty()) { + return SimResourceCommand.Idle() + } + + val maxUsage = capacity + var duration: Double = Double.MAX_VALUE + var deadline: Long = Long.MAX_VALUE + var availableSpeed = maxUsage + var totalRequestedSpeed = 0.0 + var totalRequestedWork = 0.0 + + // Flush the work of the outputs + var outputIterator = outputContexts.listIterator() + while (outputIterator.hasNext()) { + val output = outputIterator.next() + + output.flush(isIntermediate = true) + + if (output.activeCommand == SimResourceCommand.Exit) { + // Apparently the output consumer has exited, so remove it from the scheduling queue. + outputIterator.remove() + } + } + + // Sort the outputs based on their requested usage + // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + outputContexts.sort() + + // Divide the available input capacity fairly across the outputs using max-min fair sharing + outputIterator = outputContexts.listIterator() + var remaining = outputContexts.size + while (outputIterator.hasNext()) { + val output = outputIterator.next() + val availableShare = availableSpeed / remaining-- + + when (val command = output.activeCommand) { + is SimResourceCommand.Idle -> { + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + output.actualSpeed = 0.0 + } + is SimResourceCommand.Consume -> { + val grantedSpeed = min(output.allowedSpeed, availableShare) + + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + // Ignore idle computation + if (grantedSpeed <= 0.0 || command.work <= 0.0) { + output.actualSpeed = 0.0 + continue + } + + totalRequestedSpeed += command.limit + totalRequestedWork += command.work + + output.actualSpeed = grantedSpeed + availableSpeed -= grantedSpeed + + // The duration that we want to run is that of the shortest request from an output + duration = min(duration, command.work / grantedSpeed) + } + SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" } + } + } + + assert(deadline >= clock.millis()) { "Deadline already passed" } + + this.totalRequestedSpeed = totalRequestedSpeed + this.totalRequestedWork = totalRequestedWork + this.totalAllocatedSpeed = maxUsage - availableSpeed + this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration) + + return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) + SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + } + + /** + * Obtain the next command to perform. + */ + private fun doNext(capacity: Double): SimResourceCommand { + val totalRequestedWork = totalRequestedWork.toLong() + val totalRemainingWork = totalRemainingWork.toLong() + val totalAllocatedWork = totalAllocatedWork.toLong() + val totalRequestedSpeed = totalRequestedSpeed + val totalAllocatedSpeed = totalAllocatedSpeed + + // Force all inputs to re-schedule their work. + val command = doSchedule(capacity) + + // Report metrics + listener?.onSliceFinish( + this, + totalRequestedWork, + totalAllocatedWork - totalRemainingWork, + totalOvercommittedWork.toLong(), + totalInterferedWork.toLong(), + totalRequestedSpeed, + totalAllocatedSpeed, + ) + + totalInterferedWork = 0.0 + totalOvercommittedWork = 0.0 + + return command + } + + /** + * Event listener for hypervisor events. + */ + public interface Listener { + /** + * This method is invoked when a slice is finished. + */ + public fun onSliceFinish( + switch: SimResourceDistributor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) + } + + /** + * An internal [SimResourceProvider] implementation for switch outputs. + */ + private inner class OutputProvider(val capacity: Double) : SimResourceProvider { + /** + * The [OutputContext] that is currently running. + */ + private var ctx: OutputContext? = null + + override var state: SimResourceState = SimResourceState.Pending + internal set + + override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource cannot be consumed" } + + val ctx = OutputContext(this, consumer) + this.ctx = ctx + this.state = SimResourceState.Active + outputContexts += ctx + + ctx.start() + schedule() + } + + override fun close() { + cancel() + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Stopped + _outputs.remove(this) + } + } + + override fun interrupt() { + ctx?.interrupt() + } + + override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.stop() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } + } + + /** + * A [SimAbstractResourceContext] for the output resources. + */ + private inner class OutputContext( + private val provider: OutputProvider, + consumer: SimResourceConsumer + ) : SimAbstractResourceContext(clock, consumer), Comparable { + override val capacity: Double + get() = provider.capacity + + /** + * The current command that is processed by the vCPU. + */ + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() + + /** + * The processing speed that is allowed by the model constraints. + */ + var allowedSpeed: Double = 0.0 + + /** + * The actual processing speed. + */ + var actualSpeed: Double = 0.0 + + private fun reportOvercommit() { + val remainingWork = remainingWork + totalOvercommittedWork += remainingWork + } + + override fun onIdle(deadline: Long) { + reportOvercommit() + + allowedSpeed = 0.0 + activeCommand = SimResourceCommand.Idle(deadline) + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + reportOvercommit() + + allowedSpeed = getSpeed(limit) + activeCommand = SimResourceCommand.Consume(work, limit, deadline) + } + + override fun onFinish(cause: Throwable?) { + reportOvercommit() + + activeCommand = SimResourceCommand.Exit + provider.cancel() + + super.onFinish(cause) + } + + override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { + // Apply performance interference model + val performanceScore = 1.0 + + // Compute the remaining amount of work + return if (work > 0.0) { + // Compute the fraction of compute time allocated to the VM + val fraction = actualSpeed / totalAllocatedSpeed + + // Compute the work that was actually granted to the VM. + val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction + val processed = processingAvailable * performanceScore + + val interferedWork = processingAvailable - processed + + totalInterferedWork += interferedWork + + max(0.0, work - processed) + } else { + 0.0 + } + } + + override fun interrupt() { + // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead + // to infinite recursion. + if (isProcessing) { + return + } + + super.interrupt() + + // Force the scheduler to re-schedule + schedule() + } + + override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt index 732e709a..227f4d62 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -25,17 +25,16 @@ package org.opendc.simulator.resources /** * A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer]. */ -public class SimResourceForwarder(override val resource: R) : - SimResourceProvider, SimResourceConsumer { +public class SimResourceForwarder : SimResourceProvider, SimResourceConsumer { /** * The [SimResourceContext] in which the forwarder runs. */ - private var ctx: SimResourceContext? = null + private var ctx: SimResourceContext? = null /** * The delegate [SimResourceConsumer]. */ - private var delegate: SimResourceConsumer? = null + private var delegate: SimResourceConsumer? = null /** * A flag to indicate that the delegate was started. @@ -48,11 +47,13 @@ public class SimResourceForwarder(override val resource: R) : override var state: SimResourceState = SimResourceState.Pending private set - override fun startConsumer(consumer: SimResourceConsumer) { + override fun startConsumer(consumer: SimResourceConsumer) { check(state == SimResourceState.Pending) { "Resource is in invalid state" } state = SimResourceState.Active delegate = consumer + + // Interrupt the provider to replace the consumer interrupt() } @@ -83,11 +84,11 @@ public class SimResourceForwarder(override val resource: R) : } } - override fun onStart(ctx: SimResourceContext) { + override fun onStart(ctx: SimResourceContext) { this.ctx = ctx } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { val delegate = delegate if (!hasDelegateStarted) { @@ -117,7 +118,7 @@ public class SimResourceForwarder(override val resource: R) : } } - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { this.ctx = null val delegate = delegate diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index 1593281b..52b13c5c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -27,12 +27,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine /** * A [SimResourceProvider] provides some resource of type [R]. */ -public interface SimResourceProvider : AutoCloseable { - /** - * The resource that is managed by this provider. - */ - public val resource: R - +public interface SimResourceProvider : AutoCloseable { /** * The state of the resource. */ @@ -43,7 +38,7 @@ public interface SimResourceProvider : AutoCloseable { * * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended. */ - public fun startConsumer(consumer: SimResourceConsumer) + public fun startConsumer(consumer: SimResourceConsumer) /** * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op. @@ -67,15 +62,15 @@ public interface SimResourceProvider : AutoCloseable { * Consume the resource provided by this provider using the specified [consumer] and suspend execution until * the consumer has finished. */ -public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { +public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { return suspendCancellableCoroutine { cont -> - startConsumer(object : SimResourceConsumer by consumer { - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + startConsumer(object : SimResourceConsumer by consumer { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { assert(!cont.isCompleted) { "Coroutine already completed" } - cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit)) - consumer.onFinish(ctx, cause) + + cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit)) } override fun toString(): String = "SimSuspendingResourceConsumer" diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 99545c4c..3b4e7e7a 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -31,15 +31,15 @@ import kotlin.math.min /** * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. * - * @param resource The resource to provide. + * @param initialCapacity The initial capacity of the resource. * @param clock The virtual clock to track simulation time. * @param scheduler The scheduler to schedule the interrupts. */ -public class SimResourceSource( - override val resource: R, +public class SimResourceSource( + private val initialCapacity: Double, private val clock: Clock, private val scheduler: TimerScheduler -) : SimResourceProvider { +) : SimResourceProvider { /** * The resource processing speed over time. */ @@ -55,7 +55,7 @@ public class SimResourceSource( override var state: SimResourceState = SimResourceState.Pending private set - override fun startConsumer(consumer: SimResourceConsumer) { + override fun startConsumer(consumer: SimResourceConsumer) { check(state == SimResourceState.Pending) { "Resource is in invalid state" } val ctx = Context(consumer) @@ -89,7 +89,9 @@ public class SimResourceSource( /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(resource, clock, consumer) { + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(clock, consumer) { + override val capacity: Double = initialCapacity + /** * The processing speed of the resource. */ @@ -123,6 +125,6 @@ public class SimResourceSource( super.onFinish(cause) } - override fun toString(): String = "SimResourceSource.Context[resource=$resource]" + override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt index cd1af3fc..53fec16a 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -23,26 +23,26 @@ package org.opendc.simulator.resources /** - * A [SimResourceSwitch] enables switching of capacity of multiple resources of type [R] between multiple consumers. + * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers. */ -public interface SimResourceSwitch : AutoCloseable { +public interface SimResourceSwitch : AutoCloseable { /** * The output resource providers to which resource consumers can be attached. */ - public val outputs: Set> + public val outputs: Set /** * The input resources that will be switched between the output providers. */ - public val inputs: Set> + public val inputs: Set /** - * Add an output to the switch represented by [resource]. + * Add an output to the switch with the specified [capacity]. */ - public fun addOutput(resource: R): SimResourceProvider + public fun addOutput(capacity: Double): SimResourceProvider /** * Add the specified [input] to the switch. */ - public fun addInput(input: SimResourceProvider) + public fun addInput(input: SimResourceProvider) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 5eea78f6..6e431ea1 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -28,45 +28,45 @@ import java.util.ArrayDeque * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that * a single output is directly connected to an input and that the switch can only support as much outputs as inputs. */ -public class SimResourceSwitchExclusive : SimResourceSwitch { +public class SimResourceSwitchExclusive : SimResourceSwitch { /** * A flag to indicate that the switch is closed. */ private var isClosed: Boolean = false private val _outputs = mutableSetOf() - override val outputs: Set> + override val outputs: Set get() = _outputs - private val availableResources = ArrayDeque>() + private val availableResources = ArrayDeque() - private val _inputs = mutableSetOf>() - override val inputs: Set> + private val _inputs = mutableSetOf() + override val inputs: Set get() = _inputs - override fun addOutput(resource: R): SimResourceProvider { + override fun addOutput(capacity: Double): SimResourceProvider { check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() - val output = Provider(resource, forwarder) + val output = Provider(capacity, forwarder) _outputs += output return output } - override fun addInput(input: SimResourceProvider) { + override fun addInput(input: SimResourceProvider) { check(!isClosed) { "Switch has been closed" } if (input in inputs) { return } - val forwarder = SimResourceForwarder(input.resource) + val forwarder = SimResourceForwarder() _inputs += input availableResources += forwarder - input.startConsumer(object : SimResourceConsumer by forwarder { - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + input.startConsumer(object : SimResourceConsumer by forwarder { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { // De-register the input after it has finished _inputs -= input forwarder.onFinish(ctx, cause) @@ -78,13 +78,13 @@ public class SimResourceSwitchExclusive : SimResourceSwitch isClosed = true // Cancel all upstream subscriptions - _inputs.forEach(SimResourceProvider::cancel) + _inputs.forEach(SimResourceProvider::cancel) } private inner class Provider( - override val resource: R, - private val forwarder: SimResourceForwarder - ) : SimResourceProvider by forwarder { + private val capacity: Double, + private val forwarder: SimResourceForwarder + ) : SimResourceProvider by forwarder { override fun close() { _outputs -= this availableResources += forwarder diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index ee8edfcd..c796c251 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -23,97 +23,61 @@ package org.opendc.simulator.resources import kotlinx.coroutines.* -import org.opendc.simulator.resources.consumer.SimConsumerBarrier import java.time.Clock -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min /** * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min * fair sharing. */ -public class SimResourceSwitchMaxMin( - private val clock: Clock, - private val listener: Listener? = null -) : SimResourceSwitch { - private val inputConsumers = mutableSetOf() - private val _outputs = mutableSetOf() - override val outputs: Set> +public class SimResourceSwitchMaxMin( + clock: Clock, + private val listener: Listener? = null +) : SimResourceSwitch { + private val _outputs = mutableSetOf() + override val outputs: Set get() = _outputs - private val _inputs = mutableSetOf>() - override val inputs: Set> + private val _inputs = mutableSetOf() + override val inputs: Set get() = _inputs /** - * The commands to submit to the underlying host. + * A flag to indicate that the switch was closed. */ - private val commands = mutableMapOf() + private var isClosed = false /** - * The active output contexts. + * The aggregator to aggregate the resources. */ - private val outputContexts: MutableList = mutableListOf() + private val aggregator = SimResourceAggregatorMaxMin(clock) /** - * The remaining work of all inputs. + * The distributor to distribute the aggregated resources. */ - private val totalRemainingWork: Double - get() = inputConsumers.sumByDouble { it.remainingWork } - - /** - * The total speed requested by the vCPUs. - */ - private var totalRequestedSpeed = 0.0 - - /** - * The total amount of work requested by the vCPUs. - */ - private var totalRequestedWork = 0.0 - - /** - * The total allocated speed for the vCPUs. - */ - private var totalAllocatedSpeed = 0.0 - - /** - * The total allocated work requested for the vCPUs. - */ - private var totalAllocatedWork = 0.0 - - /** - * The amount of work that could not be performed due to over-committing resources. - */ - private var totalOvercommittedWork = 0.0 - - /** - * The amount of work that was lost due to interference. - */ - private var totalInterferedWork = 0.0 - - /** - * A flag to indicate that the scheduler has submitted work that has not yet been completed. - */ - private var isDirty: Boolean = false - - /** - * The scheduler barrier. - */ - private var barrier: SimConsumerBarrier = SimConsumerBarrier(0) - - /** - * A flag to indicate that the switch is closed. - */ - private var isClosed: Boolean = false + private val distributor = SimResourceDistributorMaxMin( + aggregator.output, clock, + object : SimResourceDistributorMaxMin.Listener { + override fun onSliceFinish( + switch: SimResourceDistributor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) + } + } + ) /** * Add an output to the switch represented by [resource]. */ - override fun addOutput(resource: R): SimResourceProvider { + override fun addOutput(capacity: Double): SimResourceProvider { check(!isClosed) { "Switch has been closed" } - val provider = OutputProvider(resource) + val provider = distributor.addOutput(capacity) _outputs.add(provider) return provider } @@ -121,169 +85,29 @@ public class SimResourceSwitchMaxMin( /** * Add the specified [input] to the switch. */ - override fun addInput(input: SimResourceProvider) { + override fun addInput(input: SimResourceProvider) { check(!isClosed) { "Switch has been closed" } - val consumer = InputConsumer(input) - _inputs.add(input) - inputConsumers += consumer + aggregator.addInput(input) } override fun close() { - isClosed = true - } - - /** - * Indicate that the workloads should be re-scheduled. - */ - private fun schedule() { - isDirty = true - interruptAll() - } - - /** - * Schedule the work over the physical CPUs. - */ - private fun doSchedule() { - // If there is no work yet, mark all inputs as idle. - if (outputContexts.isEmpty()) { - commands.replaceAll { _, _ -> SimResourceCommand.Idle() } - interruptAll() - } - - val maxUsage = inputs.sumByDouble { it.resource.capacity } - var duration: Double = Double.MAX_VALUE - var deadline: Long = Long.MAX_VALUE - var availableSpeed = maxUsage - var totalRequestedSpeed = 0.0 - var totalRequestedWork = 0.0 - - // Sort the outputs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - outputContexts.sort() - - // Divide the available input capacity fairly across the outputs using max-min fair sharing - val outputIterator = outputContexts.listIterator() - var remaining = outputContexts.size - while (outputIterator.hasNext()) { - val output = outputIterator.next() - val availableShare = availableSpeed / remaining-- - - when (val command = output.activeCommand) { - is SimResourceCommand.Idle -> { - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - output.actualSpeed = 0.0 - } - is SimResourceCommand.Consume -> { - val grantedSpeed = min(output.allowedSpeed, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - // Ignore idle computation - if (grantedSpeed <= 0.0 || command.work <= 0.0) { - output.actualSpeed = 0.0 - continue - } - - totalRequestedSpeed += command.limit - totalRequestedWork += command.work - - output.actualSpeed = grantedSpeed - availableSpeed -= grantedSpeed - - // The duration that we want to run is that of the shortest request from an output - duration = min(duration, command.work / grantedSpeed) - } - SimResourceCommand.Exit -> { - // Apparently the output consumer has exited, so remove it from the scheduling queue. - outputIterator.remove() - } - } - } - - // Round the duration to milliseconds - duration = ceil(duration * 1000) / 1000 - - assert(deadline >= clock.millis()) { "Deadline already passed" } - - val totalAllocatedSpeed = maxUsage - availableSpeed - var totalAllocatedWork = 0.0 - availableSpeed = totalAllocatedSpeed - - // Divide the requests over the available capacity of the input resources fairly - for (input in inputs.sortedByDescending { it.resource.capacity }) { - val maxResourceUsage = input.resource.capacity - val fraction = maxResourceUsage / maxUsage - val grantedSpeed = min(maxResourceUsage, totalAllocatedSpeed * fraction) - val grantedWork = duration * grantedSpeed - - commands[input.resource] = - if (grantedWork > 0.0 && grantedSpeed > 0.0) - SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) - else - SimResourceCommand.Idle(deadline) - - totalAllocatedWork += grantedWork - availableSpeed -= grantedSpeed - } - - this.totalRequestedSpeed = totalRequestedSpeed - this.totalRequestedWork = totalRequestedWork - this.totalAllocatedSpeed = totalAllocatedSpeed - this.totalAllocatedWork = totalAllocatedWork - - interruptAll() - } - - /** - * Flush the progress of the vCPUs. - */ - private fun flushGuests() { - val totalRemainingWork = totalRemainingWork - - // Flush all the outputs work - for (output in outputContexts) { - output.flush(isIntermediate = true) - } - - // Report metrics - listener?.onSliceFinish( - this, - totalRequestedWork.toLong(), - (totalAllocatedWork - totalRemainingWork).toLong(), - totalOvercommittedWork.toLong(), - totalInterferedWork.toLong(), - totalRequestedSpeed, - totalAllocatedSpeed - ) - totalInterferedWork = 0.0 - totalOvercommittedWork = 0.0 - - // Force all inputs to re-schedule their work. - doSchedule() - } - - /** - * Interrupt all inputs. - */ - private fun interruptAll() { - for (input in inputConsumers) { - input.interrupt() + if (!isClosed) { + isClosed = true + distributor.close() + aggregator.close() } } /** * Event listener for hypervisor events. */ - public interface Listener { + public interface Listener { /** * This method is invoked when a slice is finished. */ public fun onSliceFinish( - switch: SimResourceSwitchMaxMin, + switch: SimResourceSwitchMaxMin, requestedWork: Long, grantedWork: Long, overcommittedWork: Long, @@ -292,203 +116,4 @@ public class SimResourceSwitchMaxMin( cpuDemand: Double ) } - - /** - * An internal [SimResourceProvider] implementation for switch outputs. - */ - private inner class OutputProvider(override val resource: R) : SimResourceProvider { - /** - * The [OutputContext] that is currently running. - */ - private var ctx: OutputContext? = null - - override var state: SimResourceState = SimResourceState.Pending - internal set - - override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource cannot be consumed" } - - val ctx = OutputContext(this, resource, consumer) - this.ctx = ctx - this.state = SimResourceState.Active - outputContexts += ctx - - ctx.start() - schedule() - } - - override fun close() { - cancel() - - state = SimResourceState.Stopped - _outputs.remove(this) - } - - override fun interrupt() { - ctx?.interrupt() - } - - override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.stop() - } - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending - } - } - } - - /** - * A [SimAbstractResourceContext] for the output resources. - */ - private inner class OutputContext( - private val provider: OutputProvider, - resource: R, - consumer: SimResourceConsumer - ) : SimAbstractResourceContext(resource, clock, consumer), Comparable { - /** - * The current command that is processed by the vCPU. - */ - var activeCommand: SimResourceCommand = SimResourceCommand.Idle() - - /** - * The processing speed that is allowed by the model constraints. - */ - var allowedSpeed: Double = 0.0 - - /** - * The actual processing speed. - */ - var actualSpeed: Double = 0.0 - - private fun reportOvercommit() { - totalOvercommittedWork += remainingWork - } - - override fun onIdle(deadline: Long) { - reportOvercommit() - - allowedSpeed = 0.0 - activeCommand = SimResourceCommand.Idle(deadline) - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - reportOvercommit() - - allowedSpeed = getSpeed(limit) - activeCommand = SimResourceCommand.Consume(work, limit, deadline) - } - - override fun onFinish(cause: Throwable?) { - reportOvercommit() - - activeCommand = SimResourceCommand.Exit - provider.cancel() - - super.onFinish(cause) - } - - override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { - // Apply performance interference model - val performanceScore = 1.0 - - // Compute the remaining amount of work - return if (work > 0.0) { - // Compute the fraction of compute time allocated to the VM - val fraction = actualSpeed / totalAllocatedSpeed - - // Compute the work that was actually granted to the VM. - val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction - val processed = processingAvailable * performanceScore - - val interferedWork = processingAvailable - processed - - totalInterferedWork += interferedWork - - max(0.0, work - processed) - } else { - 0.0 - } - } - - override fun interrupt() { - // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead - // to infinite recursion. - if (isProcessing) { - return - } - - super.interrupt() - - // Force the scheduler to re-schedule - schedule() - } - - override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) - } - - /** - * An internal [SimResourceConsumer] implementation for switch inputs. - */ - private inner class InputConsumer(val input: SimResourceProvider) : SimResourceConsumer { - /** - * The resource context of the consumer. - */ - private lateinit var ctx: SimResourceContext - - /** - * The remaining work of this consumer. - */ - val remainingWork: Double - get() = ctx.remainingWork - - init { - barrier = SimConsumerBarrier(barrier.parties + 1) - input.startConsumer(this@InputConsumer) - } - - /** - * Interrupt the consumer - */ - fun interrupt() { - ctx.interrupt() - } - - override fun onStart(ctx: SimResourceContext) { - this.ctx = ctx - } - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val isLast = barrier.enter() - - // Flush the progress of the guest after the barrier has been reached. - if (isLast && isDirty) { - isDirty = false - flushGuests() - } - - return if (isDirty) { - // Wait for the scheduler determine the work after the barrier has been reached by all CPUs. - SimResourceCommand.Idle() - } else { - // Indicate that the scheduler needs to run next call. - if (isLast) { - isDirty = true - } - - commands[ctx.resource] ?: SimResourceCommand.Idle() - } - } - - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - barrier = SimConsumerBarrier(barrier.parties - 1) - inputConsumers -= this@InputConsumer - _inputs -= input - - super.onFinish(ctx, cause) - } - } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt index 7aa5a5aa..52a42241 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt @@ -42,4 +42,11 @@ public class SimConsumerBarrier(public val parties: Int) { } return false } + + /** + * Reset the barrier. + */ + public fun reset() { + counter = 0 + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index 0189fe4c..a52d1d5d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources.consumer -import org.opendc.simulator.resources.SimResource import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -31,15 +30,15 @@ import org.opendc.simulator.resources.SimResourceContext * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource * consumption for some period of time. */ -public class SimTraceConsumer(private val trace: Sequence) : SimResourceConsumer { +public class SimTraceConsumer(private val trace: Sequence) : SimResourceConsumer { private var iterator: Iterator? = null - override fun onStart(ctx: SimResourceContext) { + override fun onStart(ctx: SimResourceContext) { check(iterator == null) { "Consumer already running" } iterator = trace.iterator() } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { val iterator = checkNotNull(iterator) return if (iterator.hasNext()) { val now = ctx.clock.millis() @@ -58,7 +57,7 @@ public class SimTraceConsumer(private val trace: Sequence) : SimResour } } - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { iterator = null } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt index 62425583..8f24a020 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources.consumer -import org.opendc.simulator.resources.SimResource import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -30,10 +29,10 @@ import org.opendc.simulator.resources.SimResourceContext /** * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization. */ -public class SimWorkConsumer( +public class SimWorkConsumer( private val work: Double, private val utilization: Double -) : SimResourceConsumer { +) : SimResourceConsumer { init { require(work >= 0.0) { "Work must be positive" } @@ -43,12 +42,12 @@ public class SimWorkConsumer( private var limit = 0.0 private var remainingWork: Double = 0.0 - override fun onStart(ctx: SimResourceContext) { - limit = ctx.resource.capacity * utilization + override fun onStart(ctx: SimResourceContext) { + limit = ctx.capacity * utilization remainingWork = work } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { val work = this.remainingWork + ctx.remainingWork this.remainingWork -= work return if (work > 0.0) { diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt new file mode 100644 index 00000000..3dffc7bf --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runBlockingTest +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler + +/** + * Test suite for the [SimResourceAggregatorMaxMin] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimResourceAggregatorMaxMinTest { + @Test + fun testSingleCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(1.0, 0.5) + val usage = mutableListOf() + val job = launch { sources[0].speed.toList(usage) } + + try { + aggregator.output.consume(consumer) + yield() + + assertAll( + { assertEquals(1000, currentTime) }, + { assertEquals(listOf(0.0, 0.5, 0.0), usage) } + ) + } finally { + aggregator.output.close() + job.cancel() + } + } + + @Test + fun testDoubleCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(2.0, 1.0) + val usage = mutableListOf() + val job = launch { sources[0].speed.toList(usage) } + + try { + aggregator.output.consume(consumer) + yield() + assertAll( + { assertEquals(1000, currentTime) }, + { assertEquals(listOf(0.0, 1.0, 0.0), usage) } + ) + } finally { + aggregator.output.close() + job.cancel() + } + } + + @Test + fun testOvercommit() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = mockk(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) + .andThen(SimResourceCommand.Exit) + + try { + aggregator.output.consume(consumer) + yield() + assertEquals(1000, currentTime) + + verify(exactly = 2) { consumer.onNext(any()) } + } finally { + aggregator.output.close() + } + } + + @Test + fun testException() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = mockk(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) + + try { + assertThrows { aggregator.output.consume(consumer) } + yield() + assertEquals(SimResourceState.Pending, sources[0].state) + } finally { + aggregator.output.close() + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index 5d4eb46d..c6988ed9 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -33,25 +33,18 @@ import org.opendc.simulator.utils.DelayControllerClockAdapter */ @OptIn(ExperimentalCoroutinesApi::class) class SimResourceContextTest { - data class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testFlushWithoutCommand() = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val resource = SimCpu(4200.0) - - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(resource, clock, consumer) { - override fun onIdle(deadline: Long) {} + val context = object : SimAbstractResourceContext(clock, consumer) { + override val capacity: Double = 4200.0 + override fun onIdle(deadline: Long) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish(cause: Throwable?) {} } @@ -61,12 +54,13 @@ class SimResourceContextTest { @Test fun testIntermediateFlush() = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val resource = SimCpu(4200.0) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(resource, clock, consumer) { + val context = spyk(object : SimAbstractResourceContext(clock, consumer) { + override val capacity: Double = 4200.0 + override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -82,12 +76,13 @@ class SimResourceContextTest { @Test fun testIntermediateFlushIdle() = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val resource = SimCpu(4200.0) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(resource, clock, consumer) { + val context = spyk(object : SimAbstractResourceContext(clock, consumer) { + override val capacity: Double = 4200.0 + override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -100,7 +95,7 @@ class SimResourceContextTest { context.flush(isIntermediate = true) assertAll( - { verify(exactly = 1) { context.onIdle(any()) } }, + { verify(exactly = 2) { context.onIdle(any()) } }, { verify(exactly = 1) { context.onFinish(null) } } ) } @@ -108,12 +103,13 @@ class SimResourceContextTest { @Test fun testDoubleStart() = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val resource = SimCpu(4200.0) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(resource, clock, consumer) { + val context = object : SimAbstractResourceContext(clock, consumer) { + override val capacity: Double = 4200.0 + override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt index 47794bdd..f68450ff 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt @@ -40,28 +40,20 @@ import org.opendc.utils.TimerScheduler */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceForwarderTest { - - data class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testExitImmediately() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val forwarder = SimResourceForwarder() val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + val source = SimResourceSource(2000.0, clock, scheduler) launch { source.consume(forwarder) source.close() } - forwarder.consume(object : SimResourceConsumer { - override fun onNext( - ctx: SimResourceContext - ): SimResourceCommand { + forwarder.consume(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { return SimResourceCommand.Exit } }) @@ -72,22 +64,20 @@ internal class SimResourceForwarderTest { @Test fun testExit() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val forwarder = SimResourceForwarder() val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + val source = SimResourceSource(2000.0, clock, scheduler) launch { source.consume(forwarder) source.close() } - forwarder.consume(object : SimResourceConsumer { + forwarder.consume(object : SimResourceConsumer { var isFirst = true - override fun onNext( - ctx: SimResourceContext - ): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { return if (isFirst) { isFirst = false SimResourceCommand.Consume(10.0, 1.0) @@ -102,11 +92,9 @@ internal class SimResourceForwarderTest { @Test fun testState() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) - val consumer = object : SimResourceConsumer { - override fun onNext( - ctx: SimResourceContext - ): SimResourceCommand = SimResourceCommand.Exit + val forwarder = SimResourceForwarder() + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit } assertEquals(SimResourceState.Pending, forwarder.state) @@ -125,9 +113,9 @@ internal class SimResourceForwarderTest { @Test fun testCancelPendingDelegate() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val forwarder = SimResourceForwarder() - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Exit forwarder.startConsumer(consumer) @@ -138,12 +126,12 @@ internal class SimResourceForwarderTest { @Test fun testCancelStartedDelegate() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val forwarder = SimResourceForwarder() val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + val source = SimResourceSource(2000.0, clock, scheduler) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) source.startConsumer(forwarder) @@ -158,12 +146,12 @@ internal class SimResourceForwarderTest { @Test fun testCancelPropagation() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val forwarder = SimResourceForwarder() val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + val source = SimResourceSource(2000.0, clock, scheduler) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) source.startConsumer(forwarder) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index c0ed8c9e..1279c679 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -37,20 +37,16 @@ import org.opendc.utils.TimerScheduler */ @OptIn(ExperimentalCoroutinesApi::class) class SimResourceSourceTest { - data class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testSpeed() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, provider.resource.speed)) + .returns(SimResourceCommand.Consume(1000 * capacity, capacity)) .andThen(SimResourceCommand.Exit) try { @@ -60,7 +56,7 @@ class SimResourceSourceTest { provider.consume(consumer) job.cancel() - assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" } + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() provider.close() @@ -71,11 +67,12 @@ class SimResourceSourceTest { fun testSpeedLimit() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, 2 * provider.resource.speed)) + .returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity)) .andThen(SimResourceCommand.Exit) try { @@ -85,7 +82,7 @@ class SimResourceSourceTest { provider.consume(consumer) job.cancel() - assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" } + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() provider.close() @@ -100,14 +97,15 @@ class SimResourceSourceTest { fun testIntermediateInterrupt() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext) { + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext) { ctx.interrupt() } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { return SimResourceCommand.Exit } } @@ -124,16 +122,17 @@ class SimResourceSourceTest { fun testInterrupt() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - lateinit var resCtx: SimResourceContext + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + lateinit var resCtx: SimResourceContext - val consumer = object : SimResourceConsumer { + val consumer = object : SimResourceConsumer { var isFirst = true - override fun onStart(ctx: SimResourceContext) { + override fun onStart(ctx: SimResourceContext) { resCtx = ctx } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { assertEquals(0.0, ctx.remainingWork) return if (isFirst) { isFirst = false @@ -162,9 +161,10 @@ class SimResourceSourceTest { fun testFailure() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onStart(any()) } .throws(IllegalStateException()) @@ -182,9 +182,10 @@ class SimResourceSourceTest { fun testExceptionPropagationOnNext() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) @@ -203,9 +204,10 @@ class SimResourceSourceTest { fun testConcurrentConsumption() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) @@ -227,9 +229,10 @@ class SimResourceSourceTest { fun testClosedConsumption() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) @@ -249,9 +252,10 @@ class SimResourceSourceTest { fun testCloseDuringConsumption() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) @@ -272,9 +276,10 @@ class SimResourceSourceTest { fun testIdle() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Idle(clock.millis() + 500)) .andThen(SimResourceCommand.Exit) @@ -295,9 +300,10 @@ class SimResourceSourceTest { runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Idle()) .andThenThrows(IllegalStateException()) @@ -316,9 +322,10 @@ class SimResourceSourceTest { fun testIncorrectDeadline() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Idle(2)) .andThen(SimResourceCommand.Exit) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index dc90a43e..edd60502 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -42,11 +42,6 @@ import org.opendc.utils.TimerScheduler */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceSwitchExclusiveTest { - class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - /** * Test a trace workload. */ @@ -68,12 +63,12 @@ internal class SimResourceSwitchExclusiveTest { ), ) - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) switch.addInput(source) - val provider = switch.addOutput(SimCpu(3200.0)) + val provider = switch.addOutput(3200.0) val job = launch { source.speed.toList(speed) } try { @@ -99,15 +94,15 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = TimerScheduler(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = mockk>(relaxUnitFun = true) + val workload = mockk(relaxUnitFun = true) every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) switch.addInput(source) - val provider = switch.addOutput(SimCpu(3200.0)) + val provider = switch.addOutput(3200.0) try { provider.consume(workload) @@ -127,16 +122,14 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = TimerScheduler(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = object : SimResourceConsumer { + val workload = object : SimResourceConsumer { var isFirst = true - override fun onStart(ctx: SimResourceContext) { + override fun onStart(ctx: SimResourceContext) { isFirst = true } - override fun onNext( - ctx: SimResourceContext - ): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { return if (isFirst) { isFirst = false SimResourceCommand.Consume(duration / 1000.0, 1.0) @@ -146,12 +139,12 @@ internal class SimResourceSwitchExclusiveTest { } } - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) switch.addInput(source) - val provider = switch.addOutput(SimCpu(3200.0)) + val provider = switch.addOutput(3200.0) try { provider.consume(workload) @@ -172,15 +165,15 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = TimerScheduler(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = mockk>(relaxUnitFun = true) + val workload = mockk(relaxUnitFun = true) every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) switch.addInput(source) - switch.addOutput(SimCpu(3200.0)) - assertThrows { switch.addOutput(SimCpu(3200.0)) } + switch.addOutput(3200.0) + assertThrows { switch.addOutput(3200.0) } } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index 8b989334..5f4fd187 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -40,23 +40,18 @@ import org.opendc.utils.TimerScheduler */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceSwitchMaxMinTest { - class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testSmoke() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val switch = SimResourceSwitchMaxMin(clock) + val switch = SimResourceSwitchMaxMin(clock) - val sources = List(2) { SimResourceSource(SimCpu(2000.0), clock, scheduler) } + val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) } sources.forEach { switch.addInput(it) } - val provider = switch.addOutput(SimCpu(1000.0)) + val provider = switch.addOutput(1000.0) - val consumer = mockk>(relaxUnitFun = true) + val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit try { @@ -76,13 +71,13 @@ internal class SimResourceSwitchMaxMinTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val listener = object : SimResourceSwitchMaxMin.Listener { + val listener = object : SimResourceSwitchMaxMin.Listener { var totalRequestedWork = 0L var totalGrantedWork = 0L var totalOvercommittedWork = 0L override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, + switch: SimResourceSwitchMaxMin, requestedWork: Long, grantedWork: Long, overcommittedWork: Long, @@ -108,10 +103,10 @@ internal class SimResourceSwitchMaxMinTest { ) val switch = SimResourceSwitchMaxMin(clock, listener) - val provider = switch.addOutput(SimCpu(3200.0)) + val provider = switch.addOutput(3200.0) try { - switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler)) + switch.addInput(SimResourceSource(3200.0, clock, scheduler)) provider.consume(workload) yield() } finally { @@ -135,13 +130,13 @@ internal class SimResourceSwitchMaxMinTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val listener = object : SimResourceSwitchMaxMin.Listener { + val listener = object : SimResourceSwitchMaxMin.Listener { var totalRequestedWork = 0L var totalGrantedWork = 0L var totalOvercommittedWork = 0L override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, + switch: SimResourceSwitchMaxMin, requestedWork: Long, grantedWork: Long, overcommittedWork: Long, @@ -176,11 +171,11 @@ internal class SimResourceSwitchMaxMinTest { ) val switch = SimResourceSwitchMaxMin(clock, listener) - val providerA = switch.addOutput(SimCpu(3200.0)) - val providerB = switch.addOutput(SimCpu(3200.0)) + val providerA = switch.addOutput(3200.0) + val providerB = switch.addOutput(3200.0) try { - switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler)) + switch.addInput(SimResourceSource(3200.0, clock, scheduler)) coroutineScope { launch { providerA.consume(workloadA) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt index b05195f7..4d6b19ee 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -35,18 +35,13 @@ import org.opendc.utils.TimerScheduler */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimWorkConsumerTest { - data class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testSmoke() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(1.0), clock, scheduler) + val provider = SimResourceSource(1.0, clock, scheduler) - val consumer = SimWorkConsumer(1.0, 1.0) + val consumer = SimWorkConsumer(1.0, 1.0) try { provider.consume(consumer) @@ -60,9 +55,9 @@ internal class SimWorkConsumerTest { fun testUtilization() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(1.0), clock, scheduler) + val provider = SimResourceSource(1.0, clock, scheduler) - val consumer = SimWorkConsumer(1.0, 0.5) + val consumer = SimWorkConsumer(1.0, 0.5) try { provider.consume(consumer) -- cgit v1.2.3 From b5ac4b4f0c9a9e0c4b2ee744f8184adbe8e8d76a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 22 Mar 2021 21:00:41 +0100 Subject: simulator: Add support for signaling dynamic capacity changes This change adds support for dynamically changing the capacity of resources and propagating this change to consumers. --- .../resources/SimAbstractResourceAggregator.kt | 19 +++--- .../resources/SimAbstractResourceContext.kt | 71 +++++++++++++++------- .../simulator/resources/SimResourceCommand.kt | 2 +- .../simulator/resources/SimResourceConsumer.kt | 13 ++++ .../resources/SimResourceDistributorMaxMin.kt | 7 +-- .../simulator/resources/SimResourceForwarder.kt | 4 ++ .../simulator/resources/SimResourceSource.kt | 39 +++++++----- .../resources/SimResourceSwitchExclusive.kt | 2 + .../resources/consumer/SimWorkConsumer.kt | 17 +++--- .../resources/SimResourceAggregatorMaxMinTest.kt | 56 ++++++++++++++++- .../simulator/resources/SimResourceContextTest.kt | 16 ++--- .../resources/SimResourceForwarderTest.kt | 26 +++++++- .../simulator/resources/SimResourceSourceTest.kt | 25 ++++++++ 13 files changed, 217 insertions(+), 80 deletions(-) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 18ac0cd8..e5991264 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -100,10 +100,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : get() = _inputs private val _inputs = mutableSetOf() - private val context = object : SimAbstractResourceContext(clock, _output) { - override val capacity: Double - get() = inputContexts.sumByDouble { it.capacity } - + private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { override val remainingWork: Double get() = inputContexts.sumByDouble { it.remainingWork } @@ -113,13 +110,9 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : interruptAll() } - override fun onConsume(work: Double, limit: Double, deadline: Long) { - doConsume(work, limit, deadline) - } + override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) - override fun onIdle(deadline: Long) { - doIdle(deadline) - } + override fun onIdle(deadline: Long) = doIdle(deadline) override fun onFinish(cause: Throwable?) { doFinish(cause) @@ -176,6 +169,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : private inner class Consumer : SimResourceConsumer { override fun onStart(ctx: SimResourceContext) { onContextStarted(ctx) + onCapacityChanged(ctx, false) // Make sure we initialize the output if we have not done so yet if (context.state == SimResourceState.Pending) { @@ -189,6 +183,11 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : return commands[ctx] ?: SimResourceCommand.Idle() } + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + // Adjust capacity of output resource + context.capacity = inputContexts.sumByDouble { it.capacity } + } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { onContextFinished(ctx) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index f65cbaf4..9705bd17 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.resources import java.time.Clock -import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -31,9 +30,24 @@ import kotlin.math.min * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. */ public abstract class SimAbstractResourceContext( + initialCapacity: Double, override val clock: Clock, private val consumer: SimResourceConsumer ) : SimResourceContext { + /** + * The capacity of the resource. + */ + public final override var capacity: Double = initialCapacity + set(value) { + val oldValue = field + + // Only changes will be propagated + if (value != oldValue) { + field = value + onCapacityChange() + } + } + /** * The amount of work still remaining at this instant. */ @@ -49,6 +63,12 @@ public abstract class SimAbstractResourceContext( public var state: SimResourceState = SimResourceState.Pending private set + /** + * The current processing speed of the resource. + */ + public var speed: Double = 0.0 + private set + /** * This method is invoked when the resource will idle until the specified [deadline]. */ @@ -67,20 +87,6 @@ public abstract class SimAbstractResourceContext( consumer.onFinish(this, cause) } - /** - * Compute the duration that a resource consumption will take with the specified [speed]. - */ - protected open fun getDuration(work: Double, speed: Double): Long { - return ceil(work / speed * 1000).toLong() - } - - /** - * Compute the speed at which the resource may be consumed. - */ - protected open fun getSpeed(limit: Double): Double { - return min(limit, capacity) - } - /** * Get the remaining work to process after a resource consumption. * @@ -183,8 +189,8 @@ public abstract class SimAbstractResourceContext( is SimResourceCommand.Consume -> { // We should only continue processing the next command if: // 1. The resource consumption was finished. - // 2. The resource consumer reached its deadline. - // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) + // 2. The resource capacity cannot satisfy the demand. + // 4. The resource consumer should be interrupted (e.g., someone called .interrupt()) if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { next(now) } else { @@ -253,6 +259,8 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } + speed = 0.0 + onIdle(deadline) } is SimResourceCommand.Consume -> { @@ -262,10 +270,15 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } + speed = min(capacity, limit) + onConsume(work, limit, deadline) } is SimResourceCommand.Exit -> { + speed = 0.0 + doStop(null) + // No need to set the next active command return null } @@ -286,14 +299,30 @@ public abstract class SimAbstractResourceContext( val (timestamp, command) = wrapper val duration = now - timestamp return when (command) { - is SimResourceCommand.Consume -> { - val speed = getSpeed(command.limit) - getRemainingWork(command.work, speed, duration) - } + is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration) is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0 } } + /** + * Indicate that the capacity of the resource has changed. + */ + private fun onCapacityChange() { + // Do not inform the consumer if it has not been started yet + if (state != SimResourceState.Active) { + return + } + + val isThrottled = speed > capacity + consumer.onCapacityChanged(this, isThrottled) + + // Optimization: only flush changes if the new capacity cannot satisfy the active resource command. + // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush(). + if (isThrottled) { + flush(isIntermediate = true) + } + } + /** * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt index 21f56f9b..f7f3fa4d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.resources /** - * A SimResourceCommand communicates to a [SimResource] how it is consumed by a [SimResourceConsumer]. + * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer]. */ public sealed class SimResourceCommand { /** diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 04c7fcaf..672a3e9d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -45,6 +45,19 @@ public interface SimResourceConsumer { */ public fun onNext(ctx: SimResourceContext): SimResourceCommand + /** + * This is method is invoked when the capacity of the resource changes. + * + * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the + * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly + * causing the active resource command to finish at a later moment than initially planned. + * + * @param ctx The execution context in which the consumer runs. + * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the + * capacity change. + */ + public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {} + /** * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], * the resource finished itself, or a failure occurred at the resource. diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index b0f27b9d..9df333e3 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -335,10 +335,7 @@ public class SimResourceDistributorMaxMin( private inner class OutputContext( private val provider: OutputProvider, consumer: SimResourceConsumer - ) : SimAbstractResourceContext(clock, consumer), Comparable { - override val capacity: Double - get() = provider.capacity - + ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable { /** * The current command that is processed by the vCPU. */ @@ -369,7 +366,7 @@ public class SimResourceDistributorMaxMin( override fun onConsume(work: Double, limit: Double, deadline: Long) { reportOvercommit() - allowedSpeed = getSpeed(limit) + allowedSpeed = speed activeCommand = SimResourceCommand.Consume(work, limit, deadline) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt index 227f4d62..1a05accd 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -118,6 +118,10 @@ public class SimResourceForwarder : SimResourceProvider, SimResourceConsumer { } } + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + delegate?.onCapacityChanged(ctx, isThrottled) + } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { this.ctx = null diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 3b4e7e7a..9b10edaf 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import org.opendc.utils.TimerScheduler import java.time.Clock +import kotlin.math.ceil import kotlin.math.min /** @@ -36,7 +37,7 @@ import kotlin.math.min * @param scheduler The scheduler to schedule the interrupts. */ public class SimResourceSource( - private val initialCapacity: Double, + initialCapacity: Double, private val clock: Clock, private val scheduler: TimerScheduler ) : SimResourceProvider { @@ -47,6 +48,15 @@ public class SimResourceSource( get() = _speed private val _speed = MutableStateFlow(0.0) + /** + * The capacity of the resource. + */ + public var capacity: Double = initialCapacity + set(value) { + field = value + ctx?.capacity = value + } + /** * The [Context] that is currently running. */ @@ -89,20 +99,9 @@ public class SimResourceSource( /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = initialCapacity - - /** - * The processing speed of the resource. - */ - private var speed: Double = 0.0 - set(value) { - field = value - _speed.value = field - } - + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { override fun onIdle(deadline: Long) { - speed = 0.0 + _speed.value = speed // Do not resume if deadline is "infinite" if (deadline != Long.MAX_VALUE) { @@ -111,14 +110,15 @@ public class SimResourceSource( } override fun onConsume(work: Double, limit: Double, deadline: Long) { - speed = getSpeed(limit) + _speed.value = speed + val until = min(deadline, clock.millis() + getDuration(work, speed)) scheduler.startSingleTimerTo(this, until, ::flush) } override fun onFinish(cause: Throwable?) { - speed = 0.0 + _speed.value = speed scheduler.cancel(this) cancel() @@ -127,4 +127,11 @@ public class SimResourceSource( override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" } + + /** + * Compute the duration that a resource consumption will take with the specified [speed]. + */ + private fun getDuration(work: Double, speed: Double): Long { + return ceil(work / speed * 1000).toLong() + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 6e431ea1..a10f84b6 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -86,6 +86,8 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { private val forwarder: SimResourceForwarder ) : SimResourceProvider by forwarder { override fun close() { + // We explicitly do not close the forwarder here in order to re-use it across output resources. + _outputs -= this availableResources += forwarder } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt index 8f24a020..faa693c4 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -39,17 +39,16 @@ public class SimWorkConsumer( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - private var limit = 0.0 - private var remainingWork: Double = 0.0 - - override fun onStart(ctx: SimResourceContext) { - limit = ctx.capacity * utilization - remainingWork = work - } + private var isFirst = true override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val work = this.remainingWork + ctx.remainingWork - this.remainingWork -= work + val limit = ctx.capacity * utilization + val work = if (isFirst) { + isFirst = false + work + } else { + ctx.remainingWork + } return if (work > 0.0) { SimResourceCommand.Consume(work, limit) } else { diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index 3dffc7bf..de864c1c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -25,11 +25,9 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.launch import kotlinx.coroutines.test.runBlockingTest -import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll @@ -155,4 +153,56 @@ internal class SimResourceAggregatorMaxMinTest { aggregator.output.close() } } + + @Test + fun testAdjustCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(4.0, 1.0) + try { + coroutineScope { + launch { aggregator.output.consume(consumer) } + delay(1000) + sources[0].capacity = 0.5 + } + yield() + assertEquals(2334, currentTime) + } finally { + aggregator.output.close() + } + } + + @Test + fun testFailOverCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(1.0, 0.5) + try { + coroutineScope { + launch { aggregator.output.consume(consumer) } + delay(500) + sources[0].capacity = 0.5 + } + yield() + assertEquals(1000, currentTime) + } finally { + aggregator.output.close() + } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index c6988ed9..030a0f6b 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -40,9 +40,7 @@ class SimResourceContextTest { val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = 4200.0 - + val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} override fun onFinish(cause: Throwable?) {} @@ -58,9 +56,7 @@ class SimResourceContextTest { val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = 4200.0 - + val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -80,9 +76,7 @@ class SimResourceContextTest { val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = 4200.0 - + val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -107,9 +101,7 @@ class SimResourceContextTest { val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = 4200.0 - + val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt index f68450ff..143dbca9 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt @@ -24,14 +24,14 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk +import io.mockk.spyk import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.launch +import kotlinx.coroutines.* import kotlinx.coroutines.test.runBlockingTest -import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -163,4 +163,24 @@ internal class SimResourceForwarderTest { verify(exactly = 1) { consumer.onStart(any()) } verify(exactly = 1) { consumer.onFinish(any(), null) } } + + @Test + fun testAdjustCapacity() = runBlockingTest { + val forwarder = SimResourceForwarder() + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler(coroutineContext, clock) + val source = SimResourceSource(1.0, clock, scheduler) + + val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + source.startConsumer(forwarder) + + coroutineScope { + launch { forwarder.consume(consumer) } + delay(1000) + source.capacity = 0.5 + } + + assertEquals(3000, currentTime) + verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 1279c679..58e19421 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -24,11 +24,14 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -63,6 +66,28 @@ class SimResourceSourceTest { } } + @Test + fun testAdjustCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler(coroutineContext, clock) + val provider = SimResourceSource(1.0, clock, scheduler) + + val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + + try { + coroutineScope { + launch { provider.consume(consumer) } + delay(1000) + provider.capacity = 0.5 + } + assertEquals(3000, currentTime) + verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + } finally { + scheduler.close() + provider.close() + } + } + @Test fun testSpeedLimit() = runBlockingTest { val clock = DelayControllerClockAdapter(this) -- cgit v1.2.3 From 38a13e5c201c828f9f21f17e89916b4638396945 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 23 Mar 2021 11:38:25 +0100 Subject: simulator: Add support for emitting VM usage metrics This change re-enables support for VM usage metrics by adding an adapter for SimResourceConsumer instances that can export the consumer speed. --- .../opendc/simulator/compute/SimAbstractMachine.kt | 5 +- .../opendc/simulator/compute/SimHypervisorTest.kt | 11 +++- .../org/opendc/simulator/compute/SimMachineTest.kt | 2 +- .../resources/consumer/SimSpeedConsumerAdapter.kt | 68 ++++++++++++++++++++++ 4 files changed, 80 insertions(+), 6 deletions(-) create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 52945354..1c0f94fd 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -102,7 +102,7 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine val consumer = workload.getConsumer(ctx, cpu) val job = source.speed .onEach { - _speed[cpu.id] = source.speed.value + _speed[cpu.id] = it _usage.value = _speed.sum() / totalCapacity } .launchIn(this) @@ -116,9 +116,8 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine override fun close() { if (!isTerminated) { - resources.forEach { (_, provider) -> provider.close() } - } else { isTerminated = true + resources.forEach { (_, provider) -> provider.close() } } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index 0149024f..5773b325 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -24,6 +24,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield @@ -98,15 +99,21 @@ internal class SimHypervisorTest { println("Hypervisor finished") } yield() - hypervisor.createMachine(model).run(workloadA) + val vm = hypervisor.createMachine(model) + val res = mutableListOf() + val job = launch { machine.usage.toList(res) } + + vm.run(workloadA) yield() + job.cancel() machine.close() assertAll( { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, - { assertEquals(1200000, currentTime) } + { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), res) { "VM usage is correct" } }, + { assertEquals(1200000, currentTime) { "Current time is correct" } } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 7e014245..071bdf77 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -76,7 +76,7 @@ class SimMachineTest { try { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) - + yield() job.cancel() assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" } } finally { diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt new file mode 100644 index 00000000..fd4a9ed5 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.consumer + +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext +import kotlin.math.min + +/** + * Helper class to expose an observable [speed] field describing the speed of the consumer. + */ +public class SimSpeedConsumerAdapter(private val delegate: SimResourceConsumer) : SimResourceConsumer by delegate { + /** + * The resource processing speed over time. + */ + public val speed: StateFlow + get() = _speed + private val _speed = MutableStateFlow(0.0) + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val command = delegate.onNext(ctx) + + when (command) { + is SimResourceCommand.Idle -> _speed.value = 0.0 + is SimResourceCommand.Consume -> _speed.value = min(ctx.capacity, command.limit) + is SimResourceCommand.Exit -> _speed.value = 0.0 + } + + return command + } + + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + val oldSpeed = _speed.value + + delegate.onCapacityChanged(ctx, isThrottled) + + // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might + // need to update the current speed. + if (oldSpeed == _speed.value) { + _speed.value = min(ctx.capacity, _speed.value) + } + } + + override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]" +} -- cgit v1.2.3