summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-resources/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources/src')
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt43
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt135
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt208
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt344
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt48
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt63
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt52
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt79
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt51
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt43
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt420
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt29
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt79
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt129
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt43
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt48
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt95
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt119
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt175
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt52
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt81
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt68
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt58
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt202
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt74
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt104
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt351
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt173
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt193
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt210
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt66
31 files changed, 3835 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt
new file mode 100644
index 00000000..8d2587b1
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
+
+/**
+ * Helper function to create simple consumer workload.
+ */
+fun createSimpleConsumer(): SimResourceConsumer {
+ return SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(1000, 28.0),
+ SimTraceConsumer.Fragment(1000, 3500.0),
+ SimTraceConsumer.Fragment(1000, 0.0),
+ SimTraceConsumer.Fragment(1000, 183.0),
+ SimTraceConsumer.Fragment(1000, 400.0),
+ SimTraceConsumer.Fragment(1000, 100.0),
+ SimTraceConsumer.Fragment(1000, 3000.0),
+ SimTraceConsumer.Fragment(1000, 4500.0),
+ ),
+ )
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
new file mode 100644
index 00000000..beda3eaa
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
@@ -0,0 +1,135 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.launch
+import org.opendc.simulator.core.SimulationCoroutineScope
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.utils.TimerScheduler
+import org.openjdk.jmh.annotations.*
+import java.util.concurrent.TimeUnit
+
+@State(Scope.Thread)
+@Fork(1)
+@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceBenchmarks {
+ private lateinit var scope: SimulationCoroutineScope
+ private lateinit var scheduler: TimerScheduler<Any>
+
+ @Setup
+ fun setUp() {
+ scope = SimulationCoroutineScope()
+ scheduler = TimerScheduler(scope.coroutineContext, scope.clock)
+ }
+
+ @State(Scope.Thread)
+ class Workload {
+ lateinit var consumers: Array<SimResourceConsumer>
+
+ @Setup
+ fun setUp() {
+ consumers = Array(3) { createSimpleConsumer() }
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSource(state: Workload) {
+ return scope.runBlockingSimulation {
+ val provider = SimResourceSource(4200.0, clock, scheduler)
+ return@runBlockingSimulation provider.consume(state.consumers[0])
+ }
+ }
+
+ @Benchmark
+ fun benchmarkForwardOverhead(state: Workload) {
+ return scope.runBlockingSimulation {
+ val provider = SimResourceSource(4200.0, clock, scheduler)
+ val forwarder = SimResourceForwarder()
+ provider.startConsumer(forwarder)
+ return@runBlockingSimulation forwarder.consume(state.consumers[0])
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) {
+ return scope.runBlockingSimulation {
+ val switch = SimResourceSwitchMaxMin(clock)
+
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+
+ val provider = switch.addOutput(3500.0)
+ return@runBlockingSimulation provider.consume(state.consumers[0])
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) {
+ return scope.runBlockingSimulation {
+ val switch = SimResourceSwitchMaxMin(clock)
+
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+
+ repeat(3) { i ->
+ launch {
+ val provider = switch.addOutput(3500.0)
+ provider.consume(state.consumers[i])
+ }
+ }
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSwitchExclusiveSingleConsumer(state: Workload) {
+ return scope.runBlockingSimulation {
+ val switch = SimResourceSwitchExclusive()
+
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+
+ val provider = switch.addOutput(3500.0)
+ return@runBlockingSimulation provider.consume(state.consumers[0])
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSwitchExclusiveTripleConsumer(state: Workload) {
+ return scope.runBlockingSimulation {
+ val switch = SimResourceSwitchExclusive()
+
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+
+ repeat(2) { i ->
+ launch {
+ val provider = switch.addOutput(3500.0)
+ provider.consume(state.consumers[i])
+ }
+ }
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
new file mode 100644
index 00000000..c7fa6a17
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * Abstract implementation of [SimResourceAggregator].
+ */
+public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator {
+ /**
+ * The available resource provider contexts.
+ */
+ protected val inputContexts: Set<SimResourceContext>
+ get() = _inputContexts
+ private val _inputContexts = mutableSetOf<SimResourceContext>()
+
+ /**
+ * The output context.
+ */
+ protected val outputContext: SimResourceContext
+ get() = context
+
+ /**
+ * The commands to submit to the underlying input resources.
+ */
+ protected val commands: MutableMap<SimResourceContext, SimResourceCommand> = mutableMapOf()
+
+ /**
+ * This method is invoked when the resource consumer consumes resources.
+ */
+ protected abstract fun doConsume(work: Double, limit: Double, deadline: Long)
+
+ /**
+ * This method is invoked when the resource consumer enters an idle state.
+ */
+ protected open fun doIdle(deadline: Long) {
+ for (input in inputContexts) {
+ commands[input] = SimResourceCommand.Idle(deadline)
+ }
+ }
+
+ /**
+ * This method is invoked when the resource consumer finishes processing.
+ */
+ protected open fun doFinish(cause: Throwable?) {
+ for (input in inputContexts) {
+ commands[input] = SimResourceCommand.Exit
+ }
+ }
+
+ /**
+ * This method is invoked when an input context is started.
+ */
+ protected open fun onContextStarted(ctx: SimResourceContext) {
+ _inputContexts.add(ctx)
+ }
+
+ protected open fun onContextFinished(ctx: SimResourceContext) {
+ assert(_inputContexts.remove(ctx)) { "Lost context" }
+ }
+
+ override fun addInput(input: SimResourceProvider) {
+ check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" }
+
+ val consumer = Consumer()
+ _inputs.add(input)
+ input.startConsumer(consumer)
+ }
+
+ override fun close() {
+ output.close()
+ }
+
+ override val output: SimResourceProvider
+ get() = _output
+ private val _output = SimResourceForwarder()
+
+ override val inputs: Set<SimResourceProvider>
+ get() = _inputs
+ private val _inputs = mutableSetOf<SimResourceProvider>()
+
+ private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) {
+ override val remainingWork: Double
+ get() {
+ val now = clock.millis()
+
+ return if (_remainingWorkFlush < now) {
+ _remainingWorkFlush = now
+ _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it }
+ } else {
+ _remainingWork
+ }
+ }
+ private var _remainingWork: Double = 0.0
+ private var _remainingWorkFlush: Long = Long.MIN_VALUE
+
+ override fun interrupt() {
+ super.interrupt()
+
+ interruptAll()
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline)
+
+ override fun onIdle(deadline: Long) = doIdle(deadline)
+
+ override fun onFinish(cause: Throwable?) {
+ doFinish(cause)
+
+ super.onFinish(cause)
+
+ interruptAll()
+ }
+ }
+
+ /**
+ * A flag to indicate that an interrupt is active.
+ */
+ private var isInterrupting: Boolean = false
+
+ /**
+ * Schedule the work over the input resources.
+ */
+ private fun doSchedule() {
+ context.flush(isIntermediate = true)
+ interruptAll()
+ }
+
+ /**
+ * Interrupt all inputs.
+ */
+ private fun interruptAll() {
+ // Prevent users from interrupting the resource while they are constructing their next command, as this will
+ // only lead to infinite recursion.
+ if (isInterrupting) {
+ return
+ }
+
+ try {
+ isInterrupting = true
+
+ val iterator = _inputs.iterator()
+ while (iterator.hasNext()) {
+ val input = iterator.next()
+ input.interrupt()
+
+ if (input.state != SimResourceState.Active) {
+ iterator.remove()
+ }
+ }
+ } finally {
+ isInterrupting = false
+ }
+ }
+
+ /**
+ * An internal [SimResourceConsumer] implementation for aggregator inputs.
+ */
+ private inner class Consumer : SimResourceConsumer {
+ override fun onStart(ctx: SimResourceContext) {
+ onContextStarted(ctx)
+ onCapacityChanged(ctx, false)
+
+ // Make sure we initialize the output if we have not done so yet
+ if (context.state == SimResourceState.Pending) {
+ context.start()
+ }
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ doSchedule()
+
+ return commands[ctx] ?: SimResourceCommand.Idle()
+ }
+
+ override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
+ // Adjust capacity of output resource
+ context.capacity = inputContexts.sumByDouble { it.capacity }
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ onContextFinished(ctx)
+
+ super.onFinish(ctx, cause)
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
new file mode 100644
index 00000000..05ed0714
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -0,0 +1,344 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
+ */
+public abstract class SimAbstractResourceContext(
+ initialCapacity: Double,
+ override val clock: Clock,
+ private val consumer: SimResourceConsumer
+) : SimResourceContext {
+ /**
+ * The capacity of the resource.
+ */
+ public final override var capacity: Double = initialCapacity
+ set(value) {
+ val oldValue = field
+
+ // Only changes will be propagated
+ if (value != oldValue) {
+ field = value
+ onCapacityChange()
+ }
+ }
+
+ /**
+ * The amount of work still remaining at this instant.
+ */
+ override val remainingWork: Double
+ get() {
+ val activeCommand = activeCommand ?: return 0.0
+ val now = clock.millis()
+
+ return if (_remainingWorkFlush < now) {
+ _remainingWorkFlush = now
+ computeRemainingWork(activeCommand, now).also { _remainingWork = it }
+ } else {
+ _remainingWork
+ }
+ }
+ private var _remainingWork: Double = 0.0
+ private var _remainingWorkFlush: Long = Long.MIN_VALUE
+
+ /**
+ * A flag to indicate the state of the context.
+ */
+ public var state: SimResourceState = SimResourceState.Pending
+ private set
+
+ /**
+ * The current processing speed of the resource.
+ */
+ public var speed: Double = 0.0
+ private set
+
+ /**
+ * This method is invoked when the resource will idle until the specified [deadline].
+ */
+ public abstract fun onIdle(deadline: Long)
+
+ /**
+ * This method is invoked when the resource will be consumed until the specified [work] was processed or the
+ * [deadline] was reached.
+ */
+ public abstract fun onConsume(work: Double, limit: Double, deadline: Long)
+
+ /**
+ * This method is invoked when the resource consumer has finished.
+ */
+ public open fun onFinish(cause: Throwable?) {
+ consumer.onFinish(this, cause)
+ }
+
+ /**
+ * Get the remaining work to process after a resource consumption.
+ *
+ * @param work The size of the resource consumption.
+ * @param speed The speed of consumption.
+ * @param duration The duration from the start of the consumption until now.
+ * @return The amount of work remaining.
+ */
+ protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
+ return if (duration > 0L) {
+ val processed = duration / 1000.0 * speed
+ max(0.0, work - processed)
+ } else {
+ 0.0
+ }
+ }
+
+ /**
+ * Start the consumer.
+ */
+ public fun start() {
+ check(state == SimResourceState.Pending) { "Consumer is already started" }
+
+ val now = clock.millis()
+
+ state = SimResourceState.Active
+ isProcessing = true
+ latestFlush = now
+
+ try {
+ consumer.onStart(this)
+ activeCommand = interpret(consumer.onNext(this), now)
+ } catch (cause: Throwable) {
+ doStop(cause)
+ } finally {
+ isProcessing = false
+ }
+ }
+
+ /**
+ * Immediately stop the consumer.
+ */
+ public fun stop() {
+ try {
+ isProcessing = true
+ latestFlush = clock.millis()
+
+ flush(isIntermediate = true)
+ doStop(null)
+ } catch (cause: Throwable) {
+ doStop(cause)
+ } finally {
+ isProcessing = false
+ }
+ }
+
+ /**
+ * Flush the current active resource consumption.
+ *
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun flush(isIntermediate: Boolean = false) {
+ // Flush is no-op when the consumer is finished or not yet started
+ if (state != SimResourceState.Active) {
+ return
+ }
+
+ val now = clock.millis()
+
+ // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it.
+ if (isIntermediate && latestFlush >= now) {
+ return
+ }
+
+ try {
+ val activeCommand = activeCommand ?: return
+ val (timestamp, command) = activeCommand
+
+ // Note: accessor is reliant on activeCommand being set
+ val remainingWork = remainingWork
+
+ isProcessing = true
+
+ val duration = now - timestamp
+ assert(duration >= 0) { "Flush in the past" }
+
+ this.activeCommand = when (command) {
+ is SimResourceCommand.Idle -> {
+ // We should only continue processing the next command if:
+ // 1. The resource consumer reached its deadline.
+ // 2. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ if (command.deadline <= now || !isIntermediate) {
+ next(now)
+ } else {
+ interpret(SimResourceCommand.Idle(command.deadline), now)
+ }
+ }
+ is SimResourceCommand.Consume -> {
+ // We should only continue processing the next command if:
+ // 1. The resource consumption was finished.
+ // 2. The resource capacity cannot satisfy the demand.
+ // 4. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) {
+ next(now)
+ } else {
+ interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now)
+ }
+ }
+ SimResourceCommand.Exit ->
+ // Flush may not be called when the resource consumer has finished
+ throw IllegalStateException()
+ }
+
+ // Flush remaining work cache
+ _remainingWorkFlush = Long.MIN_VALUE
+ } catch (cause: Throwable) {
+ doStop(cause)
+ } finally {
+ latestFlush = now
+ isProcessing = false
+ }
+ }
+
+ override fun interrupt() {
+ // Prevent users from interrupting the resource while they are constructing their next command, as this will
+ // only lead to infinite recursion.
+ if (isProcessing) {
+ return
+ }
+
+ flush()
+ }
+
+ override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]"
+
+ /**
+ * A flag to indicate that the resource is currently processing a command.
+ */
+ protected var isProcessing: Boolean = false
+
+ /**
+ * The current command that is being processed.
+ */
+ private var activeCommand: CommandWrapper? = null
+
+ /**
+ * The latest timestamp at which the resource was flushed.
+ */
+ private var latestFlush: Long = Long.MIN_VALUE
+
+ /**
+ * Finish the consumer and resource provider.
+ */
+ private fun doStop(cause: Throwable?) {
+ val state = state
+ this.state = SimResourceState.Stopped
+
+ if (state == SimResourceState.Active) {
+ activeCommand = null
+ onFinish(cause)
+ }
+ }
+
+ /**
+ * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
+ */
+ private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? {
+ when (command) {
+ is SimResourceCommand.Idle -> {
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ speed = 0.0
+ consumer.onConfirm(this, 0.0)
+
+ onIdle(deadline)
+ }
+ is SimResourceCommand.Consume -> {
+ val work = command.work
+ val limit = command.limit
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ speed = min(capacity, limit)
+ consumer.onConfirm(this, speed)
+
+ onConsume(work, limit, deadline)
+ }
+ is SimResourceCommand.Exit -> {
+ speed = 0.0
+
+ doStop(null)
+
+ // No need to set the next active command
+ return null
+ }
+ }
+
+ return CommandWrapper(now, command)
+ }
+
+ /**
+ * Request the workload for more work.
+ */
+ private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now)
+
+ /**
+ * Compute the remaining work based on the specified [wrapper] and [timestamp][now].
+ */
+ private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double {
+ val (timestamp, command) = wrapper
+ val duration = now - timestamp
+ return when (command) {
+ is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration)
+ is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0
+ }
+ }
+
+ /**
+ * Indicate that the capacity of the resource has changed.
+ */
+ private fun onCapacityChange() {
+ // Do not inform the consumer if it has not been started yet
+ if (state != SimResourceState.Active) {
+ return
+ }
+
+ val isThrottled = speed > capacity
+ consumer.onCapacityChanged(this, isThrottled)
+
+ // Optimization: only flush changes if the new capacity cannot satisfy the active resource command.
+ // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush().
+ if (isThrottled) {
+ flush(isIntermediate = true)
+ }
+ }
+
+ /**
+ * This class wraps a [command] with the timestamp it was started and possibly the task associated with it.
+ */
+ private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand)
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
new file mode 100644
index 00000000..bb4e6a2c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource.
+ */
+public interface SimResourceAggregator : AutoCloseable {
+ /**
+ * The output resource provider to which resource consumers can be attached.
+ */
+ public val output: SimResourceProvider
+
+ /**
+ * The input resources that will be switched between the output providers.
+ */
+ public val inputs: Set<SimResourceProvider>
+
+ /**
+ * Add the specified [input] to the switch.
+ */
+ public fun addInput(input: SimResourceProvider)
+
+ /**
+ * End the lifecycle of the aggregator.
+ */
+ public override fun close()
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
new file mode 100644
index 00000000..08bc064e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * A [SimResourceAggregator] that distributes the load equally across the input resources.
+ */
+public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) {
+ private val consumers = mutableListOf<SimResourceContext>()
+
+ override fun doConsume(work: Double, limit: Double, deadline: Long) {
+ // Sort all consumers by their capacity
+ consumers.sortWith(compareBy { it.capacity })
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (input in consumers) {
+ val inputCapacity = input.capacity
+ val fraction = inputCapacity / outputContext.capacity
+ val grantedSpeed = limit * fraction
+ val grantedWork = fraction * work
+
+ commands[input] =
+ if (grantedWork > 0.0 && grantedSpeed > 0.0)
+ SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ }
+ }
+
+ override fun onContextStarted(ctx: SimResourceContext) {
+ super.onContextStarted(ctx)
+
+ consumers.add(ctx)
+ }
+
+ override fun onContextFinished(ctx: SimResourceContext) {
+ super.onContextFinished(ctx)
+
+ consumers.remove(ctx)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
new file mode 100644
index 00000000..f7f3fa4d
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer].
+ */
+public sealed class SimResourceCommand {
+ /**
+ * A request to the resource to perform the specified amount of work before the given [deadline].
+ *
+ * @param work The amount of work to process.
+ * @param limit The maximum amount of work to be processed per second.
+ * @param deadline The instant at which the work needs to be fulfilled.
+ */
+ public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() {
+ init {
+ require(work > 0) { "Amount of work must be positive" }
+ require(limit > 0) { "Limit must be positive" }
+ }
+ }
+
+ /**
+ * An indication to the resource that the consumer will idle until the specified [deadline] or if it is interrupted.
+ */
+ public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand()
+
+ /**
+ * An indication to the resource that the consumer has finished.
+ */
+ public object Exit : SimResourceCommand()
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
new file mode 100644
index 00000000..38672b13
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceConsumer] characterizes how a resource is consumed.
+ *
+ * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently)
+ * for multiple resource providers, unless explicitly said otherwise.
+ */
+public interface SimResourceConsumer {
+ /**
+ * This method is invoked when the consumer is started for some resource.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ */
+ public fun onStart(ctx: SimResourceContext) {}
+
+ /**
+ * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because
+ * the resource finished processing, reached its deadline or was interrupted.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @return The next command that the resource should execute.
+ */
+ public fun onNext(ctx: SimResourceContext): SimResourceCommand
+
+ /**
+ * This method is invoked when the resource provider confirms that the consumer is running at the given speed.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @param speed The speed at which the consumer runs.
+ */
+ public fun onConfirm(ctx: SimResourceContext, speed: Double) {}
+
+ /**
+ * This is method is invoked when the capacity of the resource changes.
+ *
+ * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the
+ * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly
+ * causing the active resource command to finish at a later moment than initially planned.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the
+ * capacity change.
+ */
+ public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {}
+
+ /**
+ * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit],
+ * the resource finished itself, or a failure occurred at the resource.
+ *
+ * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider.
+ *
+ * @param ctx The execution context in which the consumer ran.
+ * @param cause The cause of the finish in case the resource finished exceptionally.
+ */
+ public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {}
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
new file mode 100644
index 00000000..11dbb09f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a
+ * resource and a resource consumer.
+ */
+public interface SimResourceContext {
+ /**
+ * The virtual clock tracking simulation time.
+ */
+ public val clock: Clock
+
+ /**
+ * The resource capacity available at this instant.
+ */
+ public val capacity: Double
+
+ /**
+ * The amount of work still remaining at this instant.
+ */
+ public val remainingWork: Double
+
+ /**
+ * Ask the resource provider to interrupt its resource.
+ */
+ public fun interrupt()
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
new file mode 100644
index 00000000..b2759b7f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers.
+ */
+public interface SimResourceDistributor : AutoCloseable {
+ /**
+ * The output resource providers to which resource consumers can be attached.
+ */
+ public val outputs: Set<SimResourceProvider>
+
+ /**
+ * The input resource that will be distributed over the consumers.
+ */
+ public val input: SimResourceProvider
+
+ /**
+ * Add an output to the switch with the specified [capacity].
+ */
+ public fun addOutput(capacity: Double): SimResourceProvider
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
new file mode 100644
index 00000000..dfdd2c2e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -0,0 +1,420 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing.
+ */
+public class SimResourceDistributorMaxMin(
+ override val input: SimResourceProvider,
+ private val clock: Clock,
+ private val listener: Listener? = null
+) : SimResourceDistributor {
+ override val outputs: Set<SimResourceProvider>
+ get() = _outputs
+ private val _outputs = mutableSetOf<OutputProvider>()
+
+ /**
+ * The active output contexts.
+ */
+ private val outputContexts: MutableList<OutputContext> = mutableListOf()
+
+ /**
+ * The total speed requested by the output resources.
+ */
+ private var totalRequestedSpeed = 0.0
+
+ /**
+ * The total amount of work requested by the output resources.
+ */
+ private var totalRequestedWork = 0.0
+
+ /**
+ * The total allocated speed for the output resources.
+ */
+ private var totalAllocatedSpeed = 0.0
+
+ /**
+ * The total allocated work requested for the output resources.
+ */
+ private var totalAllocatedWork = 0.0
+
+ /**
+ * The amount of work that could not be performed due to over-committing resources.
+ */
+ private var totalOvercommittedWork = 0.0
+
+ /**
+ * The amount of work that was lost due to interference.
+ */
+ private var totalInterferedWork = 0.0
+
+ /**
+ * A flag to indicate that the switch is closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
+ * An internal [SimResourceConsumer] implementation for switch inputs.
+ */
+ private val consumer = object : SimResourceConsumer {
+ /**
+ * The resource context of the consumer.
+ */
+ private lateinit var ctx: SimResourceContext
+
+ val remainingWork: Double
+ get() = ctx.remainingWork
+
+ override fun onStart(ctx: SimResourceContext) {
+ this.ctx = ctx
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return doNext(ctx.capacity)
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ super.onFinish(ctx, cause)
+
+ val iterator = _outputs.iterator()
+ while (iterator.hasNext()) {
+ val output = iterator.next()
+
+ // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
+ // during the call to output.close()
+ iterator.remove()
+
+ output.close()
+ }
+ }
+ }
+
+ /**
+ * The total amount of remaining work.
+ */
+ private val totalRemainingWork: Double
+ get() = consumer.remainingWork
+
+ override fun addOutput(capacity: Double): SimResourceProvider {
+ check(!isClosed) { "Distributor has been closed" }
+
+ val provider = OutputProvider(capacity)
+ _outputs.add(provider)
+ return provider
+ }
+
+ override fun close() {
+ if (!isClosed) {
+ isClosed = true
+ input.cancel()
+ }
+ }
+
+ init {
+ input.startConsumer(consumer)
+ }
+
+ /**
+ * Indicate that the workloads should be re-scheduled.
+ */
+ private fun schedule() {
+ input.interrupt()
+ }
+
+ /**
+ * Schedule the work over the physical CPUs.
+ */
+ private fun doSchedule(capacity: Double): SimResourceCommand {
+ // If there is no work yet, mark all inputs as idle.
+ if (outputContexts.isEmpty()) {
+ return SimResourceCommand.Idle()
+ }
+
+ val maxUsage = capacity
+ var duration: Double = Double.MAX_VALUE
+ var deadline: Long = Long.MAX_VALUE
+ var availableSpeed = maxUsage
+ var totalRequestedSpeed = 0.0
+ var totalRequestedWork = 0.0
+
+ // Flush the work of the outputs
+ var outputIterator = outputContexts.listIterator()
+ while (outputIterator.hasNext()) {
+ val output = outputIterator.next()
+
+ output.flush(isIntermediate = true)
+
+ if (output.activeCommand == SimResourceCommand.Exit) {
+ // Apparently the output consumer has exited, so remove it from the scheduling queue.
+ outputIterator.remove()
+ }
+ }
+
+ // Sort the outputs based on their requested usage
+ // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
+ outputContexts.sort()
+
+ // Divide the available input capacity fairly across the outputs using max-min fair sharing
+ outputIterator = outputContexts.listIterator()
+ var remaining = outputContexts.size
+ while (outputIterator.hasNext()) {
+ val output = outputIterator.next()
+ val availableShare = availableSpeed / remaining--
+
+ when (val command = output.activeCommand) {
+ is SimResourceCommand.Idle -> {
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ output.actualSpeed = 0.0
+ }
+ is SimResourceCommand.Consume -> {
+ val grantedSpeed = min(output.allowedSpeed, availableShare)
+
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ // Ignore idle computation
+ if (grantedSpeed <= 0.0 || command.work <= 0.0) {
+ output.actualSpeed = 0.0
+ continue
+ }
+
+ totalRequestedSpeed += command.limit
+ totalRequestedWork += command.work
+
+ output.actualSpeed = grantedSpeed
+ availableSpeed -= grantedSpeed
+
+ // The duration that we want to run is that of the shortest request from an output
+ duration = min(duration, command.work / grantedSpeed)
+ }
+ SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" }
+ }
+ }
+
+ assert(deadline >= clock.millis()) { "Deadline already passed" }
+
+ this.totalRequestedSpeed = totalRequestedSpeed
+ this.totalRequestedWork = totalRequestedWork
+ this.totalAllocatedSpeed = maxUsage - availableSpeed
+ this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration)
+
+ return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0)
+ SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ }
+
+ /**
+ * Obtain the next command to perform.
+ */
+ private fun doNext(capacity: Double): SimResourceCommand {
+ val totalRequestedWork = totalRequestedWork.toLong()
+ val totalRemainingWork = totalRemainingWork.toLong()
+ val totalAllocatedWork = totalAllocatedWork.toLong()
+ val totalRequestedSpeed = totalRequestedSpeed
+ val totalAllocatedSpeed = totalAllocatedSpeed
+
+ // Force all inputs to re-schedule their work.
+ val command = doSchedule(capacity)
+
+ // Report metrics
+ listener?.onSliceFinish(
+ this,
+ totalRequestedWork,
+ totalAllocatedWork - totalRemainingWork,
+ totalOvercommittedWork.toLong(),
+ totalInterferedWork.toLong(),
+ totalAllocatedSpeed,
+ totalRequestedSpeed
+ )
+
+ totalInterferedWork = 0.0
+ totalOvercommittedWork = 0.0
+
+ return command
+ }
+
+ /**
+ * Event listener for hypervisor events.
+ */
+ public interface Listener {
+ /**
+ * This method is invoked when a slice is finished.
+ */
+ public fun onSliceFinish(
+ switch: SimResourceDistributor,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ )
+ }
+
+ /**
+ * An internal [SimResourceProvider] implementation for switch outputs.
+ */
+ private inner class OutputProvider(val capacity: Double) : SimResourceProvider {
+ /**
+ * The [OutputContext] that is currently running.
+ */
+ private var ctx: OutputContext? = null
+
+ override var state: SimResourceState = SimResourceState.Pending
+ internal set
+
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ check(state == SimResourceState.Pending) { "Resource cannot be consumed" }
+
+ val ctx = OutputContext(this, consumer)
+ this.ctx = ctx
+ this.state = SimResourceState.Active
+ outputContexts += ctx
+
+ ctx.start()
+ schedule()
+ }
+
+ override fun close() {
+ cancel()
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Stopped
+ _outputs.remove(this)
+ }
+ }
+
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
+
+ override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.stop()
+ }
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
+ }
+
+ /**
+ * A [SimAbstractResourceContext] for the output resources.
+ */
+ private inner class OutputContext(
+ private val provider: OutputProvider,
+ consumer: SimResourceConsumer
+ ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> {
+ /**
+ * The current command that is processed by the vCPU.
+ */
+ var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
+
+ /**
+ * The processing speed that is allowed by the model constraints.
+ */
+ var allowedSpeed: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ var actualSpeed: Double = 0.0
+
+ private fun reportOvercommit() {
+ val remainingWork = remainingWork
+ totalOvercommittedWork += remainingWork
+ }
+
+ override fun onIdle(deadline: Long) {
+ reportOvercommit()
+
+ allowedSpeed = 0.0
+ activeCommand = SimResourceCommand.Idle(deadline)
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ reportOvercommit()
+
+ allowedSpeed = speed
+ activeCommand = SimResourceCommand.Consume(work, limit, deadline)
+ }
+
+ override fun onFinish(cause: Throwable?) {
+ reportOvercommit()
+
+ activeCommand = SimResourceCommand.Exit
+ provider.cancel()
+
+ super.onFinish(cause)
+ }
+
+ override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
+ // Apply performance interference model
+ val performanceScore = 1.0
+
+ // Compute the remaining amount of work
+ return if (work > 0.0) {
+ // Compute the fraction of compute time allocated to the VM
+ val fraction = actualSpeed / totalAllocatedSpeed
+
+ // Compute the work that was actually granted to the VM.
+ val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
+ val processed = processingAvailable * performanceScore
+
+ val interferedWork = processingAvailable - processed
+
+ totalInterferedWork += interferedWork
+
+ max(0.0, work - processed)
+ } else {
+ 0.0
+ }
+ }
+
+ override fun interrupt() {
+ // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
+ // to infinite recursion.
+ if (isProcessing) {
+ return
+ }
+
+ super.interrupt()
+
+ // Force the scheduler to re-schedule
+ schedule()
+ }
+
+ override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt
new file mode 100644
index 00000000..bbf6ad44
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceFlow] acts as both a resource consumer and resource provider at the same time, simplifying bridging
+ * between different components.
+ */
+public interface SimResourceFlow : SimResourceConsumer, SimResourceProvider
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
new file mode 100644
index 00000000..52b13c5c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.suspendCancellableCoroutine
+
+/**
+ * A [SimResourceProvider] provides some resource of type [R].
+ */
+public interface SimResourceProvider : AutoCloseable {
+ /**
+ * The state of the resource.
+ */
+ public val state: SimResourceState
+
+ /**
+ * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously.
+ *
+ * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended.
+ */
+ public fun startConsumer(consumer: SimResourceConsumer)
+
+ /**
+ * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op.
+ */
+ public fun interrupt()
+
+ /**
+ * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op.
+ */
+ public fun cancel()
+
+ /**
+ * End the lifetime of the resource.
+ *
+ * This operation terminates the existing resource consumer.
+ */
+ public override fun close()
+}
+
+/**
+ * Consume the resource provided by this provider using the specified [consumer] and suspend execution until
+ * the consumer has finished.
+ */
+public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) {
+ return suspendCancellableCoroutine { cont ->
+ startConsumer(object : SimResourceConsumer by consumer {
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ assert(!cont.isCompleted) { "Coroutine already completed" }
+
+ consumer.onFinish(ctx, cause)
+
+ cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit))
+ }
+
+ override fun toString(): String = "SimSuspendingResourceConsumer"
+ })
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
new file mode 100644
index 00000000..025b0406
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.opendc.utils.TimerScheduler
+import java.time.Clock
+import kotlin.math.ceil
+import kotlin.math.min
+
+/**
+ * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity.
+ *
+ * @param initialCapacity The initial capacity of the resource.
+ * @param clock The virtual clock to track simulation time.
+ * @param scheduler The scheduler to schedule the interrupts.
+ */
+public class SimResourceSource(
+ initialCapacity: Double,
+ private val clock: Clock,
+ private val scheduler: TimerScheduler<Any>
+) : SimResourceProvider {
+ /**
+ * The current processing speed of the resource.
+ */
+ public val speed: Double
+ get() = ctx?.speed ?: 0.0
+
+ /**
+ * The capacity of the resource.
+ */
+ public var capacity: Double = initialCapacity
+ set(value) {
+ field = value
+ ctx?.capacity = value
+ }
+
+ /**
+ * The [Context] that is currently running.
+ */
+ private var ctx: Context? = null
+
+ override var state: SimResourceState = SimResourceState.Pending
+ private set
+
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ check(state == SimResourceState.Pending) { "Resource is in invalid state" }
+ val ctx = Context(consumer)
+
+ this.ctx = ctx
+ this.state = SimResourceState.Active
+
+ ctx.start()
+ }
+
+ override fun close() {
+ cancel()
+ state = SimResourceState.Stopped
+ }
+
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
+
+ override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.stop()
+ }
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
+
+ /**
+ * Internal implementation of [SimResourceContext] for this class.
+ */
+ private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) {
+ override fun onIdle(deadline: Long) {
+ // Do not resume if deadline is "infinite"
+ if (deadline != Long.MAX_VALUE) {
+ scheduler.startSingleTimerTo(this, deadline) { flush() }
+ }
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ val until = min(deadline, clock.millis() + getDuration(work, speed))
+
+ scheduler.startSingleTimerTo(this, until, ::flush)
+ }
+
+ override fun onFinish(cause: Throwable?) {
+ scheduler.cancel(this)
+ cancel()
+
+ super.onFinish(cause)
+ }
+
+ override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]"
+ }
+
+ /**
+ * Compute the duration that a resource consumption will take with the specified [speed].
+ */
+ private fun getDuration(work: Double, speed: Double): Long {
+ return ceil(work / speed * 1000).toLong()
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt
new file mode 100644
index 00000000..c72951d0
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * The state of a resource provider.
+ */
+public enum class SimResourceState {
+ /**
+ * The resource provider is pending and the resource is waiting to be consumed.
+ */
+ Pending,
+
+ /**
+ * The resource provider is active and the resource is currently being consumed.
+ */
+ Active,
+
+ /**
+ * The resource provider is stopped and the resource cannot be consumed anymore.
+ */
+ Stopped
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
new file mode 100644
index 00000000..53fec16a
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers.
+ */
+public interface SimResourceSwitch : AutoCloseable {
+ /**
+ * The output resource providers to which resource consumers can be attached.
+ */
+ public val outputs: Set<SimResourceProvider>
+
+ /**
+ * The input resources that will be switched between the output providers.
+ */
+ public val inputs: Set<SimResourceProvider>
+
+ /**
+ * Add an output to the switch with the specified [capacity].
+ */
+ public fun addOutput(capacity: Double): SimResourceProvider
+
+ /**
+ * Add the specified [input] to the switch.
+ */
+ public fun addInput(input: SimResourceProvider)
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
new file mode 100644
index 00000000..45e4c220
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.util.ArrayDeque
+
+/**
+ * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that
+ * a single output is directly connected to an input and that the switch can only support as much outputs as inputs.
+ */
+public class SimResourceSwitchExclusive : SimResourceSwitch {
+ /**
+ * A flag to indicate that the switch is closed.
+ */
+ private var isClosed: Boolean = false
+
+ private val _outputs = mutableSetOf<Provider>()
+ override val outputs: Set<SimResourceProvider>
+ get() = _outputs
+
+ private val availableResources = ArrayDeque<SimResourceTransformer>()
+
+ private val _inputs = mutableSetOf<SimResourceProvider>()
+ override val inputs: Set<SimResourceProvider>
+ get() = _inputs
+
+ override fun addOutput(capacity: Double): SimResourceProvider {
+ check(!isClosed) { "Switch has been closed" }
+ check(availableResources.isNotEmpty()) { "No capacity to serve request" }
+ val forwarder = availableResources.poll()
+ val output = Provider(capacity, forwarder)
+ _outputs += output
+ return output
+ }
+
+ override fun addInput(input: SimResourceProvider) {
+ check(!isClosed) { "Switch has been closed" }
+
+ if (input in inputs) {
+ return
+ }
+
+ val forwarder = SimResourceForwarder()
+
+ _inputs += input
+ availableResources += forwarder
+
+ input.startConsumer(object : SimResourceConsumer by forwarder {
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ // De-register the input after it has finished
+ _inputs -= input
+ forwarder.onFinish(ctx, cause)
+ }
+ })
+ }
+
+ override fun close() {
+ isClosed = true
+
+ // Cancel all upstream subscriptions
+ _inputs.forEach(SimResourceProvider::cancel)
+ }
+
+ private inner class Provider(
+ private val capacity: Double,
+ private val forwarder: SimResourceTransformer
+ ) : SimResourceProvider by forwarder {
+ override fun close() {
+ // We explicitly do not close the forwarder here in order to re-use it across output resources.
+
+ _outputs -= this
+ availableResources += forwarder
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
new file mode 100644
index 00000000..c796c251
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.*
+import java.time.Clock
+
+/**
+ * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
+ * fair sharing.
+ */
+public class SimResourceSwitchMaxMin(
+ clock: Clock,
+ private val listener: Listener? = null
+) : SimResourceSwitch {
+ private val _outputs = mutableSetOf<SimResourceProvider>()
+ override val outputs: Set<SimResourceProvider>
+ get() = _outputs
+
+ private val _inputs = mutableSetOf<SimResourceProvider>()
+ override val inputs: Set<SimResourceProvider>
+ get() = _inputs
+
+ /**
+ * A flag to indicate that the switch was closed.
+ */
+ private var isClosed = false
+
+ /**
+ * The aggregator to aggregate the resources.
+ */
+ private val aggregator = SimResourceAggregatorMaxMin(clock)
+
+ /**
+ * The distributor to distribute the aggregated resources.
+ */
+ private val distributor = SimResourceDistributorMaxMin(
+ aggregator.output, clock,
+ object : SimResourceDistributorMaxMin.Listener {
+ override fun onSliceFinish(
+ switch: SimResourceDistributor,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand)
+ }
+ }
+ )
+
+ /**
+ * Add an output to the switch represented by [resource].
+ */
+ override fun addOutput(capacity: Double): SimResourceProvider {
+ check(!isClosed) { "Switch has been closed" }
+
+ val provider = distributor.addOutput(capacity)
+ _outputs.add(provider)
+ return provider
+ }
+
+ /**
+ * Add the specified [input] to the switch.
+ */
+ override fun addInput(input: SimResourceProvider) {
+ check(!isClosed) { "Switch has been closed" }
+
+ aggregator.addInput(input)
+ }
+
+ override fun close() {
+ if (!isClosed) {
+ isClosed = true
+ distributor.close()
+ aggregator.close()
+ }
+ }
+
+ /**
+ * Event listener for hypervisor events.
+ */
+ public interface Listener {
+ /**
+ * This method is invoked when a slice is finished.
+ */
+ public fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
new file mode 100644
index 00000000..de455021
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -0,0 +1,175 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider.
+ *
+ * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits.
+ * @param transform The function to transform the received resource command.
+ */
+public class SimResourceTransformer(
+ private val isCoupled: Boolean = false,
+ private val transform: (SimResourceContext, SimResourceCommand) -> SimResourceCommand
+) : SimResourceFlow {
+ /**
+ * The [SimResourceContext] in which the forwarder runs.
+ */
+ private var ctx: SimResourceContext? = null
+
+ /**
+ * The delegate [SimResourceConsumer].
+ */
+ private var delegate: SimResourceConsumer? = null
+
+ /**
+ * A flag to indicate that the delegate was started.
+ */
+ private var hasDelegateStarted: Boolean = false
+
+ /**
+ * The state of the forwarder.
+ */
+ override var state: SimResourceState = SimResourceState.Pending
+ private set
+
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ check(state == SimResourceState.Pending) { "Resource is in invalid state" }
+
+ state = SimResourceState.Active
+ delegate = consumer
+
+ // Interrupt the provider to replace the consumer
+ interrupt()
+ }
+
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
+
+ override fun cancel() {
+ val delegate = delegate
+ val ctx = ctx
+
+ state = SimResourceState.Pending
+
+ if (delegate != null && ctx != null) {
+ this.delegate = null
+ delegate.onFinish(ctx)
+ }
+ }
+
+ override fun close() {
+ val ctx = ctx
+
+ state = SimResourceState.Stopped
+
+ if (ctx != null) {
+ this.ctx = null
+ ctx.interrupt()
+ }
+ }
+
+ override fun onStart(ctx: SimResourceContext) {
+ this.ctx = ctx
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val delegate = delegate
+
+ if (!hasDelegateStarted) {
+ start()
+ }
+
+ return if (state == SimResourceState.Stopped) {
+ SimResourceCommand.Exit
+ } else if (delegate != null) {
+ val command = transform(ctx, delegate.onNext(ctx))
+ if (command == SimResourceCommand.Exit) {
+ // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
+ // reset beforehand the existing state and check whether it has been updated afterwards
+ reset()
+
+ delegate.onFinish(ctx)
+
+ if (isCoupled || state == SimResourceState.Stopped)
+ SimResourceCommand.Exit
+ else
+ onNext(ctx)
+ } else {
+ command
+ }
+ } else {
+ SimResourceCommand.Idle()
+ }
+ }
+
+ override fun onConfirm(ctx: SimResourceContext, speed: Double) {
+ delegate?.onConfirm(ctx, speed)
+ }
+
+ override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
+ delegate?.onCapacityChanged(ctx, isThrottled)
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ this.ctx = null
+
+ val delegate = delegate
+ if (delegate != null) {
+ reset()
+ delegate.onFinish(ctx, cause)
+ }
+ }
+
+ /**
+ * Start the delegate.
+ */
+ private fun start() {
+ val delegate = delegate ?: return
+ delegate.onStart(checkNotNull(ctx))
+
+ hasDelegateStarted = true
+ }
+
+ /**
+ * Reset the delegate.
+ */
+ private fun reset() {
+ delegate = null
+ hasDelegateStarted = false
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
+}
+
+/**
+ * Constructs a [SimResourceTransformer] that forwards the received resource command with an identity transform.
+ *
+ * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits.
+ */
+public fun SimResourceForwarder(isCoupled: Boolean = false): SimResourceTransformer {
+ return SimResourceTransformer(isCoupled) { _, command -> command }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
new file mode 100644
index 00000000..52a42241
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+/**
+ * The [SimConsumerBarrier] is a barrier that allows consumers to wait for a select number of other consumers to
+ * complete, before proceeding its operation.
+ */
+public class SimConsumerBarrier(public val parties: Int) {
+ private var counter = 0
+
+ /**
+ * Enter the barrier and determine whether the caller is the last to reach the barrier.
+ *
+ * @return `true` if the caller is the last to reach the barrier, `false` otherwise.
+ */
+ public fun enter(): Boolean {
+ val last = ++counter == parties
+ if (last) {
+ counter = 0
+ return true
+ }
+ return false
+ }
+
+ /**
+ * Reset the barrier.
+ */
+ public fun reset() {
+ counter = 0
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
new file mode 100644
index 00000000..114c7312
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+import kotlin.math.min
+
+/**
+ * Helper class to expose an observable [speed] field describing the speed of the consumer.
+ */
+public class SimSpeedConsumerAdapter(
+ private val delegate: SimResourceConsumer,
+ private val callback: (Double) -> Unit = {}
+) : SimResourceConsumer by delegate {
+ /**
+ * The resource processing speed at this instant.
+ */
+ public var speed: Double = 0.0
+ private set(value) {
+ if (field != value) {
+ callback(value)
+ field = value
+ }
+ }
+
+ init {
+ callback(0.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return delegate.onNext(ctx)
+ }
+
+ override fun onConfirm(ctx: SimResourceContext, speed: Double) {
+ delegate.onConfirm(ctx, speed)
+
+ this.speed = speed
+ }
+
+ override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
+ val oldSpeed = speed
+
+ delegate.onCapacityChanged(ctx, isThrottled)
+
+ // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
+ // need to update the current speed.
+ if (oldSpeed == speed) {
+ speed = min(ctx.capacity, speed)
+ }
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ super.onFinish(ctx, cause)
+
+ speed = 0.0
+ }
+
+ override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]"
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
new file mode 100644
index 00000000..a52d1d5d
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+
+/**
+ * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource
+ * consumption for some period of time.
+ */
+public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer {
+ private var iterator: Iterator<Fragment>? = null
+
+ override fun onStart(ctx: SimResourceContext) {
+ check(iterator == null) { "Consumer already running" }
+ iterator = trace.iterator()
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val iterator = checkNotNull(iterator)
+ return if (iterator.hasNext()) {
+ val now = ctx.clock.millis()
+ val fragment = iterator.next()
+ val work = (fragment.duration / 1000) * fragment.usage
+ val deadline = now + fragment.duration
+
+ assert(deadline >= now) { "Deadline already passed" }
+
+ if (work > 0.0)
+ SimResourceCommand.Consume(work, fragment.usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ iterator = null
+ }
+
+ /**
+ * A fragment of the workload.
+ */
+ public data class Fragment(val duration: Long, val usage: Double)
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
new file mode 100644
index 00000000..faa693c4
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+
+/**
+ * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization.
+ */
+public class SimWorkConsumer(
+ private val work: Double,
+ private val utilization: Double
+) : SimResourceConsumer {
+
+ init {
+ require(work >= 0.0) { "Work must be positive" }
+ require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
+ }
+
+ private var isFirst = true
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val limit = ctx.capacity * utilization
+ val work = if (isFirst) {
+ isFirst = false
+ work
+ } else {
+ ctx.remainingWork
+ }
+ return if (work > 0.0) {
+ SimResourceCommand.Consume(work, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
new file mode 100644
index 00000000..e272abb8
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -0,0 +1,202 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * Test suite for the [SimResourceAggregatorMaxMin] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceAggregatorMaxMinTest {
+ @Test
+ fun testSingleCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val forwarder = SimResourceForwarder()
+ val sources = listOf(
+ forwarder,
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(1.0, 0.5)
+ val usage = mutableListOf<Double>()
+ val source = SimResourceSource(1.0, clock, scheduler)
+ val adapter = SimSpeedConsumerAdapter(forwarder, usage::add)
+ source.startConsumer(adapter)
+
+ try {
+ aggregator.output.consume(consumer)
+ yield()
+
+ assertAll(
+ { assertEquals(1000, clock.millis()) },
+ { assertEquals(listOf(0.0, 0.5, 0.0), usage) }
+ )
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testDoubleCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(2.0, 1.0)
+ val usage = mutableListOf<Double>()
+ val adapter = SimSpeedConsumerAdapter(consumer, usage::add)
+
+ try {
+ aggregator.output.consume(adapter)
+ yield()
+ assertAll(
+ { assertEquals(1000, clock.millis()) },
+ { assertEquals(listOf(0.0, 2.0, 0.0), usage) }
+ )
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testOvercommit() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(4.0, 4.0, 1000))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ aggregator.output.consume(consumer)
+ yield()
+ assertEquals(1000, clock.millis())
+
+ verify(exactly = 2) { consumer.onNext(any()) }
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testException() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> { aggregator.output.consume(consumer) }
+ yield()
+ assertEquals(SimResourceState.Pending, sources[0].state)
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(4.0, 1.0)
+ try {
+ coroutineScope {
+ launch { aggregator.output.consume(consumer) }
+ delay(1000)
+ sources[0].capacity = 0.5
+ }
+ yield()
+ assertEquals(2334, clock.millis())
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testFailOverCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(1.0, 0.5)
+ try {
+ coroutineScope {
+ launch { aggregator.output.consume(consumer) }
+ delay(500)
+ sources[0].capacity = 0.5
+ }
+ yield()
+ assertEquals(1000, clock.millis())
+ } finally {
+ aggregator.output.close()
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
new file mode 100644
index 00000000..02d456ff
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
+
+/**
+ * Test suite for [SimResourceCommand].
+ */
+class SimResourceCommandTest {
+ @Test
+ fun testZeroWork() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(0.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testNegativeWork() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(-1.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testZeroLimit() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(1.0, 0.0)
+ }
+ }
+
+ @Test
+ fun testNegativeLimit() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(1.0, -1.0, 1)
+ }
+ }
+
+ @Test
+ fun testConsumeCorrect() {
+ assertDoesNotThrow {
+ SimResourceCommand.Consume(1.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testIdleCorrect() {
+ assertDoesNotThrow {
+ SimResourceCommand.Idle(1)
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
new file mode 100644
index 00000000..be909556
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.*
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.*
+import org.opendc.simulator.core.runBlockingSimulation
+
+/**
+ * A test suite for the [SimAbstractResourceContext] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceContextTest {
+ @Test
+ fun testFlushWithoutCommand() = runBlockingSimulation {
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
+
+ val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ }
+
+ context.flush()
+ }
+
+ @Test
+ fun testIntermediateFlush() = runBlockingSimulation {
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
+
+ val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ })
+
+ context.start()
+ delay(1) // Delay 1 ms to prevent hitting the fast path
+ context.flush(isIntermediate = true)
+
+ verify(exactly = 2) { context.onConsume(any(), any(), any()) }
+ }
+
+ @Test
+ fun testIntermediateFlushIdle() = runBlockingSimulation {
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
+
+ val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ })
+
+ context.start()
+ delay(5)
+ context.flush(isIntermediate = true)
+ delay(5)
+ context.flush(isIntermediate = true)
+
+ assertAll(
+ { verify(exactly = 2) { context.onIdle(any()) } },
+ { verify(exactly = 1) { context.onFinish(null) } }
+ )
+ }
+
+ @Test
+ fun testDoubleStart() = runBlockingSimulation {
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
+
+ val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ }
+
+ context.start()
+ assertThrows<IllegalStateException> { context.start() }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
new file mode 100644
index 00000000..39f74481
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -0,0 +1,351 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimResourceSource] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceSourceTest {
+ @Test
+ fun testSpeed() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1000 * capacity, capacity))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ val res = mutableListOf<Double>()
+ val adapter = SimSpeedConsumerAdapter(consumer, res::add)
+
+ provider.consume(adapter)
+
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+
+ try {
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ delay(1000)
+ provider.capacity = 0.5
+ }
+ assertEquals(3000, clock.millis())
+ verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testSpeedLimit() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ val res = mutableListOf<Double>()
+ val adapter = SimSpeedConsumerAdapter(consumer, res::add)
+
+ provider.consume(adapter)
+
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ /**
+ * Test to see whether no infinite recursion occurs when interrupting during [SimResourceConsumer.onStart] or
+ * [SimResourceConsumer.onNext].
+ */
+ @Test
+ fun testIntermediateInterrupt() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = object : SimResourceConsumer {
+ override fun onStart(ctx: SimResourceContext) {
+ ctx.interrupt()
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ try {
+ provider.consume(consumer)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testInterrupt() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+ lateinit var resCtx: SimResourceContext
+
+ val consumer = object : SimResourceConsumer {
+ var isFirst = true
+ override fun onStart(ctx: SimResourceContext) {
+ resCtx = ctx
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ assertEquals(0.0, ctx.remainingWork)
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(4.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+ }
+
+ try {
+ launch {
+ yield()
+ resCtx.interrupt()
+ }
+ provider.consume(consumer)
+
+ assertEquals(0, clock.millis())
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testFailure() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onStart(any()) }
+ .throws(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testExceptionPropagationOnNext() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testConcurrentConsumption() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> {
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ provider.consume(consumer)
+ }
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testClosedConsumption() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> {
+ provider.close()
+ provider.consume(consumer)
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testCloseDuringConsumption() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ launch { provider.consume(consumer) }
+ delay(500)
+ provider.close()
+
+ assertEquals(500, clock.millis())
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testIdle() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Idle(clock.millis() + 500))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ provider.consume(consumer)
+
+ assertEquals(500, clock.millis())
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testInfiniteSleep() {
+ assertThrows<IllegalStateException> {
+ runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Idle())
+ .andThenThrows(IllegalStateException())
+
+ try {
+ provider.consume(consumer)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+ }
+ }
+
+ @Test
+ fun testIncorrectDeadline() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Idle(2))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ delay(10)
+
+ assertThrows<IllegalArgumentException> { provider.consume(consumer) }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
new file mode 100644
index 00000000..f7d17867
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -0,0 +1,173 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * Test suite for the [SimResourceSwitchExclusive] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceSwitchExclusiveTest {
+ /**
+ * Test a trace workload.
+ */
+ @Test
+ fun testTrace() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val speed = mutableListOf<Double>()
+
+ val duration = 5 * 60L
+ val workload =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
+ val forwarder = SimResourceForwarder()
+ val adapter = SimSpeedConsumerAdapter(forwarder, speed::add)
+ source.startConsumer(adapter)
+ switch.addInput(forwarder)
+
+ val provider = switch.addOutput(3200.0)
+
+ try {
+ provider.consume(workload)
+ yield()
+ } finally {
+ provider.close()
+ }
+
+ assertAll(
+ { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } },
+ { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } }
+ )
+ }
+
+ /**
+ * Test runtime workload on hypervisor.
+ */
+ @Test
+ fun testRuntimeWorkload() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
+
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
+
+ switch.addInput(source)
+
+ val provider = switch.addOutput(3200.0)
+
+ try {
+ provider.consume(workload)
+ yield()
+ } finally {
+ provider.close()
+ }
+ assertEquals(duration, clock.millis()) { "Took enough time" }
+ }
+
+ /**
+ * Test two workloads running sequentially.
+ */
+ @Test
+ fun testTwoWorkloads() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = object : SimResourceConsumer {
+ var isFirst = true
+
+ override fun onStart(ctx: SimResourceContext) {
+ isFirst = true
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+ }
+
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
+
+ switch.addInput(source)
+
+ val provider = switch.addOutput(3200.0)
+
+ try {
+ provider.consume(workload)
+ yield()
+ provider.consume(workload)
+ } finally {
+ provider.close()
+ }
+ assertEquals(duration * 2, clock.millis()) { "Took enough time" }
+ }
+
+ /**
+ * Test concurrent workloads on the machine.
+ */
+ @Test
+ fun testConcurrentWorkloadFails() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
+
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
+
+ switch.addInput(source)
+
+ switch.addOutput(3200.0)
+ assertThrows<IllegalStateException> { switch.addOutput(3200.0) }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
new file mode 100644
index 00000000..7416f277
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -0,0 +1,193 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * Test suite for the [SimResourceSwitch] implementations
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceSwitchMaxMinTest {
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val switch = SimResourceSwitchMaxMin(clock)
+
+ val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) }
+ sources.forEach { switch.addInput(it) }
+
+ val provider = switch.addOutput(1000.0)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit
+
+ try {
+ provider.consume(consumer)
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with a single VM.
+ */
+ @Test
+ fun testOvercommittedSingle() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val listener = object : SimResourceSwitchMaxMin.Listener {
+ var totalRequestedWork = 0L
+ var totalGrantedWork = 0L
+ var totalOvercommittedWork = 0L
+
+ override fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ totalRequestedWork += requestedWork
+ totalGrantedWork += grantedWork
+ totalOvercommittedWork += overcommittedWork
+ }
+ }
+
+ val duration = 5 * 60L
+ val workload =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = SimResourceSwitchMaxMin(clock, listener)
+ val provider = switch.addOutput(3200.0)
+
+ try {
+ switch.addInput(SimResourceSource(3200.0, clock, scheduler))
+ provider.consume(workload)
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+
+ assertAll(
+ { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1200000, clock.millis()) }
+ )
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with two VMs.
+ */
+ @Test
+ fun testOvercommittedDual() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val listener = object : SimResourceSwitchMaxMin.Listener {
+ var totalRequestedWork = 0L
+ var totalGrantedWork = 0L
+ var totalOvercommittedWork = 0L
+
+ override fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ totalRequestedWork += requestedWork
+ totalGrantedWork += grantedWork
+ totalOvercommittedWork += overcommittedWork
+ }
+ }
+
+ val duration = 5 * 60L
+ val workloadA =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+ val workloadB =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3100.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 73.0)
+ )
+ )
+
+ val switch = SimResourceSwitchMaxMin(clock, listener)
+ val providerA = switch.addOutput(3200.0)
+ val providerB = switch.addOutput(3200.0)
+
+ try {
+ switch.addInput(SimResourceSource(3200.0, clock, scheduler))
+
+ coroutineScope {
+ launch { providerA.consume(workloadA) }
+ providerB.consume(workloadB)
+ }
+
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+ assertAll(
+ { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1200000, clock.millis()) }
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
new file mode 100644
index 00000000..d2ad73bc
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimResourceTransformer] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceTransformerTest {
+ @Test
+ fun testExitImmediately() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ launch {
+ source.consume(forwarder)
+ source.close()
+ }
+
+ forwarder.consume(object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ })
+
+ forwarder.close()
+ scheduler.close()
+ }
+
+ @Test
+ fun testExit() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ launch {
+ source.consume(forwarder)
+ source.close()
+ }
+
+ forwarder.consume(object : SimResourceConsumer {
+ var isFirst = true
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(10.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+ })
+
+ forwarder.close()
+ }
+
+ @Test
+ fun testState() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val consumer = object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit
+ }
+
+ assertEquals(SimResourceState.Pending, forwarder.state)
+
+ forwarder.startConsumer(consumer)
+ assertEquals(SimResourceState.Active, forwarder.state)
+
+ assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
+
+ forwarder.cancel()
+ assertEquals(SimResourceState.Pending, forwarder.state)
+
+ forwarder.close()
+ assertEquals(SimResourceState.Stopped, forwarder.state)
+ }
+
+ @Test
+ fun testCancelPendingDelegate() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Exit
+
+ forwarder.startConsumer(consumer)
+ forwarder.cancel()
+
+ verify(exactly = 0) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testCancelStartedDelegate() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ forwarder.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any()) }
+ verify(exactly = 1) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testCancelPropagation() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ source.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any()) }
+ verify(exactly = 1) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testExitPropagation() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder(isCoupled = true)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Exit
+
+ source.startConsumer(forwarder)
+ forwarder.consume(consumer)
+ yield()
+
+ assertEquals(SimResourceState.Pending, source.state)
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+ source.startConsumer(forwarder)
+
+ coroutineScope {
+ launch { forwarder.consume(consumer) }
+ delay(1000)
+ source.capacity = 0.5
+ }
+
+ assertEquals(3000, clock.millis())
+ verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ }
+
+ @Test
+ fun testTransformExit() = runBlockingSimulation {
+ val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit }
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+ source.startConsumer(forwarder)
+ forwarder.consume(consumer)
+
+ assertEquals(0, clock.millis())
+ verify(exactly = 1) { consumer.onNext(any()) }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
new file mode 100644
index 00000000..bf58b1b6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimWorkConsumer] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimWorkConsumerTest {
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = SimWorkConsumer(1.0, 1.0)
+
+ try {
+ provider.consume(consumer)
+ assertEquals(1000, clock.millis())
+ } finally {
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testUtilization() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = SimWorkConsumer(1.0, 0.5)
+
+ try {
+ provider.consume(consumer)
+ assertEquals(2000, clock.millis())
+ } finally {
+ provider.close()
+ }
+ }
+}