summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt8
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt3
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt3
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt3
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt27
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt23
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt6
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt74
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt31
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt138
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt40
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt72
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt43
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt41
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt129
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt16
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt60
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt115
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt112
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt159
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt52
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt19
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt74
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt2
27 files changed, 733 insertions, 525 deletions
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 6929b06c..c4bd0cb4 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -136,8 +136,8 @@ internal class SimHostTest {
assertAll(
{ assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
- { assertEquals(4273200, requestedWork, "Requested work does not match") },
- { assertEquals(3133200, grantedWork, "Granted work does not match") },
+ { assertEquals(4281600, requestedWork, "Requested work does not match") },
+ { assertEquals(3141600, grantedWork, "Granted work does not match") },
{ assertEquals(1140000, overcommittedWork, "Overcommitted work does not match") },
{ assertEquals(1200006, scope.currentTime) }
)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 59ce895f..7dae53be 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -143,8 +143,8 @@ class CapelinIntegrationTest {
assertAll(
{ assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
{ assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
- { assertEquals(1707132711051, monitor.totalRequestedBurst) },
- { assertEquals(457881474296, monitor.totalGrantedBurst) },
+ { assertEquals(1707144601723, monitor.totalRequestedBurst) },
+ { assertEquals(457893364971, monitor.totalGrantedBurst) },
{ assertEquals(1220323969993, monitor.totalOvercommissionedBurst) },
{ assertEquals(0, monitor.totalInterferedBurst) }
)
@@ -189,8 +189,8 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(711464322955, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(175226276978, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(711464339925, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(175226293948, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
{ assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
index a99b082a..281d43ae 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
@@ -32,7 +32,6 @@ import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.*
import java.time.Clock
-import kotlin.coroutines.CoroutineContext
/**
* Abstract implementation of the [SimHypervisor] interface.
@@ -121,7 +120,7 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
override val clock: Clock
get() = this@SimAbstractHypervisor.context.clock
- override val meta: Map<String, Any> = meta + mapOf("coroutine-context" to context.meta["coroutine-context"] as CoroutineContext)
+ override val meta: Map<String, Any> = meta
override fun interrupt(resource: SimResource) {
requireNotNull(this@VirtualMachine.cpus[resource]).interrupt()
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 39ae34fe..44906c2b 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -33,6 +33,7 @@ import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.SimResource
import org.opendc.simulator.resources.SimResourceProvider
import org.opendc.simulator.resources.SimResourceSource
+import org.opendc.simulator.resources.consume
import java.time.Clock
import kotlin.coroutines.CoroutineContext
@@ -91,7 +92,7 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = withContext(context) {
val resources = resources
require(!isTerminated) { "Machine is terminated" }
- val ctx = Context(resources, meta + mapOf("coroutine-context" to context))
+ val ctx = Context(resources, meta)
val totalCapacity = model.cpus.sumByDouble { it.frequency }
_speed = MutableList(model.cpus.size) { 0.0 }
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index bb97192d..c629fbd9 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -25,7 +25,6 @@ package org.opendc.simulator.compute
import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.*
-import kotlin.coroutines.CoroutineContext
/**
* A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single
@@ -40,7 +39,6 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> {
return SimResourceSwitchMaxMin(
ctx.clock,
- ctx.meta["coroutine-context"] as CoroutineContext,
object : SimResourceSwitchMaxMin.Listener<SimProcessingUnit> {
override fun onSliceFinish(
switch: SimResourceSwitchMaxMin<SimProcessingUnit>,
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
index 2001a230..5de69884 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
@@ -24,7 +24,6 @@ package org.opendc.simulator.compute
import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.resources.*
-import kotlin.coroutines.CoroutineContext
/**
* A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
@@ -35,6 +34,6 @@ public class SimSpaceSharedHypervisor : SimAbstractHypervisor() {
}
override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> {
- return SimResourceSwitchExclusive(ctx.meta["coroutine-context"] as CoroutineContext)
+ return SimResourceSwitchExclusive()
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index 9b47821e..f1079ee6 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -24,9 +24,8 @@ package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.model.SimProcessingUnit
-import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
* A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on
@@ -47,29 +46,7 @@ public class SimFlopsWorkload(
override fun onStart(ctx: SimMachineContext) {}
override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
- return CpuConsumer(ctx)
- }
-
- private inner class CpuConsumer(private val machine: SimMachineContext) : SimResourceConsumer<SimProcessingUnit> {
- override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
- val limit = ctx.resource.frequency * utilization
- val work = flops.toDouble() / machine.cpus.size
-
- return if (work > 0.0) {
- SimResourceCommand.Consume(work, limit)
- } else {
- SimResourceCommand.Exit
- }
- }
-
- override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
- return if (remainingWork > 0.0) {
- val limit = ctx.resource.frequency * utilization
- return SimResourceCommand.Consume(remainingWork, limit)
- } else {
- SimResourceCommand.Exit
- }
- }
+ return SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization)
}
override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)"
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
index 313b6ed5..d7aa8f80 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -24,9 +24,8 @@ package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.model.SimProcessingUnit
-import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
* A [SimWorkload] that models application execution as a single duration.
@@ -46,24 +45,8 @@ public class SimRuntimeWorkload(
override fun onStart(ctx: SimMachineContext) {}
override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
- return CpuConsumer()
- }
-
- private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> {
- override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
- val limit = ctx.resource.frequency * utilization
- val work = (limit / 1000) * duration
- return SimResourceCommand.Consume(work, limit)
- }
-
- override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
- return if (remainingWork > 0.0) {
- val limit = ctx.resource.frequency * utilization
- SimResourceCommand.Consume(remainingWork, limit)
- } else {
- SimResourceCommand.Exit
- }
- }
+ val limit = cpu.frequency * utilization
+ return SimWorkConsumer((limit / 1000) * duration, utilization)
}
override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)"
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 31f58a0f..cc4f3136 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -50,11 +50,7 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
}
private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> {
- override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
- return onNext(ctx, 0.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, capacity: Double, remainingWork: Double): SimResourceCommand {
val now = ctx.clock.millis()
val fragment = fragment ?: return SimResourceCommand.Exit
val work = (fragment.duration / 1000) * fragment.usage
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index 52251bff..431ca625 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -49,12 +49,9 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
/**
* This method is invoked when the resource consumer has finished.
*/
- public abstract fun onFinish()
-
- /**
- * This method is invoked when the resource consumer throws an exception.
- */
- public abstract fun onFailure(cause: Throwable)
+ public open fun onFinish(cause: Throwable?) {
+ consumer.onFinish(this, cause)
+ }
/**
* Compute the duration that a resource consumption will take with the specified [speed].
@@ -67,7 +64,14 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
* Compute the speed at which the resource may be consumed.
*/
protected open fun getSpeed(limit: Double): Double {
- return min(limit, resource.capacity)
+ return min(limit, getCapacity())
+ }
+
+ /**
+ * Return the capacity available for the resource consumer.
+ */
+ protected open fun getCapacity(): Double {
+ return resource.capacity
}
/**
@@ -93,13 +97,17 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
* Start the consumer.
*/
public fun start() {
- try {
- isProcessing = true
- latestFlush = clock.millis()
+ check(state == SimResourceState.Pending) { "Consumer is already started" }
+
+ state = SimResourceState.Active
+ isProcessing = true
+ latestFlush = clock.millis()
- interpret(consumer.onStart(this))
- } catch (e: Throwable) {
- onFailure(e)
+ try {
+ consumer.onStart(this)
+ interpret(consumer.onNext(this, getCapacity(), 0.0))
+ } catch (cause: Throwable) {
+ doStop(cause)
} finally {
isProcessing = false
}
@@ -114,9 +122,9 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
latestFlush = clock.millis()
flush(isIntermediate = true)
- onFinish()
- } catch (e: Throwable) {
- onFailure(e)
+ doStop(null)
+ } catch (cause: Throwable) {
+ doStop(cause)
} finally {
isProcessing = false
}
@@ -129,7 +137,12 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
* flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
* will be asked to deliver a new command and is essentially interrupted.
*/
- public open fun flush(isIntermediate: Boolean = false) {
+ public fun flush(isIntermediate: Boolean = false) {
+ // Flush is no-op when the consumer is finished or not yet started
+ if (state != SimResourceState.Active) {
+ return
+ }
+
val now = clock.millis()
// Fast path: if the intermediate progress was already flushed at the current instant, we can skip it.
@@ -177,8 +190,8 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
// Flush may not be called when the resource consumer has finished
throw IllegalStateException()
}
- } catch (e: Throwable) {
- onFailure(e)
+ } catch (cause: Throwable) {
+ doStop(cause)
} finally {
latestFlush = now
isProcessing = false
@@ -203,6 +216,11 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
protected var isProcessing: Boolean = false
/**
+ * A flag to indicate the state of the context.
+ */
+ private var state: SimResourceState = SimResourceState.Pending
+
+ /**
* The current command that is being processed.
*/
private var activeCommand: CommandWrapper? = null
@@ -213,6 +231,18 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
private var latestFlush: Long = Long.MIN_VALUE
/**
+ * Finish the consumer and resource provider.
+ */
+ private fun doStop(cause: Throwable?) {
+ val state = state
+ this.state = SimResourceState.Stopped
+
+ if (state == SimResourceState.Active) {
+ onFinish(cause)
+ }
+ }
+
+ /**
* Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
*/
private fun interpret(command: SimResourceCommand) {
@@ -235,9 +265,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
onConsume(work, limit, deadline)
}
- is SimResourceCommand.Exit -> {
- onFinish()
- }
+ is SimResourceCommand.Exit -> doStop(null)
}
assert(activeCommand == null) { "Concurrent access to current command" }
@@ -248,7 +276,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
* Request the workload for more work.
*/
private fun next(remainingWork: Double) {
- interpret(consumer.onNext(this, remainingWork))
+ interpret(consumer.onNext(this, getCapacity(), remainingWork))
}
/**
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
index 77c0a7a9..21f56f9b 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
@@ -29,7 +29,7 @@ public sealed class SimResourceCommand {
/**
* A request to the resource to perform the specified amount of work before the given [deadline].
*
- * @param work The amount of work to process on the CPU.
+ * @param work The amount of work to process.
* @param limit The maximum amount of work to be processed per second.
* @param deadline The instant at which the work needs to be fulfilled.
*/
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index f516faa6..7637ee69 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -23,23 +23,38 @@
package org.opendc.simulator.resources
/**
- * A SimResourceConsumer characterizes how a [SimResource] is consumed.
+ * A [SimResourceConsumer] characterizes how a [SimResource] is consumed.
+ *
+ * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently)
+ * for multiple resource providers, unless explicitly said otherwise.
*/
public interface SimResourceConsumer<in R : SimResource> {
/**
- * This method is invoked when the consumer is started for a resource.
+ * This method is invoked when the consumer is started for some resource.
*
* @param ctx The execution context in which the consumer runs.
- * @return The next command that the resource should perform.
*/
- public fun onStart(ctx: SimResourceContext<R>): SimResourceCommand
+ public fun onStart(ctx: SimResourceContext<R>) {}
/**
- * This method is invoked when a resource was either interrupted or reached its deadline.
+ * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because
+ * the resource finished processing, reached its deadline or was interrupted.
*
* @param ctx The execution context in which the consumer runs.
- * @param remainingWork The remaining work that was not yet completed.
- * @return The next command that the resource should perform.
+ * @param capacity The capacity that is available for the consumer. In case no capacity is available, zero is given.
+ * @param remainingWork The work of the previous command that was not yet completed due to interruption or deadline.
+ * @return The next command that the resource should execute.
+ */
+ public fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand
+
+ /**
+ * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit],
+ * the resource finished itself, or a failure occurred at the resource.
+ *
+ * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider.
+ *
+ * @param ctx The execution context in which the consumer ran.
+ * @param cause The cause of the finish in case the resource finished exceptionally.
*/
- public fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand
+ public fun onFinish(ctx: SimResourceContext<R>, cause: Throwable? = null) {}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
index ca23557c..23bf8739 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
@@ -22,10 +22,6 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-
/**
* A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer].
*/
@@ -37,16 +33,6 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) :
private var ctx: SimResourceContext<R>? = null
/**
- * A flag to indicate that the forwarder is closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * The continuation to resume after consumption.
- */
- private var cont: Continuation<Unit>? = null
-
- /**
* The delegate [SimResourceConsumer].
*/
private var delegate: SimResourceConsumer<R>? = null
@@ -61,95 +47,111 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) :
*/
private var remainingWork: Double = 0.0
- override suspend fun consume(consumer: SimResourceConsumer<R>) {
- check(!isClosed) { "Lifecycle of forwarder has ended" }
- check(cont == null) { "Run should not be called concurrently" }
-
- return suspendCancellableCoroutine { cont ->
- this.cont = cont
- this.delegate = consumer
+ /**
+ * The state of the forwarder.
+ */
+ override var state: SimResourceState = SimResourceState.Pending
+ private set
- cont.invokeOnCancellation { reset() }
+ override fun startConsumer(consumer: SimResourceConsumer<R>) {
+ check(state == SimResourceState.Pending) { "Resource is in invalid state" }
- ctx?.interrupt()
- }
+ state = SimResourceState.Active
+ delegate = consumer
+ interrupt()
}
override fun interrupt() {
ctx?.interrupt()
}
+ override fun cancel() {
+ val delegate = delegate
+ val ctx = ctx
+
+ state = SimResourceState.Pending
+
+ if (delegate != null && ctx != null) {
+ this.delegate = null
+ delegate.onFinish(ctx)
+ }
+ }
+
override fun close() {
- isClosed = true
- interrupt()
- ctx = null
+ val ctx = ctx
+
+ state = SimResourceState.Stopped
+
+ if (ctx != null) {
+ this.ctx = null
+ ctx.interrupt()
+ }
}
- override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand {
+ override fun onStart(ctx: SimResourceContext<R>) {
this.ctx = ctx
-
- return onNext(ctx, 0.0)
}
- override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand {
+ val delegate = delegate
this.remainingWork = remainingWork
- return if (isClosed) {
- SimResourceCommand.Exit
- } else if (!hasDelegateStarted) {
+ if (!hasDelegateStarted) {
start()
+ }
+
+ return if (state == SimResourceState.Stopped) {
+ SimResourceCommand.Exit
+ } else if (delegate != null) {
+ val command = delegate.onNext(ctx, capacity, remainingWork)
+ if (command == SimResourceCommand.Exit) {
+ // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
+ // reset beforehand the existing state and check whether it has been updated afterwards
+ reset()
+
+ delegate.onFinish(ctx)
+
+ if (state == SimResourceState.Stopped)
+ SimResourceCommand.Exit
+ else
+ onNext(ctx, capacity, 0.0)
+ } else {
+ command
+ }
} else {
- next()
+ SimResourceCommand.Idle()
}
}
- /**
- * Start the delegate.
- */
- private fun start(): SimResourceCommand {
- val delegate = delegate ?: return SimResourceCommand.Idle()
- val command = delegate.onStart(checkNotNull(ctx))
-
- hasDelegateStarted = true
-
- return forward(command)
- }
+ override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) {
+ this.ctx = null
- /**
- * Obtain the next command to process.
- */
- private fun next(): SimResourceCommand {
val delegate = delegate
- return forward(delegate?.onNext(checkNotNull(ctx), remainingWork) ?: SimResourceCommand.Idle())
+ if (delegate != null) {
+ reset()
+ delegate.onFinish(ctx, cause)
+ }
}
/**
- * Forward the specified [command].
+ * Start the delegate.
*/
- private fun forward(command: SimResourceCommand): SimResourceCommand {
- return if (command == SimResourceCommand.Exit) {
- val cont = checkNotNull(cont)
-
- // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
- // reset beforehand the existing state and check whether it has been updated afterwards
- reset()
- cont.resume(Unit)
+ private fun start() {
+ val delegate = delegate ?: return
+ delegate.onStart(checkNotNull(ctx))
- if (isClosed)
- SimResourceCommand.Exit
- else
- start()
- } else {
- command
- }
+ hasDelegateStarted = true
}
/**
* Reset the delegate.
*/
private fun reset() {
- cont = null
delegate = null
hasDelegateStarted = false
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
index e35aa683..1593281b 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import kotlinx.coroutines.suspendCancellableCoroutine
+
/**
* A [SimResourceProvider] provides some resource of type [R].
*/
@@ -32,19 +34,51 @@ public interface SimResourceProvider<out R : SimResource> : AutoCloseable {
public val resource: R
/**
- * Consume the resource provided by this provider using the specified [consumer].
+ * The state of the resource.
+ */
+ public val state: SimResourceState
+
+ /**
+ * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously.
+ *
+ * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended.
*/
- public suspend fun consume(consumer: SimResourceConsumer<R>)
+ public fun startConsumer(consumer: SimResourceConsumer<R>)
/**
- * Interrupt the resource.
+ * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op.
*/
public fun interrupt()
/**
+ * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op.
+ */
+ public fun cancel()
+
+ /**
* End the lifetime of the resource.
*
* This operation terminates the existing resource consumer.
*/
public override fun close()
}
+
+/**
+ * Consume the resource provided by this provider using the specified [consumer] and suspend execution until
+ * the consumer has finished.
+ */
+public suspend fun <R : SimResource> SimResourceProvider<R>.consume(consumer: SimResourceConsumer<R>) {
+ return suspendCancellableCoroutine { cont ->
+ startConsumer(object : SimResourceConsumer<R> by consumer {
+ override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) {
+ assert(!cont.isCompleted) { "Coroutine already completed" }
+
+ cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit))
+
+ consumer.onFinish(ctx, cause)
+ }
+
+ override fun toString(): String = "SimSuspendingResourceConsumer"
+ })
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 540a17c9..a3b16177 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -27,9 +27,6 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import org.opendc.utils.TimerScheduler
import java.time.Clock
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
import kotlin.math.min
/**
@@ -37,6 +34,7 @@ import kotlin.math.min
*
* @param resource The resource to provide.
* @param clock The virtual clock to track simulation time.
+ * @param scheduler The scheduler to schedule the interrupts.
*/
public class SimResourceSource<R : SimResource>(
override val resource: R,
@@ -51,60 +49,48 @@ public class SimResourceSource<R : SimResource>(
private val _speed = MutableStateFlow(0.0)
/**
- * A flag to indicate that the resource was closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * The current active consumer.
- */
- private var cont: CancellableContinuation<Unit>? = null
-
- /**
* The [Context] that is currently running.
*/
private var ctx: Context? = null
- override suspend fun consume(consumer: SimResourceConsumer<R>) {
- check(!isClosed) { "Lifetime of resource has ended." }
- check(cont == null) { "Run should not be called concurrently" }
+ override var state: SimResourceState = SimResourceState.Pending
+ private set
- try {
- return suspendCancellableCoroutine { cont ->
- val ctx = Context(consumer, cont)
+ override fun startConsumer(consumer: SimResourceConsumer<R>) {
+ check(state == SimResourceState.Pending) { "Resource is in invalid state" }
+ val ctx = Context(consumer)
- this.cont = cont
- this.ctx = ctx
+ this.ctx = ctx
+ this.state = SimResourceState.Active
- ctx.start()
- cont.invokeOnCancellation {
- ctx.stop()
- }
- }
- } finally {
- cont = null
- ctx = null
- }
+ ctx.start()
}
override fun close() {
- isClosed = true
- cont?.cancel()
- cont = null
- ctx = null
+ cancel()
+ state = SimResourceState.Stopped
}
override fun interrupt() {
ctx?.interrupt()
}
+ override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.stop()
+ }
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
+
/**
* Internal implementation of [SimResourceContext] for this class.
*/
- private inner class Context(
- consumer: SimResourceConsumer<R>,
- val cont: Continuation<Unit>
- ) : SimAbstractResourceContext<R>(resource, clock, consumer) {
+ private inner class Context(consumer: SimResourceConsumer<R>) : SimAbstractResourceContext<R>(resource, clock, consumer) {
/**
* The processing speed of the resource.
*/
@@ -130,16 +116,12 @@ public class SimResourceSource<R : SimResource>(
scheduler.startSingleTimerTo(this, until, ::flush)
}
- override fun onFinish() {
+ override fun onFinish(cause: Throwable?) {
speed = 0.0
scheduler.cancel(this)
- cont.resume(Unit)
- }
+ cancel()
- override fun onFailure(cause: Throwable) {
- speed = 0.0
- scheduler.cancel(this)
- cont.resumeWithException(cause)
+ super.onFinish(cause)
}
override fun toString(): String = "SimResourceSource.Context[resource=$resource]"
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt
new file mode 100644
index 00000000..c72951d0
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * The state of a resource provider.
+ */
+public enum class SimResourceState {
+ /**
+ * The resource provider is pending and the resource is waiting to be consumed.
+ */
+ Pending,
+
+ /**
+ * The resource provider is active and the resource is currently being consumed.
+ */
+ Active,
+
+ /**
+ * The resource provider is stopped and the resource cannot be consumed anymore.
+ */
+ Stopped
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index 060d0ea2..5eea78f6 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -22,33 +22,30 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.launch
import java.util.ArrayDeque
-import kotlin.coroutines.CoroutineContext
/**
* A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that
* a single output is directly connected to an input and that the switch can only support as much outputs as inputs.
*/
-public class SimResourceSwitchExclusive<R : SimResource>(context: CoroutineContext) : SimResourceSwitch<R> {
+public class SimResourceSwitchExclusive<R : SimResource> : SimResourceSwitch<R> {
/**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
+ * A flag to indicate that the switch is closed.
*/
- private val scope = CoroutineScope(context + Job())
+ private var isClosed: Boolean = false
- private val _outputs = mutableSetOf<SimResourceProvider<R>>()
+ private val _outputs = mutableSetOf<Provider>()
override val outputs: Set<SimResourceProvider<R>>
get() = _outputs
private val availableResources = ArrayDeque<SimResourceForwarder<R>>()
+
private val _inputs = mutableSetOf<SimResourceProvider<R>>()
override val inputs: Set<SimResourceProvider<R>>
get() = _inputs
override fun addOutput(resource: R): SimResourceProvider<R> {
+ check(!isClosed) { "Switch has been closed" }
check(availableResources.isNotEmpty()) { "No capacity to serve request" }
val forwarder = availableResources.poll()
val output = Provider(resource, forwarder)
@@ -57,33 +54,37 @@ public class SimResourceSwitchExclusive<R : SimResource>(context: CoroutineConte
}
override fun addInput(input: SimResourceProvider<R>) {
+ check(!isClosed) { "Switch has been closed" }
+
if (input in inputs) {
return
}
val forwarder = SimResourceForwarder(input.resource)
- scope.launch { input.consume(forwarder) }
-
_inputs += input
availableResources += forwarder
+
+ input.startConsumer(object : SimResourceConsumer<R> by forwarder {
+ override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) {
+ // De-register the input after it has finished
+ _inputs -= input
+ forwarder.onFinish(ctx, cause)
+ }
+ })
}
override fun close() {
- scope.cancel()
+ isClosed = true
+
+ // Cancel all upstream subscriptions
+ _inputs.forEach(SimResourceProvider<R>::cancel)
}
private inner class Provider(
override val resource: R,
private val forwarder: SimResourceForwarder<R>
- ) : SimResourceProvider<R> {
-
- override suspend fun consume(consumer: SimResourceConsumer<R>) = forwarder.consume(consumer)
-
- override fun interrupt() {
- forwarder.interrupt()
- }
-
+ ) : SimResourceProvider<R> by forwarder {
override fun close() {
_outputs -= this
availableResources += forwarder
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index bcf76d3c..6b919a77 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -25,10 +25,6 @@ package org.opendc.simulator.resources
import kotlinx.coroutines.*
import org.opendc.simulator.resources.consumer.SimConsumerBarrier
import java.time.Clock
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
@@ -39,14 +35,8 @@ import kotlin.math.min
*/
public class SimResourceSwitchMaxMin<R : SimResource>(
private val clock: Clock,
- context: CoroutineContext,
private val listener: Listener<R>? = null
) : SimResourceSwitch<R> {
- /**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
- */
- private val scope = CoroutineScope(context + Job())
-
private val inputConsumers = mutableSetOf<InputConsumer>()
private val _outputs = mutableSetOf<OutputProvider>()
override val outputs: Set<SimResourceProvider<R>>
@@ -112,9 +102,16 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
private var barrier: SimConsumerBarrier = SimConsumerBarrier(0)
/**
+ * A flag to indicate that the switch is closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
* Add an output to the switch represented by [resource].
*/
override fun addOutput(resource: R): SimResourceProvider<R> {
+ check(!isClosed) { "Switch has been closed" }
+
val provider = OutputProvider(resource)
_outputs.add(provider)
return provider
@@ -124,13 +121,15 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
* Add the specified [input] to the switch.
*/
override fun addInput(input: SimResourceProvider<R>) {
+ check(!isClosed) { "Switch has been closed" }
+
val consumer = InputConsumer(input)
_inputs.add(input)
inputConsumers += consumer
}
override fun close() {
- scope.cancel()
+ isClosed = true
}
/**
@@ -297,64 +296,56 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
*/
private inner class OutputProvider(override val resource: R) : SimResourceProvider<R> {
/**
- * A flag to indicate that the resource was closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * The current active consumer.
- */
- private var cont: CancellableContinuation<Unit>? = null
-
- /**
* The [OutputContext] that is currently running.
*/
private var ctx: OutputContext? = null
- override suspend fun consume(consumer: SimResourceConsumer<R>) {
- check(!isClosed) { "Lifetime of resource has ended." }
- check(cont == null) { "Run should not be called concurrently" }
+ override var state: SimResourceState = SimResourceState.Pending
+ internal set
- try {
- return suspendCancellableCoroutine { cont ->
- val ctx = OutputContext(resource, consumer, cont)
- ctx.start()
- cont.invokeOnCancellation {
- ctx.stop()
- }
+ override fun startConsumer(consumer: SimResourceConsumer<R>) {
+ check(state == SimResourceState.Pending) { "Resource cannot be consumed" }
- this.cont = cont
- this.ctx = ctx
+ val ctx = OutputContext(this, resource, consumer)
+ this.ctx = ctx
+ this.state = SimResourceState.Active
+ outputContexts += ctx
- outputContexts += ctx
- schedule()
- }
- } finally {
- cont = null
- ctx = null
- }
+ ctx.start()
+ schedule()
}
override fun close() {
- isClosed = true
- cont?.cancel()
- cont = null
- ctx = null
+ cancel()
+
+ state = SimResourceState.Stopped
_outputs.remove(this)
}
override fun interrupt() {
ctx?.interrupt()
}
+
+ override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.stop()
+ }
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
}
/**
* A [SimAbstractResourceContext] for the output resources.
*/
private inner class OutputContext(
+ private val provider: OutputProvider,
resource: R,
- consumer: SimResourceConsumer<R>,
- private val cont: Continuation<Unit>
+ consumer: SimResourceConsumer<R>
) : SimAbstractResourceContext<R>(resource, clock, consumer), Comparable<OutputContext> {
/**
* The current command that is processed by the vCPU.
@@ -371,11 +362,6 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
*/
var actualSpeed: Double = 0.0
- /**
- * A flag to indicate that the CPU has exited.
- */
- var hasExited: Boolean = false
-
override fun onIdle(deadline: Long) {
allowedSpeed = 0.0
activeCommand = SimResourceCommand.Idle(deadline)
@@ -386,16 +372,11 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
activeCommand = SimResourceCommand.Consume(work, limit, deadline)
}
- override fun onFinish() {
- hasExited = true
+ override fun onFinish(cause: Throwable?) {
activeCommand = SimResourceCommand.Exit
- cont.resume(Unit)
- }
+ provider.cancel()
- override fun onFailure(cause: Throwable) {
- hasExited = true
- activeCommand = SimResourceCommand.Exit
- cont.resumeWithException(cause)
+ super.onFinish(cause)
}
override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
@@ -453,21 +434,8 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
private lateinit var ctx: SimResourceContext<R>
init {
- scope.launch {
- try {
- barrier = SimConsumerBarrier(barrier.parties + 1)
- input.consume(this@InputConsumer)
- } catch (e: CancellationException) {
- // Cancel gracefully
- throw e
- } catch (e: Throwable) {
- e.printStackTrace()
- } finally {
- barrier = SimConsumerBarrier(barrier.parties - 1)
- inputConsumers -= this@InputConsumer
- _inputs -= input
- }
- }
+ barrier = SimConsumerBarrier(barrier.parties + 1)
+ input.startConsumer(this@InputConsumer)
}
/**
@@ -477,12 +445,11 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
ctx.interrupt()
}
- override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand {
+ override fun onStart(ctx: SimResourceContext<R>) {
this.ctx = ctx
- return commands[ctx.resource] ?: SimResourceCommand.Idle()
}
- override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand {
totalRemainingWork += remainingWork
val isLast = barrier.enter()
@@ -504,5 +471,13 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
commands[ctx.resource] ?: SimResourceCommand.Idle()
}
}
+
+ override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) {
+ barrier = SimConsumerBarrier(barrier.parties - 1)
+ inputConsumers -= this@InputConsumer
+ _inputs -= input
+
+ super.onFinish(ctx, cause)
+ }
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
index 03a3cebd..a00ee575 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -31,14 +31,16 @@ import org.opendc.simulator.resources.SimResourceContext
* A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource
* consumption for some period of time.
*/
-public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> {
- private val iterator = trace.iterator()
+public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> {
+ private var iterator: Iterator<Fragment>? = null
- override fun onStart(ctx: SimResourceContext<SimResource>): SimResourceCommand {
- return onNext(ctx, 0.0)
+ override fun onStart(ctx: SimResourceContext<SimResource>) {
+ check(iterator == null) { "Consumer already running" }
+ iterator = trace.iterator()
}
- override fun onNext(ctx: SimResourceContext<SimResource>, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<SimResource>, capacity: Double, remainingWork: Double): SimResourceCommand {
+ val iterator = checkNotNull(iterator)
return if (iterator.hasNext()) {
val now = ctx.clock.millis()
val fragment = iterator.next()
@@ -56,6 +58,10 @@ public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<S
}
}
+ override fun onFinish(ctx: SimResourceContext<SimResource>, cause: Throwable?) {
+ iterator = null
+ }
+
/**
* A fragment of the workload.
*/
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
new file mode 100644
index 00000000..bad2f403
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+import org.opendc.simulator.resources.SimResource
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+
+/**
+ * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization.
+ */
+public class SimWorkConsumer<R : SimResource>(
+ private val work: Double,
+ private val utilization: Double
+) : SimResourceConsumer<R> {
+
+ init {
+ require(work >= 0.0) { "Work must be positive" }
+ require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
+ }
+
+ private var limit = 0.0
+ private var remainingWork: Double = 0.0
+
+ override fun onStart(ctx: SimResourceContext<R>) {
+ limit = ctx.resource.capacity * utilization
+ remainingWork = work
+ }
+
+ override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand {
+ val work = this.remainingWork + remainingWork
+ this.remainingWork -= work
+ return if (work > 0.0) {
+ SimResourceCommand.Consume(work, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index e7642dc1..0bc87473 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -22,11 +22,10 @@
package org.opendc.simulator.resources
+import io.mockk.*
import kotlinx.coroutines.*
import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.*
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertTrue
import org.opendc.simulator.utils.DelayControllerClockAdapter
/**
@@ -45,28 +44,15 @@ class SimResourceContextTest {
val resource = SimCpu(4200.0)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(10.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
- override fun onIdle(deadline: Long) {
- }
+ override fun onIdle(deadline: Long) {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- }
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
- override fun onFinish() {
- }
-
- override fun onFailure(cause: Throwable) {
- }
+ override fun onFinish(cause: Throwable?) {}
}
context.flush()
@@ -77,36 +63,20 @@ class SimResourceContextTest {
val clock = DelayControllerClockAdapter(this)
val resource = SimCpu(4200.0)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(10.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
-
- var counter = 0
- val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
- override fun onIdle(deadline: Long) {
- }
-
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- counter++
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- override fun onFinish() {
- }
-
- override fun onFailure(cause: Throwable) {
- }
- }
+ val context = spyk(object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ })
context.start()
delay(1) // Delay 1 ms to prevent hitting the fast path
context.flush(isIntermediate = true)
- assertEquals(2, counter)
+
+ verify(exactly = 2) { context.onConsume(any(), any(), any()) }
}
@Test
@@ -114,33 +84,14 @@ class SimResourceContextTest {
val clock = DelayControllerClockAdapter(this)
val resource = SimCpu(4200.0)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Idle(10)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
-
- var counter = 0
- var isFinished = false
- val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
- override fun onIdle(deadline: Long) {
- counter++
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- }
-
- override fun onFinish() {
- isFinished = true
- }
-
- override fun onFailure(cause: Throwable) {
- }
- }
+ val context = spyk(object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ })
context.start()
delay(5)
@@ -149,8 +100,26 @@ class SimResourceContextTest {
context.flush(isIntermediate = true)
assertAll(
- { assertEquals(1, counter) },
- { assertTrue(isFinished) }
+ { verify(exactly = 1) { context.onIdle(any()) } },
+ { verify(exactly = 1) { context.onFinish(null) } }
)
}
+
+ @Test
+ fun testDoubleStart() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val resource = SimCpu(4200.0)
+
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
+
+ val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ }
+
+ context.start()
+ assertThrows<IllegalStateException> { context.start() }
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
index ced1bd98..b1b959ba 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
@@ -22,10 +22,16 @@
package org.opendc.simulator.resources
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.verify
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runBlockingTest
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
@@ -53,14 +59,15 @@ internal class SimResourceForwarderTest {
}
forwarder.consume(object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Exit
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ override fun onNext(
+ ctx: SimResourceContext<SimCpu>,
+ capacity: Double,
+ remainingWork: Double
+ ): SimResourceCommand {
return SimResourceCommand.Exit
}
})
+
forwarder.close()
scheduler.close()
}
@@ -78,15 +85,100 @@ internal class SimResourceForwarderTest {
}
forwarder.consume(object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
+ var isFirst = true
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
+ override fun onNext(
+ ctx: SimResourceContext<SimCpu>,
+ capacity: Double,
+ remainingWork: Double
+ ): SimResourceCommand {
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(10.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
}
})
forwarder.close()
}
+
+ @Test
+ fun testState() = runBlockingTest {
+ val forwarder = SimResourceForwarder(SimCpu(1000.0))
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onNext(
+ ctx: SimResourceContext<SimCpu>,
+ capacity: Double,
+ remainingWork: Double
+ ): SimResourceCommand = SimResourceCommand.Exit
+ }
+
+ assertEquals(SimResourceState.Pending, forwarder.state)
+
+ forwarder.startConsumer(consumer)
+ assertEquals(SimResourceState.Active, forwarder.state)
+
+ assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
+
+ forwarder.cancel()
+ assertEquals(SimResourceState.Pending, forwarder.state)
+
+ forwarder.close()
+ assertEquals(SimResourceState.Stopped, forwarder.state)
+ }
+
+ @Test
+ fun testCancelPendingDelegate() = runBlockingTest {
+ val forwarder = SimResourceForwarder(SimCpu(1000.0))
+
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit
+
+ forwarder.startConsumer(consumer)
+ forwarder.cancel()
+
+ verify(exactly = 0) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testCancelStartedDelegate() = runBlockingTest {
+ val forwarder = SimResourceForwarder(SimCpu(1000.0))
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(SimCpu(2000.0), clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10)
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ forwarder.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any()) }
+ verify(exactly = 1) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testCancelPropagation() = runBlockingTest {
+ val forwarder = SimResourceForwarder(SimCpu(1000.0))
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(SimCpu(2000.0), clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10)
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ source.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any()) }
+ verify(exactly = 1) { consumer.onFinish(any(), null) }
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 4f7825fc..18f18ded 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import io.mockk.every
+import io.mockk.mockk
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runBlockingTest
@@ -46,15 +48,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1000 * ctx.resource.speed, ctx.resource.speed)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, provider.resource.speed))
+ .andThen(SimResourceCommand.Exit)
try {
val res = mutableListOf<Double>()
@@ -76,15 +73,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1000 * ctx.resource.speed, 2 * ctx.resource.speed)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, 2 * provider.resource.speed))
+ .andThen(SimResourceCommand.Exit)
try {
val res = mutableListOf<Double>()
@@ -111,13 +103,12 @@ class SimResourceSourceTest {
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ override fun onStart(ctx: SimResourceContext<SimCpu>) {
ctx.interrupt()
- return SimResourceCommand.Exit
}
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
+ override fun onNext(ctx: SimResourceContext<SimCpu>, capacity: Double, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
}
}
@@ -137,14 +128,19 @@ class SimResourceSourceTest {
lateinit var resCtx: SimResourceContext<SimCpu>
val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ var isFirst = true
+ override fun onStart(ctx: SimResourceContext<SimCpu>) {
resCtx = ctx
- return SimResourceCommand.Consume(4.0, 1.0)
}
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<SimCpu>, capacity: Double, remainingWork: Double): SimResourceCommand {
assertEquals(0.0, remainingWork)
- return SimResourceCommand.Exit
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(4.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
}
}
@@ -168,15 +164,9 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- throw IllegalStateException()
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onStart(any()) }
+ .throws(IllegalStateException())
try {
assertThrows<IllegalStateException> {
@@ -194,15 +184,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
try {
assertThrows<IllegalStateException> {
@@ -220,15 +205,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
try {
assertThrows<IllegalStateException> {
@@ -249,15 +229,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
try {
assertThrows<IllegalStateException> {
@@ -276,15 +251,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
try {
launch { provider.consume(consumer) }
@@ -304,15 +274,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Idle(ctx.clock.millis() + 500)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Idle(clock.millis() + 500))
+ .andThen(SimResourceCommand.Exit)
try {
provider.consume(consumer)
@@ -332,15 +297,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Idle()
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Idle())
+ .andThenThrows(IllegalStateException())
try {
provider.consume(consumer)
@@ -351,4 +311,25 @@ class SimResourceSourceTest {
}
}
}
+
+ @Test
+ fun testIncorrectDeadline() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Idle(2))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ delay(10)
+
+ assertThrows<IllegalArgumentException> { provider.consume(consumer) }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index ca6558bf..354dab93 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import io.mockk.every
+import io.mockk.mockk
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
@@ -34,7 +36,6 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.resources.consumer.SimTraceConsumer
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
-import java.lang.IllegalStateException
/**
* Test suite for the [SimResourceSwitchExclusive] class.
@@ -67,7 +68,7 @@ internal class SimResourceSwitchExclusiveTest {
),
)
- val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val switch = SimResourceSwitchExclusive<SimCpu>()
val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
switch.addInput(source)
@@ -98,17 +99,10 @@ internal class SimResourceSwitchExclusiveTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val duration = 5 * 60L * 1000
- val workload = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(duration / 1000.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val workload = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
- val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val switch = SimResourceSwitchExclusive<SimCpu>()
val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
switch.addInput(source)
@@ -134,16 +128,27 @@ internal class SimResourceSwitchExclusiveTest {
val duration = 5 * 60L * 1000
val workload = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ var isFirst = true
+
+ override fun onStart(ctx: SimResourceContext<SimCpu>) {
+ isFirst = true
}
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
+ override fun onNext(
+ ctx: SimResourceContext<SimCpu>,
+ capacity: Double,
+ remainingWork: Double
+ ): SimResourceCommand {
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
}
}
- val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val switch = SimResourceSwitchExclusive<SimCpu>()
val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
switch.addInput(source)
@@ -169,17 +174,10 @@ internal class SimResourceSwitchExclusiveTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val duration = 5 * 60L * 1000
- val workload = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(duration.toDouble(), 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val workload = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
- val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val switch = SimResourceSwitchExclusive<SimCpu>()
val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
switch.addInput(source)
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
index 698c1700..e8f5a13c 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import io.mockk.every
+import io.mockk.mockk
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
@@ -47,22 +49,15 @@ internal class SimResourceSwitchMaxMinTest {
fun testSmoke() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val switch = SimResourceSwitchMaxMin<SimCpu>(clock, coroutineContext)
+ val switch = SimResourceSwitchMaxMin<SimCpu>(clock)
val sources = List(2) { SimResourceSource(SimCpu(2000.0), clock, scheduler) }
sources.forEach { switch.addInput(it) }
val provider = switch.addOutput(SimCpu(1000.0))
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit
try {
provider.consume(consumer)
@@ -112,7 +107,7 @@ internal class SimResourceSwitchMaxMinTest {
),
)
- val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener)
+ val switch = SimResourceSwitchMaxMin(clock, listener)
val provider = switch.addOutput(SimCpu(3200.0))
try {
@@ -180,7 +175,7 @@ internal class SimResourceSwitchMaxMinTest {
)
)
- val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener)
+ val switch = SimResourceSwitchMaxMin(clock, listener)
val providerA = switch.addOutput(SimCpu(3200.0))
val providerB = switch.addOutput(SimCpu(3200.0))
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
new file mode 100644
index 00000000..b05195f7
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimWorkConsumer] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimWorkConsumerTest {
+ data class SimCpu(val speed: Double) : SimResource {
+ override val capacity: Double
+ get() = speed
+ }
+
+ @Test
+ fun testSmoke() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(1.0), clock, scheduler)
+
+ val consumer = SimWorkConsumer<SimCpu>(1.0, 1.0)
+
+ try {
+ provider.consume(consumer)
+ assertEquals(1000, currentTime)
+ } finally {
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testUtilization() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(1.0), clock, scheduler)
+
+ val consumer = SimWorkConsumer<SimCpu>(1.0, 0.5)
+
+ try {
+ provider.consume(consumer)
+ assertEquals(2000, currentTime)
+ } finally {
+ provider.close()
+ }
+ }
+}
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
index 49964938..d4bc7b5c 100644
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
+++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
@@ -93,7 +93,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
try {
timer()
} catch (e: Throwable) {
- Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e)
+ coroutineContext[CoroutineExceptionHandler]?.handleException(coroutineContext, e)
}
}
}