summaryrefslogtreecommitdiff
path: root/simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-18 16:25:00 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-22 16:47:56 +0100
commit9dab4d7b3921cd48199d773c7dc4bae0f2273223 (patch)
tree855d7da216be486ad53258d18bbf7fa2d9d04cec /simulator
parent0fa1dc262905c42b3549172fea59f7ad4cb58b1f (diff)
simulator: Re-design consumer interface to support capacity negotiation
This change re-designs the SimResourceConsumer interface to support in the future capacity negotiation. This basically means that the consumer will be informed directly when not enough capacity is available, instead of after the deadline specified by the consumer.
Diffstat (limited to 'simulator')
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt8
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt3
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt3
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt3
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt27
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt23
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt6
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt74
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt31
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt138
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt40
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt72
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt43
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt41
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt129
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt16
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt60
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt115
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt112
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt159
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt52
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt19
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt74
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt2
27 files changed, 733 insertions, 525 deletions
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 6929b06c..c4bd0cb4 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -136,8 +136,8 @@ internal class SimHostTest {
assertAll(
{ assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
- { assertEquals(4273200, requestedWork, "Requested work does not match") },
- { assertEquals(3133200, grantedWork, "Granted work does not match") },
+ { assertEquals(4281600, requestedWork, "Requested work does not match") },
+ { assertEquals(3141600, grantedWork, "Granted work does not match") },
{ assertEquals(1140000, overcommittedWork, "Overcommitted work does not match") },
{ assertEquals(1200006, scope.currentTime) }
)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 59ce895f..7dae53be 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -143,8 +143,8 @@ class CapelinIntegrationTest {
assertAll(
{ assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
{ assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
- { assertEquals(1707132711051, monitor.totalRequestedBurst) },
- { assertEquals(457881474296, monitor.totalGrantedBurst) },
+ { assertEquals(1707144601723, monitor.totalRequestedBurst) },
+ { assertEquals(457893364971, monitor.totalGrantedBurst) },
{ assertEquals(1220323969993, monitor.totalOvercommissionedBurst) },
{ assertEquals(0, monitor.totalInterferedBurst) }
)
@@ -189,8 +189,8 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(711464322955, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(175226276978, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(711464339925, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(175226293948, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
{ assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
index a99b082a..281d43ae 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
@@ -32,7 +32,6 @@ import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.*
import java.time.Clock
-import kotlin.coroutines.CoroutineContext
/**
* Abstract implementation of the [SimHypervisor] interface.
@@ -121,7 +120,7 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
override val clock: Clock
get() = this@SimAbstractHypervisor.context.clock
- override val meta: Map<String, Any> = meta + mapOf("coroutine-context" to context.meta["coroutine-context"] as CoroutineContext)
+ override val meta: Map<String, Any> = meta
override fun interrupt(resource: SimResource) {
requireNotNull(this@VirtualMachine.cpus[resource]).interrupt()
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 39ae34fe..44906c2b 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -33,6 +33,7 @@ import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.SimResource
import org.opendc.simulator.resources.SimResourceProvider
import org.opendc.simulator.resources.SimResourceSource
+import org.opendc.simulator.resources.consume
import java.time.Clock
import kotlin.coroutines.CoroutineContext
@@ -91,7 +92,7 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = withContext(context) {
val resources = resources
require(!isTerminated) { "Machine is terminated" }
- val ctx = Context(resources, meta + mapOf("coroutine-context" to context))
+ val ctx = Context(resources, meta)
val totalCapacity = model.cpus.sumByDouble { it.frequency }
_speed = MutableList(model.cpus.size) { 0.0 }
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index bb97192d..c629fbd9 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -25,7 +25,6 @@ package org.opendc.simulator.compute
import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.*
-import kotlin.coroutines.CoroutineContext
/**
* A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single
@@ -40,7 +39,6 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> {
return SimResourceSwitchMaxMin(
ctx.clock,
- ctx.meta["coroutine-context"] as CoroutineContext,
object : SimResourceSwitchMaxMin.Listener<SimProcessingUnit> {
override fun onSliceFinish(
switch: SimResourceSwitchMaxMin<SimProcessingUnit>,
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
index 2001a230..5de69884 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
@@ -24,7 +24,6 @@ package org.opendc.simulator.compute
import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.resources.*
-import kotlin.coroutines.CoroutineContext
/**
* A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
@@ -35,6 +34,6 @@ public class SimSpaceSharedHypervisor : SimAbstractHypervisor() {
}
override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> {
- return SimResourceSwitchExclusive(ctx.meta["coroutine-context"] as CoroutineContext)
+ return SimResourceSwitchExclusive()
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index 9b47821e..f1079ee6 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -24,9 +24,8 @@ package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.model.SimProcessingUnit
-import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
* A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on
@@ -47,29 +46,7 @@ public class SimFlopsWorkload(
override fun onStart(ctx: SimMachineContext) {}
override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
- return CpuConsumer(ctx)
- }
-
- private inner class CpuConsumer(private val machine: SimMachineContext) : SimResourceConsumer<SimProcessingUnit> {
- override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
- val limit = ctx.resource.frequency * utilization
- val work = flops.toDouble() / machine.cpus.size
-
- return if (work > 0.0) {
- SimResourceCommand.Consume(work, limit)
- } else {
- SimResourceCommand.Exit
- }
- }
-
- override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
- return if (remainingWork > 0.0) {
- val limit = ctx.resource.frequency * utilization
- return SimResourceCommand.Consume(remainingWork, limit)
- } else {
- SimResourceCommand.Exit
- }
- }
+ return SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization)
}
override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)"
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
index 313b6ed5..d7aa8f80 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -24,9 +24,8 @@ package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.model.SimProcessingUnit
-import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
* A [SimWorkload] that models application execution as a single duration.
@@ -46,24 +45,8 @@ public class SimRuntimeWorkload(
override fun onStart(ctx: SimMachineContext) {}
override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
- return CpuConsumer()
- }
-
- private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> {
- override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
- val limit = ctx.resource.frequency * utilization
- val work = (limit / 1000) * duration
- return SimResourceCommand.Consume(work, limit)
- }
-
- override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
- return if (remainingWork > 0.0) {
- val limit = ctx.resource.frequency * utilization
- SimResourceCommand.Consume(remainingWork, limit)
- } else {
- SimResourceCommand.Exit
- }
- }
+ val limit = cpu.frequency * utilization
+ return SimWorkConsumer((limit / 1000) * duration, utilization)
}
override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)"
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 31f58a0f..cc4f3136 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -50,11 +50,7 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
}
private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> {
- override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
- return onNext(ctx, 0.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, capacity: Double, remainingWork: Double): SimResourceCommand {
val now = ctx.clock.millis()
val fragment = fragment ?: return SimResourceCommand.Exit
val work = (fragment.duration / 1000) * fragment.usage
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index 52251bff..431ca625 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -49,12 +49,9 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
/**
* This method is invoked when the resource consumer has finished.
*/
- public abstract fun onFinish()
-
- /**
- * This method is invoked when the resource consumer throws an exception.
- */
- public abstract fun onFailure(cause: Throwable)
+ public open fun onFinish(cause: Throwable?) {
+ consumer.onFinish(this, cause)
+ }
/**
* Compute the duration that a resource consumption will take with the specified [speed].
@@ -67,7 +64,14 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
* Compute the speed at which the resource may be consumed.
*/
protected open fun getSpeed(limit: Double): Double {
- return min(limit, resource.capacity)
+ return min(limit, getCapacity())
+ }
+
+ /**
+ * Return the capacity available for the resource consumer.
+ */
+ protected open fun getCapacity(): Double {
+ return resource.capacity
}
/**
@@ -93,13 +97,17 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
* Start the consumer.
*/
public fun start() {
- try {
- isProcessing = true
- latestFlush = clock.millis()
+ check(state == SimResourceState.Pending) { "Consumer is already started" }
+
+ state = SimResourceState.Active
+ isProcessing = true
+ latestFlush = clock.millis()
- interpret(consumer.onStart(this))
- } catch (e: Throwable) {
- onFailure(e)
+ try {
+ consumer.onStart(this)
+ interpret(consumer.onNext(this, getCapacity(), 0.0))
+ } catch (cause: Throwable) {
+ doStop(cause)
} finally {
isProcessing = false
}
@@ -114,9 +122,9 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
latestFlush = clock.millis()
flush(isIntermediate = true)
- onFinish()
- } catch (e: Throwable) {
- onFailure(e)
+ doStop(null)
+ } catch (cause: Throwable) {
+ doStop(cause)
} finally {
isProcessing = false
}
@@ -129,7 +137,12 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
* flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
* will be asked to deliver a new command and is essentially interrupted.
*/
- public open fun flush(isIntermediate: Boolean = false) {
+ public fun flush(isIntermediate: Boolean = false) {
+ // Flush is no-op when the consumer is finished or not yet started
+ if (state != SimResourceState.Active) {
+ return
+ }
+
val now = clock.millis()
// Fast path: if the intermediate progress was already flushed at the current instant, we can skip it.
@@ -177,8 +190,8 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
// Flush may not be called when the resource consumer has finished
throw IllegalStateException()
}
- } catch (e: Throwable) {
- onFailure(e)
+ } catch (cause: Throwable) {
+ doStop(cause)
} finally {
latestFlush = now
isProcessing = false
@@ -203,6 +216,11 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
protected var isProcessing: Boolean = false
/**
+ * A flag to indicate the state of the context.
+ */
+ private var state: SimResourceState = SimResourceState.Pending
+
+ /**
* The current command that is being processed.
*/
private var activeCommand: CommandWrapper? = null
@@ -213,6 +231,18 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
private var latestFlush: Long = Long.MIN_VALUE
/**
+ * Finish the consumer and resource provider.
+ */
+ private fun doStop(cause: Throwable?) {
+ val state = state
+ this.state = SimResourceState.Stopped
+
+ if (state == SimResourceState.Active) {
+ onFinish(cause)
+ }
+ }
+
+ /**
* Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
*/
private fun interpret(command: SimResourceCommand) {
@@ -235,9 +265,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
onConsume(work, limit, deadline)
}
- is SimResourceCommand.Exit -> {
- onFinish()
- }
+ is SimResourceCommand.Exit -> doStop(null)
}
assert(activeCommand == null) { "Concurrent access to current command" }
@@ -248,7 +276,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
* Request the workload for more work.
*/
private fun next(remainingWork: Double) {
- interpret(consumer.onNext(this, remainingWork))
+ interpret(consumer.onNext(this, getCapacity(), remainingWork))
}
/**
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
index 77c0a7a9..21f56f9b 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
@@ -29,7 +29,7 @@ public sealed class SimResourceCommand {
/**
* A request to the resource to perform the specified amount of work before the given [deadline].
*
- * @param work The amount of work to process on the CPU.
+ * @param work The amount of work to process.
* @param limit The maximum amount of work to be processed per second.
* @param deadline The instant at which the work needs to be fulfilled.
*/
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index f516faa6..7637ee69 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -23,23 +23,38 @@
package org.opendc.simulator.resources
/**
- * A SimResourceConsumer characterizes how a [SimResource] is consumed.
+ * A [SimResourceConsumer] characterizes how a [SimResource] is consumed.
+ *
+ * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently)
+ * for multiple resource providers, unless explicitly said otherwise.
*/
public interface SimResourceConsumer<in R : SimResource> {
/**
- * This method is invoked when the consumer is started for a resource.
+ * This method is invoked when the consumer is started for some resource.
*
* @param ctx The execution context in which the consumer runs.
- * @return The next command that the resource should perform.
*/
- public fun onStart(ctx: SimResourceContext<R>): SimResourceCommand
+ public fun onStart(ctx: SimResourceContext<R>) {}
/**
- * This method is invoked when a resource was either interrupted or reached its deadline.
+ * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because
+ * the resource finished processing, reached its deadline or was interrupted.
*
* @param ctx The execution context in which the consumer runs.
- * @param remainingWork The remaining work that was not yet completed.
- * @return The next command that the resource should perform.
+ * @param capacity The capacity that is available for the consumer. In case no capacity is available, zero is given.
+ * @param remainingWork The work of the previous command that was not yet completed due to interruption or deadline.
+ * @return The next command that the resource should execute.
+ */
+ public fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand
+
+ /**
+ * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit],
+ * the resource finished itself, or a failure occurred at the resource.
+ *
+ * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider.
+ *
+ * @param ctx The execution context in which the consumer ran.
+ * @param cause The cause of the finish in case the resource finished exceptionally.
*/
- public fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand
+ public fun onFinish(ctx: SimResourceContext<R>, cause: Throwable? = null) {}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
index ca23557c..23bf8739 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
@@ -22,10 +22,6 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-
/**
* A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer].
*/
@@ -37,16 +33,6 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) :
private var ctx: SimResourceContext<R>? = null
/**
- * A flag to indicate that the forwarder is closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * The continuation to resume after consumption.
- */
- private var cont: Continuation<Unit>? = null
-
- /**
* The delegate [SimResourceConsumer].
*/
private var delegate: SimResourceConsumer<R>? = null
@@ -61,95 +47,111 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) :
*/
private var remainingWork: Double = 0.0
- override suspend fun consume(consumer: SimResourceConsumer<R>) {
- check(!isClosed) { "Lifecycle of forwarder has ended" }
- check(cont == null) { "Run should not be called concurrently" }
-
- return suspendCancellableCoroutine { cont ->
- this.cont = cont
- this.delegate = consumer
+ /**
+ * The state of the forwarder.
+ */
+ override var state: SimResourceState = SimResourceState.Pending
+ private set
- cont.invokeOnCancellation { reset() }
+ override fun startConsumer(consumer: SimResourceConsumer<R>) {
+ check(state == SimResourceState.Pending) { "Resource is in invalid state" }
- ctx?.interrupt()
- }
+ state = SimResourceState.Active
+ delegate = consumer
+ interrupt()
}
override fun interrupt() {
ctx?.interrupt()
}
+ override fun cancel() {
+ val delegate = delegate
+ val ctx = ctx
+
+ state = SimResourceState.Pending
+
+ if (delegate != null && ctx != null) {
+ this.delegate = null
+ delegate.onFinish(ctx)
+ }
+ }
+
override fun close() {
- isClosed = true
- interrupt()
- ctx = null
+ val ctx = ctx
+
+ state = SimResourceState.Stopped
+
+ if (ctx != null) {
+ this.ctx = null
+ ctx.interrupt()
+ }
}
- override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand {
+ override fun onStart(ctx: SimResourceContext<R>) {
this.ctx = ctx
-
- return onNext(ctx, 0.0)
}
- override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand {
+ val delegate = delegate
this.remainingWork = remainingWork
- return if (isClosed) {
- SimResourceCommand.Exit
- } else if (!hasDelegateStarted) {
+ if (!hasDelegateStarted) {
start()
+ }
+
+ return if (state == SimResourceState.Stopped) {
+ SimResourceCommand.Exit
+ } else if (delegate != null) {
+ val command = delegate.onNext(ctx, capacity, remainingWork)
+ if (command == SimResourceCommand.Exit) {
+ // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
+ // reset beforehand the existing state and check whether it has been updated afterwards
+ reset()
+
+ delegate.onFinish(ctx)
+
+ if (state == SimResourceState.Stopped)
+ SimResourceCommand.Exit
+ else
+ onNext(ctx, capacity, 0.0)
+ } else {
+ command
+ }
} else {
- next()
+ SimResourceCommand.Idle()
}
}
- /**
- * Start the delegate.
- */
- private fun start(): SimResourceCommand {
- val delegate = delegate ?: return SimResourceCommand.Idle()
- val command = delegate.onStart(checkNotNull(ctx))
-
- hasDelegateStarted = true
-
- return forward(command)
- }
+ override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) {
+ this.ctx = null
- /**
- * Obtain the next command to process.
- */
- private fun next(): SimResourceCommand {
val delegate = delegate
- return forward(delegate?.onNext(checkNotNull(ctx), remainingWork) ?: SimResourceCommand.Idle())
+ if (delegate != null) {
+ reset()
+ delegate.onFinish(ctx, cause)
+ }
}
/**
- * Forward the specified [command].
+ * Start the delegate.
*/
- private fun forward(command: SimResourceCommand): SimResourceCommand {
- return if (command == SimResourceCommand.Exit) {
- val cont = checkNotNull(cont)
-
- // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
- // reset beforehand the existing state and check whether it has been updated afterwards
- reset()
- cont.resume(Unit)
+ private fun start() {
+ val delegate = delegate ?: return
+ delegate.onStart(checkNotNull(ctx))
- if (isClosed)
- SimResourceCommand.Exit
- else
- start()
- } else {
- command
- }
+ hasDelegateStarted = true
}
/**
* Reset the delegate.
*/
private fun reset() {
- cont = null
delegate = null
hasDelegateStarted = false
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
index e35aa683..1593281b 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import kotlinx.coroutines.suspendCancellableCoroutine
+
/**
* A [SimResourceProvider] provides some resource of type [R].
*/
@@ -32,19 +34,51 @@ public interface SimResourceProvider<out R : SimResource> : AutoCloseable {
public val resource: R
/**
- * Consume the resource provided by this provider using the specified [consumer].
+ * The state of the resource.
+ */
+ public val state: SimResourceState
+
+ /**
+ * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously.
+ *
+ * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended.
*/
- public suspend fun consume(consumer: SimResourceConsumer<R>)
+ public fun startConsumer(consumer: SimResourceConsumer<R>)
/**
- * Interrupt the resource.
+ * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op.
*/
public fun interrupt()
/**
+ * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op.
+ */
+ public fun cancel()
+
+ /**
* End the lifetime of the resource.
*
* This operation terminates the existing resource consumer.
*/
public override fun close()
}
+
+/**
+ * Consume the resource provided by this provider using the specified [consumer] and suspend execution until
+ * the consumer has finished.
+ */
+public suspend fun <R : SimResource> SimResourceProvider<R>.consume(consumer: SimResourceConsumer<R>) {
+ return suspendCancellableCoroutine { cont ->
+ startConsumer(object : SimResourceConsumer<R> by consumer {
+ override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) {
+ assert(!cont.isCompleted) { "Coroutine already completed" }
+
+ cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit))
+
+ consumer.onFinish(ctx, cause)
+ }
+
+ override fun toString(): String = "SimSuspendingResourceConsumer"
+ })
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 540a17c9..a3b16177 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -27,9 +27,6 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import org.opendc.utils.TimerScheduler
import java.time.Clock
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
import kotlin.math.min
/**
@@ -37,6 +34,7 @@ import kotlin.math.min
*
* @param resource The resource to provide.
* @param clock The virtual clock to track simulation time.
+ * @param scheduler The scheduler to schedule the interrupts.
*/
public class SimResourceSource<R : SimResource>(
override val resource: R,
@@ -51,60 +49,48 @@ public class SimResourceSource<R : SimResource>(
private val _speed = MutableStateFlow(0.0)
/**
- * A flag to indicate that the resource was closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * The current active consumer.
- */
- private var cont: CancellableContinuation<Unit>? = null
-
- /**
* The [Context] that is currently running.
*/
private var ctx: Context? = null
- override suspend fun consume(consumer: SimResourceConsumer<R>) {
- check(!isClosed) { "Lifetime of resource has ended." }
- check(cont == null) { "Run should not be called concurrently" }
+ override var state: SimResourceState = SimResourceState.Pending
+ private set
- try {
- return suspendCancellableCoroutine { cont ->
- val ctx = Context(consumer, cont)
+ override fun startConsumer(consumer: SimResourceConsumer<R>) {
+ check(state == SimResourceState.Pending) { "Resource is in invalid state" }
+ val ctx = Context(consumer)
- this.cont = cont
- this.ctx = ctx
+ this.ctx = ctx
+ this.state = SimResourceState.Active
- ctx.start()
- cont.invokeOnCancellation {
- ctx.stop()
- }
- }
- } finally {
- cont = null
- ctx = null
- }
+ ctx.start()
}
override fun close() {
- isClosed = true
- cont?.cancel()
- cont = null
- ctx = null
+ cancel()
+ state = SimResourceState.Stopped
}
override fun interrupt() {
ctx?.interrupt()
}
+ override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.stop()
+ }
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
+
/**
* Internal implementation of [SimResourceContext] for this class.
*/
- private inner class Context(
- consumer: SimResourceConsumer<R>,
- val cont: Continuation<Unit>
- ) : SimAbstractResourceContext<R>(resource, clock, consumer) {
+ private inner class Context(consumer: SimResourceConsumer<R>) : SimAbstractResourceContext<R>(resource, clock, consumer) {
/**
* The processing speed of the resource.
*/
@@ -130,16 +116,12 @@ public class SimResourceSource<R : SimResource>(
scheduler.startSingleTimerTo(this, until, ::flush)
}
- override fun onFinish() {
+ override fun onFinish(cause: Throwable?) {
speed = 0.0
scheduler.cancel(this)
- cont.resume(Unit)
- }
+ cancel()
- override fun onFailure(cause: Throwable) {
- speed = 0.0
- scheduler.cancel(this)
- cont.resumeWithException(cause)
+ super.onFinish(cause)
}
override fun toString(): String = "SimResourceSource.Context[resource=$resource]"
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt
new file mode 100644
index 00000000..c72951d0
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * The state of a resource provider.
+ */
+public enum class SimResourceState {
+ /**
+ * The resource provider is pending and the resource is waiting to be consumed.
+ */
+ Pending,
+
+ /**
+ * The resource provider is active and the resource is currently being consumed.
+ */
+ Active,
+
+ /**
+ * The resource provider is stopped and the resource cannot be consumed anymore.
+ */
+ Stopped
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index 060d0ea2..5eea78f6 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -22,33 +22,30 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.launch
import java.util.ArrayDeque
-import kotlin.coroutines.CoroutineContext
/**
* A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that
* a single output is directly connected to an input and that the switch can only support as much outputs as inputs.
*/
-public class SimResourceSwitchExclusive<R : SimResource>(context: CoroutineContext) : SimResourceSwitch<R> {
+public class SimResourceSwitchExclusive<R : SimResource> : SimResourceSwitch<R> {
/**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
+ * A flag to indicate that the switch is closed.
*/
- private val scope = CoroutineScope(context + Job())
+ private var isClosed: Boolean = false
- private val _outputs = mutableSetOf<SimResourceProvider<R>>()
+ private val _outputs = mutableSetOf<Provider>()
override val outputs: Set<SimResourceProvider<R>>
get() = _outputs
private val availableResources = ArrayDeque<SimResourceForwarder<R>>()
+
private val _inputs = mutableSetOf<SimResourceProvider<R>>()
override val inputs: Set<SimResourceProvider<R>>
get() = _inputs
override fun addOutput(resource: R): SimResourceProvider<R> {
+ check(!isClosed) { "Switch has been closed" }
check(availableResources.isNotEmpty()) { "No capacity to serve request" }
val forwarder = availableResources.poll()
val output = Provider(resource, forwarder)
@@ -57,33 +54,37 @@ public class SimResourceSwitchExclusive<R : SimResource>(context: CoroutineConte
}
override fun addInput(input: SimResourceProvider<R>) {
+ check(!isClosed) { "Switch has been closed" }
+
if (input in inputs) {
return
}
val forwarder = SimResourceForwarder(input.resource)
- scope.launch { input.consume(forwarder) }
-
_inputs += input
availableResources += forwarder
+
+ input.startConsumer(object : SimResourceConsumer<R> by forwarder {
+ override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) {
+ // De-register the input after it has finished
+ _inputs -= input
+ forwarder.onFinish(ctx, cause)
+ }
+ })
}
override fun close() {
- scope.cancel()
+ isClosed = true
+
+ // Cancel all upstream subscriptions
+ _inputs.forEach(SimResourceProvider<R>::cancel)
}
private inner class Provider(
override val resource: R,
private val forwarder: SimResourceForwarder<R>
- ) : SimResourceProvider<R> {
-
- override suspend fun consume(consumer: SimResourceConsumer<R>) = forwarder.consume(consumer)
-
- override fun interrupt() {
- forwarder.interrupt()
- }
-
+ ) : SimResourceProvider<R> by forwarder {
override fun close() {
_outputs -= this
availableResources += forwarder
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index bcf76d3c..6b919a77 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -25,10 +25,6 @@ package org.opendc.simulator.resources
import kotlinx.coroutines.*
import org.opendc.simulator.resources.consumer.SimConsumerBarrier
import java.time.Clock
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
@@ -39,14 +35,8 @@ import kotlin.math.min
*/
public class SimResourceSwitchMaxMin<R : SimResource>(
private val clock: Clock,
- context: CoroutineContext,
private val listener: Listener<R>? = null
) : SimResourceSwitch<R> {
- /**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
- */
- private val scope = CoroutineScope(context + Job())
-
private val inputConsumers = mutableSetOf<InputConsumer>()
private val _outputs = mutableSetOf<OutputProvider>()
override val outputs: Set<SimResourceProvider<R>>
@@ -112,9 +102,16 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
private var barrier: SimConsumerBarrier = SimConsumerBarrier(0)
/**
+ * A flag to indicate that the switch is closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
* Add an output to the switch represented by [resource].
*/
override fun addOutput(resource: R): SimResourceProvider<R> {
+ check(!isClosed) { "Switch has been closed" }
+
val provider = OutputProvider(resource)
_outputs.add(provider)
return provider
@@ -124,13 +121,15 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
* Add the specified [input] to the switch.
*/
override fun addInput(input: SimResourceProvider<R>) {
+ check(!isClosed) { "Switch has been closed" }
+
val consumer = InputConsumer(input)
_inputs.add(input)
inputConsumers += consumer
}
override fun close() {
- scope.cancel()
+ isClosed = true
}
/**
@@ -297,64 +296,56 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
*/
private inner class OutputProvider(override val resource: R) : SimResourceProvider<R> {
/**
- * A flag to indicate that the resource was closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * The current active consumer.
- */
- private var cont: CancellableContinuation<Unit>? = null
-
- /**
* The [OutputContext] that is currently running.
*/
private var ctx: OutputContext? = null
- override suspend fun consume(consumer: SimResourceConsumer<R>) {
- check(!isClosed) { "Lifetime of resource has ended." }
- check(cont == null) { "Run should not be called concurrently" }
+ override var state: SimResourceState = SimResourceState.Pending
+ internal set
- try {
- return suspendCancellableCoroutine { cont ->
- val ctx = OutputContext(resource, consumer, cont)
- ctx.start()
- cont.invokeOnCancellation {
- ctx.stop()
- }
+ override fun startConsumer(consumer: SimResourceConsumer<R>) {
+ check(state == SimResourceState.Pending) { "Resource cannot be consumed" }
- this.cont = cont
- this.ctx = ctx
+ val ctx = OutputContext(this, resource, consumer)
+ this.ctx = ctx
+ this.state = SimResourceState.Active
+ outputContexts += ctx
- outputContexts += ctx
- schedule()
- }
- } finally {
- cont = null
- ctx = null
- }
+ ctx.start()
+ schedule()
}
override fun close() {
- isClosed = true
- cont?.cancel()
- cont = null
- ctx = null
+ cancel()
+
+ state = SimResourceState.Stopped
_outputs.remove(this)
}
override fun interrupt() {
ctx?.interrupt()
}
+
+ override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.stop()
+ }
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
}
/**
* A [SimAbstractResourceContext] for the output resources.
*/
private inner class OutputContext(
+ private val provider: OutputProvider,
resource: R,
- consumer: SimResourceConsumer<R>,
- private val cont: Continuation<Unit>
+ consumer: SimResourceConsumer<R>
) : SimAbstractResourceContext<R>(resource, clock, consumer), Comparable<OutputContext> {
/**
* The current command that is processed by the vCPU.
@@ -371,11 +362,6 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
*/
var actualSpeed: Double = 0.0
- /**
- * A flag to indicate that the CPU has exited.
- */
- var hasExited: Boolean = false
-
override fun onIdle(deadline: Long) {
allowedSpeed = 0.0
activeCommand = SimResourceCommand.Idle(deadline)
@@ -386,16 +372,11 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
activeCommand = SimResourceCommand.Consume(work, limit, deadline)
}
- override fun onFinish() {
- hasExited = true
+ override fun onFinish(cause: Throwable?) {
activeCommand = SimResourceCommand.Exit
- cont.resume(Unit)
- }
+ provider.cancel()
- override fun onFailure(cause: Throwable) {
- hasExited = true
- activeCommand = SimResourceCommand.Exit
- cont.resumeWithException(cause)
+ super.onFinish(cause)
}
override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
@@ -453,21 +434,8 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
private lateinit var ctx: SimResourceContext<R>
init {
- scope.launch {
- try {
- barrier = SimConsumerBarrier(barrier.parties + 1)
- input.consume(this@InputConsumer)
- } catch (e: CancellationException) {
- // Cancel gracefully
- throw e
- } catch (e: Throwable) {
- e.printStackTrace()
- } finally {
- barrier = SimConsumerBarrier(barrier.parties - 1)
- inputConsumers -= this@InputConsumer
- _inputs -= input
- }
- }
+ barrier = SimConsumerBarrier(barrier.parties + 1)
+ input.startConsumer(this@InputConsumer)
}
/**
@@ -477,12 +445,11 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
ctx.interrupt()
}
- override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand {
+ override fun onStart(ctx: SimResourceContext<R>) {
this.ctx = ctx
- return commands[ctx.resource] ?: SimResourceCommand.Idle()
}
- override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand {
totalRemainingWork += remainingWork
val isLast = barrier.enter()
@@ -504,5 +471,13 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
commands[ctx.resource] ?: SimResourceCommand.Idle()
}
}
+
+ override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) {
+ barrier = SimConsumerBarrier(barrier.parties - 1)
+ inputConsumers -= this@InputConsumer
+ _inputs -= input
+
+ super.onFinish(ctx, cause)
+ }
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
index 03a3cebd..a00ee575 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -31,14 +31,16 @@ import org.opendc.simulator.resources.SimResourceContext
* A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource
* consumption for some period of time.
*/
-public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> {
- private val iterator = trace.iterator()
+public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> {
+ private var iterator: Iterator<Fragment>? = null
- override fun onStart(ctx: SimResourceContext<SimResource>): SimResourceCommand {
- return onNext(ctx, 0.0)
+ override fun onStart(ctx: SimResourceContext<SimResource>) {
+ check(iterator == null) { "Consumer already running" }
+ iterator = trace.iterator()
}
- override fun onNext(ctx: SimResourceContext<SimResource>, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<SimResource>, capacity: Double, remainingWork: Double): SimResourceCommand {
+ val iterator = checkNotNull(iterator)
return if (iterator.hasNext()) {
val now = ctx.clock.millis()
val fragment = iterator.next()
@@ -56,6 +58,10 @@ public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<S
}
}
+ override fun onFinish(ctx: SimResourceContext<SimResource>, cause: Throwable?) {
+ iterator = null
+ }
+
/**
* A fragment of the workload.
*/
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
new file mode 100644
index 00000000..bad2f403
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+import org.opendc.simulator.resources.SimResource
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+
+/**
+ * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization.
+ */
+public class SimWorkConsumer<R : SimResource>(
+ private val work: Double,
+ private val utilization: Double
+) : SimResourceConsumer<R> {
+
+ init {
+ require(work >= 0.0) { "Work must be positive" }
+ require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
+ }
+
+ private var limit = 0.0
+ private var remainingWork: Double = 0.0
+
+ override fun onStart(ctx: SimResourceContext<R>) {
+ limit = ctx.resource.capacity * utilization
+ remainingWork = work
+ }
+
+ override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand {
+ val work = this.remainingWork + remainingWork
+ this.remainingWork -= work
+ return if (work > 0.0) {
+ SimResourceCommand.Consume(work, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index e7642dc1..0bc87473 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -22,11 +22,10 @@
package org.opendc.simulator.resources
+import io.mockk.*
import kotlinx.coroutines.*
import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.*
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertTrue
import org.opendc.simulator.utils.DelayControllerClockAdapter
/**
@@ -45,28 +44,15 @@ class SimResourceContextTest {
val resource = SimCpu(4200.0)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(10.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
- override fun onIdle(deadline: Long) {
- }
+ override fun onIdle(deadline: Long) {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- }
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
- override fun onFinish() {
- }
-
- override fun onFailure(cause: Throwable) {
- }
+ override fun onFinish(cause: Throwable?) {}
}
context.flush()
@@ -77,36 +63,20 @@ class SimResourceContextTest {
val clock = DelayControllerClockAdapter(this)
val resource = SimCpu(4200.0)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(10.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
-
- var counter = 0
- val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
- override fun onIdle(deadline: Long) {
- }
-
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- counter++
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- override fun onFinish() {
- }
-
- override fun onFailure(cause: Throwable) {
- }
- }
+ val context = spyk(object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ })
context.start()
delay(1) // Delay 1 ms to prevent hitting the fast path
context.flush(isIntermediate = true)
- assertEquals(2, counter)
+
+ verify(exactly = 2) { context.onConsume(any(), any(), any()) }
}
@Test
@@ -114,33 +84,14 @@ class SimResourceContextTest {
val clock = DelayControllerClockAdapter(this)
val resource = SimCpu(4200.0)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Idle(10)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
-
- var counter = 0
- var isFinished = false
- val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
- override fun onIdle(deadline: Long) {
- counter++
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- }
-
- override fun onFinish() {
- isFinished = true
- }
-
- override fun onFailure(cause: Throwable) {
- }
- }
+ val context = spyk(object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ })
context.start()
delay(5)
@@ -149,8 +100,26 @@ class SimResourceContextTest {
context.flush(isIntermediate = true)
assertAll(
- { assertEquals(1, counter) },
- { assertTrue(isFinished) }
+ { verify(exactly = 1) { context.onIdle(any()) } },
+ { verify(exactly = 1) { context.onFinish(null) } }
)
}
+
+ @Test
+ fun testDoubleStart() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val resource = SimCpu(4200.0)
+
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
+
+ val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ }
+
+ context.start()
+ assertThrows<IllegalStateException> { context.start() }
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
index ced1bd98..b1b959ba 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
@@ -22,10 +22,16 @@
package org.opendc.simulator.resources
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.verify
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runBlockingTest
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
@@ -53,14 +59,15 @@ internal class SimResourceForwarderTest {
}
forwarder.consume(object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Exit
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ override fun onNext(
+ ctx: SimResourceContext<SimCpu>,
+ capacity: Double,
+ remainingWork: Double
+ ): SimResourceCommand {
return SimResourceCommand.Exit
}
})
+
forwarder.close()
scheduler.close()
}
@@ -78,15 +85,100 @@ internal class SimResourceForwarderTest {
}
forwarder.consume(object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
+ var isFirst = true
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
+ override fun onNext(
+ ctx: SimResourceContext<SimCpu>,
+ capacity: Double,
+ remainingWork: Double
+ ): SimResourceCommand {
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(10.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
}
})
forwarder.close()
}
+
+ @Test
+ fun testState() = runBlockingTest {
+ val forwarder = SimResourceForwarder(SimCpu(1000.0))
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onNext(
+ ctx: SimResourceContext<SimCpu>,
+ capacity: Double,
+ remainingWork: Double
+ ): SimResourceCommand = SimResourceCommand.Exit
+ }
+
+ assertEquals(SimResourceState.Pending, forwarder.state)
+
+ forwarder.startConsumer(consumer)
+ assertEquals(SimResourceState.Active, forwarder.state)
+
+ assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
+
+ forwarder.cancel()
+ assertEquals(SimResourceState.Pending, forwarder.state)
+
+ forwarder.close()
+ assertEquals(SimResourceState.Stopped, forwarder.state)
+ }
+
+ @Test
+ fun testCancelPendingDelegate() = runBlockingTest {
+ val forwarder = SimResourceForwarder(SimCpu(1000.0))
+
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit
+
+ forwarder.startConsumer(consumer)
+ forwarder.cancel()
+
+ verify(exactly = 0) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testCancelStartedDelegate() = runBlockingTest {
+ val forwarder = SimResourceForwarder(SimCpu(1000.0))
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(SimCpu(2000.0), clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10)
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ forwarder.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any()) }
+ verify(exactly = 1) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testCancelPropagation() = runBlockingTest {
+ val forwarder = SimResourceForwarder(SimCpu(1000.0))
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(SimCpu(2000.0), clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Idle(10)
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ source.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any()) }
+ verify(exactly = 1) { consumer.onFinish(any(), null) }
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 4f7825fc..18f18ded 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import io.mockk.every
+import io.mockk.mockk
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runBlockingTest
@@ -46,15 +48,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1000 * ctx.resource.speed, ctx.resource.speed)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, provider.resource.speed))
+ .andThen(SimResourceCommand.Exit)
try {
val res = mutableListOf<Double>()
@@ -76,15 +73,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1000 * ctx.resource.speed, 2 * ctx.resource.speed)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, 2 * provider.resource.speed))
+ .andThen(SimResourceCommand.Exit)
try {
val res = mutableListOf<Double>()
@@ -111,13 +103,12 @@ class SimResourceSourceTest {
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ override fun onStart(ctx: SimResourceContext<SimCpu>) {
ctx.interrupt()
- return SimResourceCommand.Exit
}
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
+ override fun onNext(ctx: SimResourceContext<SimCpu>, capacity: Double, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
}
}
@@ -137,14 +128,19 @@ class SimResourceSourceTest {
lateinit var resCtx: SimResourceContext<SimCpu>
val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ var isFirst = true
+ override fun onStart(ctx: SimResourceContext<SimCpu>) {
resCtx = ctx
- return SimResourceCommand.Consume(4.0, 1.0)
}
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<SimCpu>, capacity: Double, remainingWork: Double): SimResourceCommand {
assertEquals(0.0, remainingWork)
- return SimResourceCommand.Exit
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(4.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
}
}
@@ -168,15 +164,9 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- throw IllegalStateException()
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onStart(any()) }
+ .throws(IllegalStateException())
try {
assertThrows<IllegalStateException> {
@@ -194,15 +184,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
try {
assertThrows<IllegalStateException> {
@@ -220,15 +205,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
try {
assertThrows<IllegalStateException> {
@@ -249,15 +229,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
try {
assertThrows<IllegalStateException> {
@@ -276,15 +251,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
try {
launch { provider.consume(consumer) }
@@ -304,15 +274,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Idle(ctx.clock.millis() + 500)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Idle(clock.millis() + 500))
+ .andThen(SimResourceCommand.Exit)
try {
provider.consume(consumer)
@@ -332,15 +297,10 @@ class SimResourceSourceTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Idle()
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Idle())
+ .andThenThrows(IllegalStateException())
try {
provider.consume(consumer)
@@ -351,4 +311,25 @@ class SimResourceSourceTest {
}
}
}
+
+ @Test
+ fun testIncorrectDeadline() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Idle(2))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ delay(10)
+
+ assertThrows<IllegalArgumentException> { provider.consume(consumer) }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index ca6558bf..354dab93 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import io.mockk.every
+import io.mockk.mockk
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
@@ -34,7 +36,6 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.resources.consumer.SimTraceConsumer
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
-import java.lang.IllegalStateException
/**
* Test suite for the [SimResourceSwitchExclusive] class.
@@ -67,7 +68,7 @@ internal class SimResourceSwitchExclusiveTest {
),
)
- val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val switch = SimResourceSwitchExclusive<SimCpu>()
val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
switch.addInput(source)
@@ -98,17 +99,10 @@ internal class SimResourceSwitchExclusiveTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val duration = 5 * 60L * 1000
- val workload = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(duration / 1000.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val workload = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
- val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val switch = SimResourceSwitchExclusive<SimCpu>()
val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
switch.addInput(source)
@@ -134,16 +128,27 @@ internal class SimResourceSwitchExclusiveTest {
val duration = 5 * 60L * 1000
val workload = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ var isFirst = true
+
+ override fun onStart(ctx: SimResourceContext<SimCpu>) {
+ isFirst = true
}
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
+ override fun onNext(
+ ctx: SimResourceContext<SimCpu>,
+ capacity: Double,
+ remainingWork: Double
+ ): SimResourceCommand {
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
}
}
- val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val switch = SimResourceSwitchExclusive<SimCpu>()
val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
switch.addInput(source)
@@ -169,17 +174,10 @@ internal class SimResourceSwitchExclusiveTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val duration = 5 * 60L * 1000
- val workload = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(duration.toDouble(), 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val workload = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
- val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val switch = SimResourceSwitchExclusive<SimCpu>()
val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
switch.addInput(source)
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
index 698c1700..e8f5a13c 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import io.mockk.every
+import io.mockk.mockk
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
@@ -47,22 +49,15 @@ internal class SimResourceSwitchMaxMinTest {
fun testSmoke() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val switch = SimResourceSwitchMaxMin<SimCpu>(clock, coroutineContext)
+ val switch = SimResourceSwitchMaxMin<SimCpu>(clock)
val sources = List(2) { SimResourceSource(SimCpu(2000.0), clock, scheduler) }
sources.forEach { switch.addInput(it) }
val provider = switch.addOutput(SimCpu(1000.0))
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit
try {
provider.consume(consumer)
@@ -112,7 +107,7 @@ internal class SimResourceSwitchMaxMinTest {
),
)
- val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener)
+ val switch = SimResourceSwitchMaxMin(clock, listener)
val provider = switch.addOutput(SimCpu(3200.0))
try {
@@ -180,7 +175,7 @@ internal class SimResourceSwitchMaxMinTest {
)
)
- val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener)
+ val switch = SimResourceSwitchMaxMin(clock, listener)
val providerA = switch.addOutput(SimCpu(3200.0))
val providerB = switch.addOutput(SimCpu(3200.0))
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
new file mode 100644
index 00000000..b05195f7
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimWorkConsumer] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimWorkConsumerTest {
+ data class SimCpu(val speed: Double) : SimResource {
+ override val capacity: Double
+ get() = speed
+ }
+
+ @Test
+ fun testSmoke() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(1.0), clock, scheduler)
+
+ val consumer = SimWorkConsumer<SimCpu>(1.0, 1.0)
+
+ try {
+ provider.consume(consumer)
+ assertEquals(1000, currentTime)
+ } finally {
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testUtilization() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(1.0), clock, scheduler)
+
+ val consumer = SimWorkConsumer<SimCpu>(1.0, 0.5)
+
+ try {
+ provider.consume(consumer)
+ assertEquals(2000, currentTime)
+ } finally {
+ provider.close()
+ }
+ }
+}
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
index 49964938..d4bc7b5c 100644
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
+++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
@@ -93,7 +93,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
try {
timer()
} catch (e: Throwable) {
- Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e)
+ coroutineContext[CoroutineExceptionHandler]?.handleException(coroutineContext, e)
}
}
}