summaryrefslogtreecommitdiff
path: root/simulator/opendc-simulator/opendc-simulator-resources/src
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src')
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt258
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt33
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt52
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt45
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt46
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt155
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt50
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt147
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt48
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt92
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt508
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt45
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt63
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt74
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt156
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt92
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt354
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt190
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt207
19 files changed, 2615 insertions, 0 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
new file mode 100644
index 00000000..52251bff
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -0,0 +1,258 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
+ */
+public abstract class SimAbstractResourceContext<R : SimResource>(
+ override val resource: R,
+ override val clock: Clock,
+ private val consumer: SimResourceConsumer<R>
+) : SimResourceContext<R> {
+ /**
+ * This method is invoked when the resource will idle until the specified [deadline].
+ */
+ public abstract fun onIdle(deadline: Long)
+
+ /**
+ * This method is invoked when the resource will be consumed until the specified [work] was processed or the
+ * [deadline] was reached.
+ */
+ public abstract fun onConsume(work: Double, limit: Double, deadline: Long)
+
+ /**
+ * This method is invoked when the resource consumer has finished.
+ */
+ public abstract fun onFinish()
+
+ /**
+ * This method is invoked when the resource consumer throws an exception.
+ */
+ public abstract fun onFailure(cause: Throwable)
+
+ /**
+ * Compute the duration that a resource consumption will take with the specified [speed].
+ */
+ protected open fun getDuration(work: Double, speed: Double): Long {
+ return ceil(work / speed * 1000).toLong()
+ }
+
+ /**
+ * Compute the speed at which the resource may be consumed.
+ */
+ protected open fun getSpeed(limit: Double): Double {
+ return min(limit, resource.capacity)
+ }
+
+ /**
+ * Get the remaining work to process after a resource consumption was flushed.
+ *
+ * @param work The size of the resource consumption.
+ * @param speed The speed of consumption.
+ * @param duration The duration from the start of the consumption until now.
+ * @param isInterrupted A flag to indicate that the resource consumption could not be fully processed due to
+ * it being interrupted before it could finish or reach its deadline.
+ * @return The amount of work remaining.
+ */
+ protected open fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
+ return if (duration > 0L) {
+ val processed = duration / 1000.0 * speed
+ max(0.0, work - processed)
+ } else {
+ 0.0
+ }
+ }
+
+ /**
+ * Start the consumer.
+ */
+ public fun start() {
+ try {
+ isProcessing = true
+ latestFlush = clock.millis()
+
+ interpret(consumer.onStart(this))
+ } catch (e: Throwable) {
+ onFailure(e)
+ } finally {
+ isProcessing = false
+ }
+ }
+
+ /**
+ * Immediately stop the consumer.
+ */
+ public fun stop() {
+ try {
+ isProcessing = true
+ latestFlush = clock.millis()
+
+ flush(isIntermediate = true)
+ onFinish()
+ } catch (e: Throwable) {
+ onFailure(e)
+ } finally {
+ isProcessing = false
+ }
+ }
+
+ /**
+ * Flush the current active resource consumption.
+ *
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public open fun flush(isIntermediate: Boolean = false) {
+ val now = clock.millis()
+
+ // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it.
+ if (isIntermediate && latestFlush >= now) {
+ return
+ }
+
+ try {
+ val activeCommand = activeCommand ?: return
+ val (timestamp, command) = activeCommand
+
+ isProcessing = true
+ this.activeCommand = null
+
+ val duration = now - timestamp
+ assert(duration >= 0) { "Flush in the past" }
+
+ when (command) {
+ is SimResourceCommand.Idle -> {
+ // We should only continue processing the next command if:
+ // 1. The resource consumer reached its deadline.
+ // 2. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ if (command.deadline <= now || !isIntermediate) {
+ next(remainingWork = 0.0)
+ } else {
+ this.activeCommand = activeCommand
+ }
+ }
+ is SimResourceCommand.Consume -> {
+ val speed = min(resource.capacity, command.limit)
+ val isInterrupted = !isIntermediate && duration < getDuration(command.work, speed)
+ val remainingWork = getRemainingWork(command.work, speed, duration, isInterrupted)
+
+ // We should only continue processing the next command if:
+ // 1. The resource consumption was finished.
+ // 2. The resource consumer reached its deadline.
+ // 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) {
+ next(remainingWork)
+ } else {
+ interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline))
+ }
+ }
+ SimResourceCommand.Exit ->
+ // Flush may not be called when the resource consumer has finished
+ throw IllegalStateException()
+ }
+ } catch (e: Throwable) {
+ onFailure(e)
+ } finally {
+ latestFlush = now
+ isProcessing = false
+ }
+ }
+
+ override fun interrupt() {
+ // Prevent users from interrupting the resource while they are constructing their next command, as this will
+ // only lead to infinite recursion.
+ if (isProcessing) {
+ return
+ }
+
+ flush()
+ }
+
+ override fun toString(): String = "SimAbstractResourceContext[resource=$resource]"
+
+ /**
+ * A flag to indicate that the resource is currently processing a command.
+ */
+ protected var isProcessing: Boolean = false
+
+ /**
+ * The current command that is being processed.
+ */
+ private var activeCommand: CommandWrapper? = null
+
+ /**
+ * The latest timestamp at which the resource was flushed.
+ */
+ private var latestFlush: Long = Long.MIN_VALUE
+
+ /**
+ * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
+ */
+ private fun interpret(command: SimResourceCommand) {
+ val now = clock.millis()
+
+ when (command) {
+ is SimResourceCommand.Idle -> {
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ onIdle(deadline)
+ }
+ is SimResourceCommand.Consume -> {
+ val work = command.work
+ val limit = command.limit
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ onConsume(work, limit, deadline)
+ }
+ is SimResourceCommand.Exit -> {
+ onFinish()
+ }
+ }
+
+ assert(activeCommand == null) { "Concurrent access to current command" }
+ activeCommand = CommandWrapper(now, command)
+ }
+
+ /**
+ * Request the workload for more work.
+ */
+ private fun next(remainingWork: Double) {
+ interpret(consumer.onNext(this, remainingWork))
+ }
+
+ /**
+ * This class wraps a [command] with the timestamp it was started and possibly the task associated with it.
+ */
+ private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand)
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt
new file mode 100644
index 00000000..31b0a175
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A generic representation of resource that may be consumed.
+ */
+public interface SimResource {
+ /**
+ * The capacity of the resource.
+ */
+ public val capacity: Double
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
new file mode 100644
index 00000000..77c0a7a9
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A SimResourceCommand communicates to a [SimResource] how it is consumed by a [SimResourceConsumer].
+ */
+public sealed class SimResourceCommand {
+ /**
+ * A request to the resource to perform the specified amount of work before the given [deadline].
+ *
+ * @param work The amount of work to process on the CPU.
+ * @param limit The maximum amount of work to be processed per second.
+ * @param deadline The instant at which the work needs to be fulfilled.
+ */
+ public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() {
+ init {
+ require(work > 0) { "Amount of work must be positive" }
+ require(limit > 0) { "Limit must be positive" }
+ }
+ }
+
+ /**
+ * An indication to the resource that the consumer will idle until the specified [deadline] or if it is interrupted.
+ */
+ public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand()
+
+ /**
+ * An indication to the resource that the consumer has finished.
+ */
+ public object Exit : SimResourceCommand()
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
new file mode 100644
index 00000000..f516faa6
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A SimResourceConsumer characterizes how a [SimResource] is consumed.
+ */
+public interface SimResourceConsumer<in R : SimResource> {
+ /**
+ * This method is invoked when the consumer is started for a resource.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @return The next command that the resource should perform.
+ */
+ public fun onStart(ctx: SimResourceContext<R>): SimResourceCommand
+
+ /**
+ * This method is invoked when a resource was either interrupted or reached its deadline.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @param remainingWork The remaining work that was not yet completed.
+ * @return The next command that the resource should perform.
+ */
+ public fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
new file mode 100644
index 00000000..dfb5e9ce
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a
+ * resource and a resource consumer.
+ */
+public interface SimResourceContext<out R : SimResource> {
+ /**
+ * The resource that is managed by this context.
+ */
+ public val resource: R
+
+ /**
+ * The virtual clock tracking simulation time.
+ */
+ public val clock: Clock
+
+ /**
+ * Ask the resource provider to interrupt its resource.
+ */
+ public fun interrupt()
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
new file mode 100644
index 00000000..ca23557c
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
@@ -0,0 +1,155 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+
+/**
+ * A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer].
+ */
+public class SimResourceForwarder<R : SimResource>(override val resource: R) :
+ SimResourceProvider<R>, SimResourceConsumer<R> {
+ /**
+ * The [SimResourceContext] in which the forwarder runs.
+ */
+ private var ctx: SimResourceContext<R>? = null
+
+ /**
+ * A flag to indicate that the forwarder is closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
+ * The continuation to resume after consumption.
+ */
+ private var cont: Continuation<Unit>? = null
+
+ /**
+ * The delegate [SimResourceConsumer].
+ */
+ private var delegate: SimResourceConsumer<R>? = null
+
+ /**
+ * A flag to indicate that the delegate was started.
+ */
+ private var hasDelegateStarted: Boolean = false
+
+ /**
+ * The remaining amount of work last cycle.
+ */
+ private var remainingWork: Double = 0.0
+
+ override suspend fun consume(consumer: SimResourceConsumer<R>) {
+ check(!isClosed) { "Lifecycle of forwarder has ended" }
+ check(cont == null) { "Run should not be called concurrently" }
+
+ return suspendCancellableCoroutine { cont ->
+ this.cont = cont
+ this.delegate = consumer
+
+ cont.invokeOnCancellation { reset() }
+
+ ctx?.interrupt()
+ }
+ }
+
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
+
+ override fun close() {
+ isClosed = true
+ interrupt()
+ ctx = null
+ }
+
+ override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand {
+ this.ctx = ctx
+
+ return onNext(ctx, 0.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand {
+ this.remainingWork = remainingWork
+
+ return if (isClosed) {
+ SimResourceCommand.Exit
+ } else if (!hasDelegateStarted) {
+ start()
+ } else {
+ next()
+ }
+ }
+
+ /**
+ * Start the delegate.
+ */
+ private fun start(): SimResourceCommand {
+ val delegate = delegate ?: return SimResourceCommand.Idle()
+ val command = delegate.onStart(checkNotNull(ctx))
+
+ hasDelegateStarted = true
+
+ return forward(command)
+ }
+
+ /**
+ * Obtain the next command to process.
+ */
+ private fun next(): SimResourceCommand {
+ val delegate = delegate
+ return forward(delegate?.onNext(checkNotNull(ctx), remainingWork) ?: SimResourceCommand.Idle())
+ }
+
+ /**
+ * Forward the specified [command].
+ */
+ private fun forward(command: SimResourceCommand): SimResourceCommand {
+ return if (command == SimResourceCommand.Exit) {
+ val cont = checkNotNull(cont)
+
+ // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
+ // reset beforehand the existing state and check whether it has been updated afterwards
+ reset()
+ cont.resume(Unit)
+
+ if (isClosed)
+ SimResourceCommand.Exit
+ else
+ start()
+ } else {
+ command
+ }
+ }
+
+ /**
+ * Reset the delegate.
+ */
+ private fun reset() {
+ cont = null
+ delegate = null
+ hasDelegateStarted = false
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
new file mode 100644
index 00000000..e35aa683
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceProvider] provides some resource of type [R].
+ */
+public interface SimResourceProvider<out R : SimResource> : AutoCloseable {
+ /**
+ * The resource that is managed by this provider.
+ */
+ public val resource: R
+
+ /**
+ * Consume the resource provided by this provider using the specified [consumer].
+ */
+ public suspend fun consume(consumer: SimResourceConsumer<R>)
+
+ /**
+ * Interrupt the resource.
+ */
+ public fun interrupt()
+
+ /**
+ * End the lifetime of the resource.
+ *
+ * This operation terminates the existing resource consumer.
+ */
+ public override fun close()
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
new file mode 100644
index 00000000..540a17c9
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.StateFlow
+import org.opendc.utils.TimerScheduler
+import java.time.Clock
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+import kotlin.math.min
+
+/**
+ * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity.
+ *
+ * @param resource The resource to provide.
+ * @param clock The virtual clock to track simulation time.
+ */
+public class SimResourceSource<R : SimResource>(
+ override val resource: R,
+ private val clock: Clock,
+ private val scheduler: TimerScheduler<Any>
+) : SimResourceProvider<R> {
+ /**
+ * The resource processing speed over time.
+ */
+ public val speed: StateFlow<Double>
+ get() = _speed
+ private val _speed = MutableStateFlow(0.0)
+
+ /**
+ * A flag to indicate that the resource was closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
+ * The current active consumer.
+ */
+ private var cont: CancellableContinuation<Unit>? = null
+
+ /**
+ * The [Context] that is currently running.
+ */
+ private var ctx: Context? = null
+
+ override suspend fun consume(consumer: SimResourceConsumer<R>) {
+ check(!isClosed) { "Lifetime of resource has ended." }
+ check(cont == null) { "Run should not be called concurrently" }
+
+ try {
+ return suspendCancellableCoroutine { cont ->
+ val ctx = Context(consumer, cont)
+
+ this.cont = cont
+ this.ctx = ctx
+
+ ctx.start()
+ cont.invokeOnCancellation {
+ ctx.stop()
+ }
+ }
+ } finally {
+ cont = null
+ ctx = null
+ }
+ }
+
+ override fun close() {
+ isClosed = true
+ cont?.cancel()
+ cont = null
+ ctx = null
+ }
+
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
+
+ /**
+ * Internal implementation of [SimResourceContext] for this class.
+ */
+ private inner class Context(
+ consumer: SimResourceConsumer<R>,
+ val cont: Continuation<Unit>
+ ) : SimAbstractResourceContext<R>(resource, clock, consumer) {
+ /**
+ * The processing speed of the resource.
+ */
+ private var speed: Double = 0.0
+ set(value) {
+ field = value
+ _speed.value = field
+ }
+
+ override fun onIdle(deadline: Long) {
+ speed = 0.0
+
+ // Do not resume if deadline is "infinite"
+ if (deadline != Long.MAX_VALUE) {
+ scheduler.startSingleTimerTo(this, deadline) { flush() }
+ }
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ speed = getSpeed(limit)
+ val until = min(deadline, clock.millis() + getDuration(work, speed))
+
+ scheduler.startSingleTimerTo(this, until, ::flush)
+ }
+
+ override fun onFinish() {
+ speed = 0.0
+ scheduler.cancel(this)
+ cont.resume(Unit)
+ }
+
+ override fun onFailure(cause: Throwable) {
+ speed = 0.0
+ scheduler.cancel(this)
+ cont.resumeWithException(cause)
+ }
+
+ override fun toString(): String = "SimResourceSource.Context[resource=$resource]"
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
new file mode 100644
index 00000000..cd1af3fc
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceSwitch] enables switching of capacity of multiple resources of type [R] between multiple consumers.
+ */
+public interface SimResourceSwitch<R : SimResource> : AutoCloseable {
+ /**
+ * The output resource providers to which resource consumers can be attached.
+ */
+ public val outputs: Set<SimResourceProvider<R>>
+
+ /**
+ * The input resources that will be switched between the output providers.
+ */
+ public val inputs: Set<SimResourceProvider<R>>
+
+ /**
+ * Add an output to the switch represented by [resource].
+ */
+ public fun addOutput(resource: R): SimResourceProvider<R>
+
+ /**
+ * Add the specified [input] to the switch.
+ */
+ public fun addInput(input: SimResourceProvider<R>)
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
new file mode 100644
index 00000000..060d0ea2
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.launch
+import java.util.ArrayDeque
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that
+ * a single output is directly connected to an input and that the switch can only support as much outputs as inputs.
+ */
+public class SimResourceSwitchExclusive<R : SimResource>(context: CoroutineContext) : SimResourceSwitch<R> {
+ /**
+ * The [CoroutineScope] of the service bounded by the lifecycle of the service.
+ */
+ private val scope = CoroutineScope(context + Job())
+
+ private val _outputs = mutableSetOf<SimResourceProvider<R>>()
+ override val outputs: Set<SimResourceProvider<R>>
+ get() = _outputs
+
+ private val availableResources = ArrayDeque<SimResourceForwarder<R>>()
+ private val _inputs = mutableSetOf<SimResourceProvider<R>>()
+ override val inputs: Set<SimResourceProvider<R>>
+ get() = _inputs
+
+ override fun addOutput(resource: R): SimResourceProvider<R> {
+ check(availableResources.isNotEmpty()) { "No capacity to serve request" }
+ val forwarder = availableResources.poll()
+ val output = Provider(resource, forwarder)
+ _outputs += output
+ return output
+ }
+
+ override fun addInput(input: SimResourceProvider<R>) {
+ if (input in inputs) {
+ return
+ }
+
+ val forwarder = SimResourceForwarder(input.resource)
+
+ scope.launch { input.consume(forwarder) }
+
+ _inputs += input
+ availableResources += forwarder
+ }
+
+ override fun close() {
+ scope.cancel()
+ }
+
+ private inner class Provider(
+ override val resource: R,
+ private val forwarder: SimResourceForwarder<R>
+ ) : SimResourceProvider<R> {
+
+ override suspend fun consume(consumer: SimResourceConsumer<R>) = forwarder.consume(consumer)
+
+ override fun interrupt() {
+ forwarder.interrupt()
+ }
+
+ override fun close() {
+ _outputs -= this
+ availableResources += forwarder
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
new file mode 100644
index 00000000..bcf76d3c
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -0,0 +1,508 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.*
+import org.opendc.simulator.resources.consumer.SimConsumerBarrier
+import java.time.Clock
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
+ * fair sharing.
+ */
+public class SimResourceSwitchMaxMin<R : SimResource>(
+ private val clock: Clock,
+ context: CoroutineContext,
+ private val listener: Listener<R>? = null
+) : SimResourceSwitch<R> {
+ /**
+ * The [CoroutineScope] of the service bounded by the lifecycle of the service.
+ */
+ private val scope = CoroutineScope(context + Job())
+
+ private val inputConsumers = mutableSetOf<InputConsumer>()
+ private val _outputs = mutableSetOf<OutputProvider>()
+ override val outputs: Set<SimResourceProvider<R>>
+ get() = _outputs
+
+ private val _inputs = mutableSetOf<SimResourceProvider<R>>()
+ override val inputs: Set<SimResourceProvider<R>>
+ get() = _inputs
+
+ /**
+ * The commands to submit to the underlying host.
+ */
+ private val commands = mutableMapOf<R, SimResourceCommand>()
+
+ /**
+ * The active output contexts.
+ */
+ private val outputContexts: MutableList<OutputContext> = mutableListOf()
+
+ /**
+ * The total amount of remaining work (of all pCPUs).
+ */
+ private var totalRemainingWork: Double = 0.0
+
+ /**
+ * The total speed requested by the vCPUs.
+ */
+ private var totalRequestedSpeed = 0.0
+
+ /**
+ * The total amount of work requested by the vCPUs.
+ */
+ private var totalRequestedWork = 0.0
+
+ /**
+ * The total allocated speed for the vCPUs.
+ */
+ private var totalAllocatedSpeed = 0.0
+
+ /**
+ * The total allocated work requested for the vCPUs.
+ */
+ private var totalAllocatedWork = 0.0
+
+ /**
+ * The amount of work that could not be performed due to over-committing resources.
+ */
+ private var totalOvercommittedWork = 0.0
+
+ /**
+ * The amount of work that was lost due to interference.
+ */
+ private var totalInterferedWork = 0.0
+
+ /**
+ * A flag to indicate that the scheduler has submitted work that has not yet been completed.
+ */
+ private var isDirty: Boolean = false
+
+ /**
+ * The scheduler barrier.
+ */
+ private var barrier: SimConsumerBarrier = SimConsumerBarrier(0)
+
+ /**
+ * Add an output to the switch represented by [resource].
+ */
+ override fun addOutput(resource: R): SimResourceProvider<R> {
+ val provider = OutputProvider(resource)
+ _outputs.add(provider)
+ return provider
+ }
+
+ /**
+ * Add the specified [input] to the switch.
+ */
+ override fun addInput(input: SimResourceProvider<R>) {
+ val consumer = InputConsumer(input)
+ _inputs.add(input)
+ inputConsumers += consumer
+ }
+
+ override fun close() {
+ scope.cancel()
+ }
+
+ /**
+ * Indicate that the workloads should be re-scheduled.
+ */
+ private fun schedule() {
+ isDirty = true
+ interruptAll()
+ }
+
+ /**
+ * Schedule the work over the physical CPUs.
+ */
+ private fun doSchedule() {
+ // If there is no work yet, mark all inputs as idle.
+ if (outputContexts.isEmpty()) {
+ commands.replaceAll { _, _ -> SimResourceCommand.Idle() }
+ interruptAll()
+ }
+
+ val maxUsage = inputs.sumByDouble { it.resource.capacity }
+ var duration: Double = Double.MAX_VALUE
+ var deadline: Long = Long.MAX_VALUE
+ var availableSpeed = maxUsage
+ var totalRequestedSpeed = 0.0
+ var totalRequestedWork = 0.0
+
+ // Sort the outputs based on their requested usage
+ // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
+ outputContexts.sort()
+
+ // Divide the available input capacity fairly across the outputs using max-min fair sharing
+ val outputIterator = outputContexts.listIterator()
+ var remaining = outputContexts.size
+ while (outputIterator.hasNext()) {
+ val output = outputIterator.next()
+ val availableShare = availableSpeed / remaining--
+
+ when (val command = output.activeCommand) {
+ is SimResourceCommand.Idle -> {
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ output.actualSpeed = 0.0
+ }
+ is SimResourceCommand.Consume -> {
+ val grantedSpeed = min(output.allowedSpeed, availableShare)
+
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ // Ignore idle computation
+ if (grantedSpeed <= 0.0 || command.work <= 0.0) {
+ output.actualSpeed = 0.0
+ continue
+ }
+
+ totalRequestedSpeed += command.limit
+ totalRequestedWork += command.work
+
+ output.actualSpeed = grantedSpeed
+ availableSpeed -= grantedSpeed
+
+ // The duration that we want to run is that of the shortest request from an output
+ duration = min(duration, command.work / grantedSpeed)
+ }
+ SimResourceCommand.Exit -> {
+ // Apparently the output consumer has exited, so remove it from the scheduling queue.
+ outputIterator.remove()
+ }
+ }
+ }
+
+ // Round the duration to milliseconds
+ duration = ceil(duration * 1000) / 1000
+
+ assert(deadline >= clock.millis()) { "Deadline already passed" }
+
+ val totalAllocatedSpeed = maxUsage - availableSpeed
+ var totalAllocatedWork = 0.0
+ availableSpeed = totalAllocatedSpeed
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (input in inputs.sortedByDescending { it.resource.capacity }) {
+ val maxResourceUsage = input.resource.capacity
+ val fraction = maxResourceUsage / maxUsage
+ val grantedSpeed = min(maxResourceUsage, totalAllocatedSpeed * fraction)
+ val grantedWork = duration * grantedSpeed
+
+ commands[input.resource] =
+ if (grantedWork > 0.0 && grantedSpeed > 0.0)
+ SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+
+ totalAllocatedWork += grantedWork
+ availableSpeed -= grantedSpeed
+ }
+
+ this.totalRequestedSpeed = totalRequestedSpeed
+ this.totalRequestedWork = totalRequestedWork
+ this.totalAllocatedSpeed = totalAllocatedSpeed
+ this.totalAllocatedWork = totalAllocatedWork
+
+ interruptAll()
+ }
+
+ /**
+ * Flush the progress of the vCPUs.
+ */
+ private fun flushGuests() {
+ // Flush all the outputs work
+ for (output in outputContexts) {
+ output.flush(isIntermediate = true)
+ }
+
+ // Report metrics
+ listener?.onSliceFinish(
+ this,
+ totalRequestedWork.toLong(),
+ (totalAllocatedWork - totalRemainingWork).toLong(),
+ totalOvercommittedWork.toLong(),
+ totalInterferedWork.toLong(),
+ totalRequestedSpeed,
+ totalAllocatedSpeed
+ )
+ totalRemainingWork = 0.0
+ totalInterferedWork = 0.0
+ totalOvercommittedWork = 0.0
+
+ // Force all inputs to re-schedule their work.
+ doSchedule()
+ }
+
+ /**
+ * Interrupt all inputs.
+ */
+ private fun interruptAll() {
+ for (input in inputConsumers) {
+ input.interrupt()
+ }
+ }
+
+ /**
+ * Event listener for hypervisor events.
+ */
+ public interface Listener<R : SimResource> {
+ /**
+ * This method is invoked when a slice is finished.
+ */
+ public fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin<R>,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ )
+ }
+
+ /**
+ * An internal [SimResourceProvider] implementation for switch outputs.
+ */
+ private inner class OutputProvider(override val resource: R) : SimResourceProvider<R> {
+ /**
+ * A flag to indicate that the resource was closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
+ * The current active consumer.
+ */
+ private var cont: CancellableContinuation<Unit>? = null
+
+ /**
+ * The [OutputContext] that is currently running.
+ */
+ private var ctx: OutputContext? = null
+
+ override suspend fun consume(consumer: SimResourceConsumer<R>) {
+ check(!isClosed) { "Lifetime of resource has ended." }
+ check(cont == null) { "Run should not be called concurrently" }
+
+ try {
+ return suspendCancellableCoroutine { cont ->
+ val ctx = OutputContext(resource, consumer, cont)
+ ctx.start()
+ cont.invokeOnCancellation {
+ ctx.stop()
+ }
+
+ this.cont = cont
+ this.ctx = ctx
+
+ outputContexts += ctx
+ schedule()
+ }
+ } finally {
+ cont = null
+ ctx = null
+ }
+ }
+
+ override fun close() {
+ isClosed = true
+ cont?.cancel()
+ cont = null
+ ctx = null
+ _outputs.remove(this)
+ }
+
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
+ }
+
+ /**
+ * A [SimAbstractResourceContext] for the output resources.
+ */
+ private inner class OutputContext(
+ resource: R,
+ consumer: SimResourceConsumer<R>,
+ private val cont: Continuation<Unit>
+ ) : SimAbstractResourceContext<R>(resource, clock, consumer), Comparable<OutputContext> {
+ /**
+ * The current command that is processed by the vCPU.
+ */
+ var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
+
+ /**
+ * The processing speed that is allowed by the model constraints.
+ */
+ var allowedSpeed: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ var actualSpeed: Double = 0.0
+
+ /**
+ * A flag to indicate that the CPU has exited.
+ */
+ var hasExited: Boolean = false
+
+ override fun onIdle(deadline: Long) {
+ allowedSpeed = 0.0
+ activeCommand = SimResourceCommand.Idle(deadline)
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ allowedSpeed = getSpeed(limit)
+ activeCommand = SimResourceCommand.Consume(work, limit, deadline)
+ }
+
+ override fun onFinish() {
+ hasExited = true
+ activeCommand = SimResourceCommand.Exit
+ cont.resume(Unit)
+ }
+
+ override fun onFailure(cause: Throwable) {
+ hasExited = true
+ activeCommand = SimResourceCommand.Exit
+ cont.resumeWithException(cause)
+ }
+
+ override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
+ // Apply performance interference model
+ val performanceScore = 1.0
+
+ // Compute the remaining amount of work
+ val remainingWork = if (work > 0.0) {
+ // Compute the fraction of compute time allocated to the VM
+ val fraction = actualSpeed / totalAllocatedSpeed
+
+ // Compute the work that was actually granted to the VM.
+ val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
+ val processed = processingAvailable * performanceScore
+
+ val interferedWork = processingAvailable - processed
+
+ totalInterferedWork += interferedWork
+
+ max(0.0, work - processed)
+ } else {
+ 0.0
+ }
+
+ if (!isInterrupted) {
+ totalOvercommittedWork += remainingWork
+ }
+
+ return remainingWork
+ }
+
+ override fun interrupt() {
+ // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
+ // to infinite recursion.
+ if (isProcessing) {
+ return
+ }
+
+ super.interrupt()
+
+ // Force the scheduler to re-schedule
+ schedule()
+ }
+
+ override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
+ }
+
+ /**
+ * An internal [SimResourceConsumer] implementation for switch inputs.
+ */
+ private inner class InputConsumer(val input: SimResourceProvider<R>) : SimResourceConsumer<R> {
+ /**
+ * The resource context of the consumer.
+ */
+ private lateinit var ctx: SimResourceContext<R>
+
+ init {
+ scope.launch {
+ try {
+ barrier = SimConsumerBarrier(barrier.parties + 1)
+ input.consume(this@InputConsumer)
+ } catch (e: CancellationException) {
+ // Cancel gracefully
+ throw e
+ } catch (e: Throwable) {
+ e.printStackTrace()
+ } finally {
+ barrier = SimConsumerBarrier(barrier.parties - 1)
+ inputConsumers -= this@InputConsumer
+ _inputs -= input
+ }
+ }
+ }
+
+ /**
+ * Interrupt the consumer
+ */
+ fun interrupt() {
+ ctx.interrupt()
+ }
+
+ override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand {
+ this.ctx = ctx
+ return commands[ctx.resource] ?: SimResourceCommand.Idle()
+ }
+
+ override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand {
+ totalRemainingWork += remainingWork
+ val isLast = barrier.enter()
+
+ // Flush the progress of the guest after the barrier has been reached.
+ if (isLast && isDirty) {
+ isDirty = false
+ flushGuests()
+ }
+
+ return if (isDirty) {
+ // Wait for the scheduler determine the work after the barrier has been reached by all CPUs.
+ SimResourceCommand.Idle()
+ } else {
+ // Indicate that the scheduler needs to run next call.
+ if (isLast) {
+ isDirty = true
+ }
+
+ commands[ctx.resource] ?: SimResourceCommand.Idle()
+ }
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
new file mode 100644
index 00000000..7aa5a5aa
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+/**
+ * The [SimConsumerBarrier] is a barrier that allows consumers to wait for a select number of other consumers to
+ * complete, before proceeding its operation.
+ */
+public class SimConsumerBarrier(public val parties: Int) {
+ private var counter = 0
+
+ /**
+ * Enter the barrier and determine whether the caller is the last to reach the barrier.
+ *
+ * @return `true` if the caller is the last to reach the barrier, `false` otherwise.
+ */
+ public fun enter(): Boolean {
+ val last = ++counter == parties
+ if (last) {
+ counter = 0
+ return true
+ }
+ return false
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
new file mode 100644
index 00000000..03a3cebd
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+import org.opendc.simulator.resources.SimResource
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+
+/**
+ * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource
+ * consumption for some period of time.
+ */
+public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> {
+ private val iterator = trace.iterator()
+
+ override fun onStart(ctx: SimResourceContext<SimResource>): SimResourceCommand {
+ return onNext(ctx, 0.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimResource>, remainingWork: Double): SimResourceCommand {
+ return if (iterator.hasNext()) {
+ val now = ctx.clock.millis()
+ val fragment = iterator.next()
+ val work = (fragment.duration / 1000) * fragment.usage
+ val deadline = now + fragment.duration
+
+ assert(deadline >= now) { "Deadline already passed" }
+
+ if (work > 0.0)
+ SimResourceCommand.Consume(work, fragment.usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+
+ /**
+ * A fragment of the workload.
+ */
+ public data class Fragment(val duration: Long, val usage: Double)
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
new file mode 100644
index 00000000..02d456ff
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
+
+/**
+ * Test suite for [SimResourceCommand].
+ */
+class SimResourceCommandTest {
+ @Test
+ fun testZeroWork() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(0.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testNegativeWork() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(-1.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testZeroLimit() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(1.0, 0.0)
+ }
+ }
+
+ @Test
+ fun testNegativeLimit() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(1.0, -1.0, 1)
+ }
+ }
+
+ @Test
+ fun testConsumeCorrect() {
+ assertDoesNotThrow {
+ SimResourceCommand.Consume(1.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testIdleCorrect() {
+ assertDoesNotThrow {
+ SimResourceCommand.Idle(1)
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
new file mode 100644
index 00000000..e7642dc1
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -0,0 +1,156 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+
+/**
+ * A test suite for the [SimAbstractResourceContext] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceContextTest {
+ data class SimCpu(val speed: Double) : SimResource {
+ override val capacity: Double
+ get() = speed
+ }
+
+ @Test
+ fun testFlushWithoutCommand() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+
+ val resource = SimCpu(4200.0)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(10.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
+ override fun onIdle(deadline: Long) {
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ }
+
+ override fun onFinish() {
+ }
+
+ override fun onFailure(cause: Throwable) {
+ }
+ }
+
+ context.flush()
+ }
+
+ @Test
+ fun testIntermediateFlush() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val resource = SimCpu(4200.0)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(10.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ var counter = 0
+ val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
+ override fun onIdle(deadline: Long) {
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ counter++
+ }
+
+ override fun onFinish() {
+ }
+
+ override fun onFailure(cause: Throwable) {
+ }
+ }
+
+ context.start()
+ delay(1) // Delay 1 ms to prevent hitting the fast path
+ context.flush(isIntermediate = true)
+ assertEquals(2, counter)
+ }
+
+ @Test
+ fun testIntermediateFlushIdle() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val resource = SimCpu(4200.0)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Idle(10)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ var counter = 0
+ var isFinished = false
+ val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
+ override fun onIdle(deadline: Long) {
+ counter++
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ }
+
+ override fun onFinish() {
+ isFinished = true
+ }
+
+ override fun onFailure(cause: Throwable) {
+ }
+ }
+
+ context.start()
+ delay(5)
+ context.flush(isIntermediate = true)
+ delay(5)
+ context.flush(isIntermediate = true)
+
+ assertAll(
+ { assertEquals(1, counter) },
+ { assertTrue(isFinished) }
+ )
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
new file mode 100644
index 00000000..ced1bd98
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimResourceForwarder] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceForwarderTest {
+
+ data class SimCpu(val speed: Double) : SimResource {
+ override val capacity: Double
+ get() = speed
+ }
+
+ @Test
+ fun testExitImmediately() = runBlockingTest {
+ val forwarder = SimResourceForwarder(SimCpu(1000.0))
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(SimCpu(2000.0), clock, scheduler)
+
+ launch {
+ source.consume(forwarder)
+ source.close()
+ }
+
+ forwarder.consume(object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ })
+ forwarder.close()
+ scheduler.close()
+ }
+
+ @Test
+ fun testExit() = runBlockingTest {
+ val forwarder = SimResourceForwarder(SimCpu(1000.0))
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(SimCpu(2000.0), clock, scheduler)
+
+ launch {
+ source.consume(forwarder)
+ source.close()
+ }
+
+ forwarder.consume(object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ })
+
+ forwarder.close()
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
new file mode 100644
index 00000000..4f7825fc
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -0,0 +1,354 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimResourceSource] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceSourceTest {
+ data class SimCpu(val speed: Double) : SimResource {
+ override val capacity: Double
+ get() = speed
+ }
+
+ @Test
+ fun testSpeed() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1000 * ctx.resource.speed, ctx.resource.speed)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ try {
+ val res = mutableListOf<Double>()
+ val job = launch { provider.speed.toList(res) }
+
+ provider.consume(consumer)
+
+ job.cancel()
+ assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testSpeedLimit() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1000 * ctx.resource.speed, 2 * ctx.resource.speed)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ try {
+ val res = mutableListOf<Double>()
+ val job = launch { provider.speed.toList(res) }
+
+ provider.consume(consumer)
+
+ job.cancel()
+ assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ /**
+ * Test to see whether no infinite recursion occurs when interrupting during [SimResourceConsumer.onStart] or
+ * [SimResourceConsumer.onNext].
+ */
+ @Test
+ fun testIntermediateInterrupt() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ ctx.interrupt()
+ return SimResourceCommand.Exit
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ try {
+ provider.consume(consumer)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testInterrupt() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+ lateinit var resCtx: SimResourceContext<SimCpu>
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ resCtx = ctx
+ return SimResourceCommand.Consume(4.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ assertEquals(0.0, remainingWork)
+ return SimResourceCommand.Exit
+ }
+ }
+
+ try {
+ launch {
+ yield()
+ resCtx.interrupt()
+ }
+ provider.consume(consumer)
+
+ assertEquals(0, currentTime)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testFailure() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ throw IllegalStateException()
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ try {
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testExceptionPropagationOnNext() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ try {
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testConcurrentConsumption() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ try {
+ assertThrows<IllegalStateException> {
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ provider.consume(consumer)
+ }
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testClosedConsumption() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ try {
+ assertThrows<IllegalStateException> {
+ provider.close()
+ provider.consume(consumer)
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testCloseDuringConsumption() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ try {
+ launch { provider.consume(consumer) }
+ delay(500)
+ provider.close()
+
+ assertEquals(500, currentTime)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testIdle() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Idle(ctx.clock.millis() + 500)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ try {
+ provider.consume(consumer)
+
+ assertEquals(500, currentTime)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testInfiniteSleep() {
+ assertThrows<IllegalStateException> {
+ runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Idle()
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ try {
+ provider.consume(consumer)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
new file mode 100644
index 00000000..ca6558bf
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.runBlockingTest
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+import java.lang.IllegalStateException
+
+/**
+ * Test suite for the [SimResourceSwitchExclusive] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceSwitchExclusiveTest {
+ class SimCpu(val speed: Double) : SimResource {
+ override val capacity: Double
+ get() = speed
+ }
+
+ /**
+ * Test a trace workload.
+ */
+ @Test
+ fun testTrace() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val speed = mutableListOf<Double>()
+
+ val duration = 5 * 60L
+ val workload =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
+
+ switch.addInput(source)
+
+ val provider = switch.addOutput(SimCpu(3200.0))
+ val job = launch { source.speed.toList(speed) }
+
+ try {
+ provider.consume(workload)
+ yield()
+ } finally {
+ job.cancel()
+ provider.close()
+ }
+
+ assertAll(
+ { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } },
+ { assertEquals(5 * 60L * 4000, currentTime) { "Took enough time" } }
+ )
+ }
+
+ /**
+ * Test runtime workload on hypervisor.
+ */
+ @Test
+ fun testRuntimeWorkload() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
+
+ switch.addInput(source)
+
+ val provider = switch.addOutput(SimCpu(3200.0))
+
+ try {
+ provider.consume(workload)
+ yield()
+ } finally {
+ provider.close()
+ }
+ assertEquals(duration, currentTime) { "Took enough time" }
+ }
+
+ /**
+ * Test two workloads running sequentially.
+ */
+ @Test
+ fun testTwoWorkloads() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
+
+ switch.addInput(source)
+
+ val provider = switch.addOutput(SimCpu(3200.0))
+
+ try {
+ provider.consume(workload)
+ yield()
+ provider.consume(workload)
+ } finally {
+ provider.close()
+ }
+ assertEquals(duration * 2, currentTime) { "Took enough time" }
+ }
+
+ /**
+ * Test concurrent workloads on the machine.
+ */
+ @Test
+ fun testConcurrentWorkloadFails() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(duration.toDouble(), 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
+
+ switch.addInput(source)
+
+ switch.addOutput(SimCpu(3200.0))
+ assertThrows<IllegalStateException> { switch.addOutput(SimCpu(3200.0)) }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
new file mode 100644
index 00000000..698c1700
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -0,0 +1,207 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.runBlockingTest
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+
+/**
+ * Test suite for the [SimResourceSwitch] implementations
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceSwitchMaxMinTest {
+ class SimCpu(val speed: Double) : SimResource {
+ override val capacity: Double
+ get() = speed
+ }
+
+ @Test
+ fun testSmoke() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val switch = SimResourceSwitchMaxMin<SimCpu>(clock, coroutineContext)
+
+ val sources = List(2) { SimResourceSource(SimCpu(2000.0), clock, scheduler) }
+ sources.forEach { switch.addInput(it) }
+
+ val provider = switch.addOutput(SimCpu(1000.0))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ try {
+ provider.consume(consumer)
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with a single VM.
+ */
+ @Test
+ fun testOvercommittedSingle() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val listener = object : SimResourceSwitchMaxMin.Listener<SimCpu> {
+ var totalRequestedWork = 0L
+ var totalGrantedWork = 0L
+ var totalOvercommittedWork = 0L
+
+ override fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin<SimCpu>,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ totalRequestedWork += requestedWork
+ totalGrantedWork += grantedWork
+ totalOvercommittedWork += overcommittedWork
+ }
+ }
+
+ val duration = 5 * 60L
+ val workload =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener)
+ val provider = switch.addOutput(SimCpu(3200.0))
+
+ try {
+ switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler))
+ provider.consume(workload)
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+
+ assertAll(
+ { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1200000, currentTime) }
+ )
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with two VMs.
+ */
+ @Test
+ fun testOvercommittedDual() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val listener = object : SimResourceSwitchMaxMin.Listener<SimCpu> {
+ var totalRequestedWork = 0L
+ var totalGrantedWork = 0L
+ var totalOvercommittedWork = 0L
+
+ override fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin<SimCpu>,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ totalRequestedWork += requestedWork
+ totalGrantedWork += grantedWork
+ totalOvercommittedWork += overcommittedWork
+ }
+ }
+
+ val duration = 5 * 60L
+ val workloadA =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+ val workloadB =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3100.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 73.0)
+ )
+ )
+
+ val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener)
+ val providerA = switch.addOutput(SimCpu(3200.0))
+ val providerB = switch.addOutput(SimCpu(3200.0))
+
+ try {
+ switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler))
+
+ coroutineScope {
+ launch { providerA.consume(workloadA) }
+ providerB.consume(workloadB)
+ }
+
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+ assertAll(
+ { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1200000, currentTime) }
+ )
+ }
+}