summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-resources
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-04-25 21:53:42 +0200
committerGitHub <noreply@github.com>2021-04-25 21:53:42 +0200
commit128f76f7fd7c8abb41a3bbbd9f1980cbc20ae7a5 (patch)
treeadd513890005233a7784466797bfe6f5052e9eeb /opendc-simulator/opendc-simulator-resources
parent128a1db017545597a5c035b7960eb3fd36b5f987 (diff)
parent57b54b59ed74ec37338ae26b3864d051255aba49 (diff)
build: Flatten project structure
This change updates the project structure to become flattened. Previously, the simulator, frontend and API each lived into their own directory. With this change, all modules of the project live in the top-level directory of the repository.
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources')
-rw-r--r--opendc-simulator/opendc-simulator-resources/build.gradle.kts39
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt43
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt135
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt208
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt344
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt48
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt63
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt52
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt79
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt51
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt43
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt420
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt29
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt79
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt129
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt43
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt48
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt95
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt119
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt175
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt52
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt81
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt68
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt58
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt202
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt74
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt104
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt351
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt173
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt193
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt210
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt66
32 files changed, 3874 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/build.gradle.kts b/opendc-simulator/opendc-simulator-resources/build.gradle.kts
new file mode 100644
index 00000000..3b0a197c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/build.gradle.kts
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Uniform resource consumption simulation model"
+
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+ `benchmark-conventions`
+}
+
+dependencies {
+ api(platform(project(":opendc-platform")))
+ api("org.jetbrains.kotlinx:kotlinx-coroutines-core")
+ implementation(project(":opendc-utils"))
+
+ jmhImplementation(project(":opendc-simulator:opendc-simulator-core"))
+ testImplementation(project(":opendc-simulator:opendc-simulator-core"))
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt
new file mode 100644
index 00000000..8d2587b1
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
+
+/**
+ * Helper function to create simple consumer workload.
+ */
+fun createSimpleConsumer(): SimResourceConsumer {
+ return SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(1000, 28.0),
+ SimTraceConsumer.Fragment(1000, 3500.0),
+ SimTraceConsumer.Fragment(1000, 0.0),
+ SimTraceConsumer.Fragment(1000, 183.0),
+ SimTraceConsumer.Fragment(1000, 400.0),
+ SimTraceConsumer.Fragment(1000, 100.0),
+ SimTraceConsumer.Fragment(1000, 3000.0),
+ SimTraceConsumer.Fragment(1000, 4500.0),
+ ),
+ )
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
new file mode 100644
index 00000000..beda3eaa
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
@@ -0,0 +1,135 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.launch
+import org.opendc.simulator.core.SimulationCoroutineScope
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.utils.TimerScheduler
+import org.openjdk.jmh.annotations.*
+import java.util.concurrent.TimeUnit
+
+@State(Scope.Thread)
+@Fork(1)
+@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceBenchmarks {
+ private lateinit var scope: SimulationCoroutineScope
+ private lateinit var scheduler: TimerScheduler<Any>
+
+ @Setup
+ fun setUp() {
+ scope = SimulationCoroutineScope()
+ scheduler = TimerScheduler(scope.coroutineContext, scope.clock)
+ }
+
+ @State(Scope.Thread)
+ class Workload {
+ lateinit var consumers: Array<SimResourceConsumer>
+
+ @Setup
+ fun setUp() {
+ consumers = Array(3) { createSimpleConsumer() }
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSource(state: Workload) {
+ return scope.runBlockingSimulation {
+ val provider = SimResourceSource(4200.0, clock, scheduler)
+ return@runBlockingSimulation provider.consume(state.consumers[0])
+ }
+ }
+
+ @Benchmark
+ fun benchmarkForwardOverhead(state: Workload) {
+ return scope.runBlockingSimulation {
+ val provider = SimResourceSource(4200.0, clock, scheduler)
+ val forwarder = SimResourceForwarder()
+ provider.startConsumer(forwarder)
+ return@runBlockingSimulation forwarder.consume(state.consumers[0])
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) {
+ return scope.runBlockingSimulation {
+ val switch = SimResourceSwitchMaxMin(clock)
+
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+
+ val provider = switch.addOutput(3500.0)
+ return@runBlockingSimulation provider.consume(state.consumers[0])
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) {
+ return scope.runBlockingSimulation {
+ val switch = SimResourceSwitchMaxMin(clock)
+
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+
+ repeat(3) { i ->
+ launch {
+ val provider = switch.addOutput(3500.0)
+ provider.consume(state.consumers[i])
+ }
+ }
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSwitchExclusiveSingleConsumer(state: Workload) {
+ return scope.runBlockingSimulation {
+ val switch = SimResourceSwitchExclusive()
+
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+
+ val provider = switch.addOutput(3500.0)
+ return@runBlockingSimulation provider.consume(state.consumers[0])
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSwitchExclusiveTripleConsumer(state: Workload) {
+ return scope.runBlockingSimulation {
+ val switch = SimResourceSwitchExclusive()
+
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+
+ repeat(2) { i ->
+ launch {
+ val provider = switch.addOutput(3500.0)
+ provider.consume(state.consumers[i])
+ }
+ }
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
new file mode 100644
index 00000000..c7fa6a17
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * Abstract implementation of [SimResourceAggregator].
+ */
+public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator {
+ /**
+ * The available resource provider contexts.
+ */
+ protected val inputContexts: Set<SimResourceContext>
+ get() = _inputContexts
+ private val _inputContexts = mutableSetOf<SimResourceContext>()
+
+ /**
+ * The output context.
+ */
+ protected val outputContext: SimResourceContext
+ get() = context
+
+ /**
+ * The commands to submit to the underlying input resources.
+ */
+ protected val commands: MutableMap<SimResourceContext, SimResourceCommand> = mutableMapOf()
+
+ /**
+ * This method is invoked when the resource consumer consumes resources.
+ */
+ protected abstract fun doConsume(work: Double, limit: Double, deadline: Long)
+
+ /**
+ * This method is invoked when the resource consumer enters an idle state.
+ */
+ protected open fun doIdle(deadline: Long) {
+ for (input in inputContexts) {
+ commands[input] = SimResourceCommand.Idle(deadline)
+ }
+ }
+
+ /**
+ * This method is invoked when the resource consumer finishes processing.
+ */
+ protected open fun doFinish(cause: Throwable?) {
+ for (input in inputContexts) {
+ commands[input] = SimResourceCommand.Exit
+ }
+ }
+
+ /**
+ * This method is invoked when an input context is started.
+ */
+ protected open fun onContextStarted(ctx: SimResourceContext) {
+ _inputContexts.add(ctx)
+ }
+
+ protected open fun onContextFinished(ctx: SimResourceContext) {
+ assert(_inputContexts.remove(ctx)) { "Lost context" }
+ }
+
+ override fun addInput(input: SimResourceProvider) {
+ check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" }
+
+ val consumer = Consumer()
+ _inputs.add(input)
+ input.startConsumer(consumer)
+ }
+
+ override fun close() {
+ output.close()
+ }
+
+ override val output: SimResourceProvider
+ get() = _output
+ private val _output = SimResourceForwarder()
+
+ override val inputs: Set<SimResourceProvider>
+ get() = _inputs
+ private val _inputs = mutableSetOf<SimResourceProvider>()
+
+ private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) {
+ override val remainingWork: Double
+ get() {
+ val now = clock.millis()
+
+ return if (_remainingWorkFlush < now) {
+ _remainingWorkFlush = now
+ _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it }
+ } else {
+ _remainingWork
+ }
+ }
+ private var _remainingWork: Double = 0.0
+ private var _remainingWorkFlush: Long = Long.MIN_VALUE
+
+ override fun interrupt() {
+ super.interrupt()
+
+ interruptAll()
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline)
+
+ override fun onIdle(deadline: Long) = doIdle(deadline)
+
+ override fun onFinish(cause: Throwable?) {
+ doFinish(cause)
+
+ super.onFinish(cause)
+
+ interruptAll()
+ }
+ }
+
+ /**
+ * A flag to indicate that an interrupt is active.
+ */
+ private var isInterrupting: Boolean = false
+
+ /**
+ * Schedule the work over the input resources.
+ */
+ private fun doSchedule() {
+ context.flush(isIntermediate = true)
+ interruptAll()
+ }
+
+ /**
+ * Interrupt all inputs.
+ */
+ private fun interruptAll() {
+ // Prevent users from interrupting the resource while they are constructing their next command, as this will
+ // only lead to infinite recursion.
+ if (isInterrupting) {
+ return
+ }
+
+ try {
+ isInterrupting = true
+
+ val iterator = _inputs.iterator()
+ while (iterator.hasNext()) {
+ val input = iterator.next()
+ input.interrupt()
+
+ if (input.state != SimResourceState.Active) {
+ iterator.remove()
+ }
+ }
+ } finally {
+ isInterrupting = false
+ }
+ }
+
+ /**
+ * An internal [SimResourceConsumer] implementation for aggregator inputs.
+ */
+ private inner class Consumer : SimResourceConsumer {
+ override fun onStart(ctx: SimResourceContext) {
+ onContextStarted(ctx)
+ onCapacityChanged(ctx, false)
+
+ // Make sure we initialize the output if we have not done so yet
+ if (context.state == SimResourceState.Pending) {
+ context.start()
+ }
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ doSchedule()
+
+ return commands[ctx] ?: SimResourceCommand.Idle()
+ }
+
+ override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
+ // Adjust capacity of output resource
+ context.capacity = inputContexts.sumByDouble { it.capacity }
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ onContextFinished(ctx)
+
+ super.onFinish(ctx, cause)
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
new file mode 100644
index 00000000..05ed0714
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -0,0 +1,344 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
+ */
+public abstract class SimAbstractResourceContext(
+ initialCapacity: Double,
+ override val clock: Clock,
+ private val consumer: SimResourceConsumer
+) : SimResourceContext {
+ /**
+ * The capacity of the resource.
+ */
+ public final override var capacity: Double = initialCapacity
+ set(value) {
+ val oldValue = field
+
+ // Only changes will be propagated
+ if (value != oldValue) {
+ field = value
+ onCapacityChange()
+ }
+ }
+
+ /**
+ * The amount of work still remaining at this instant.
+ */
+ override val remainingWork: Double
+ get() {
+ val activeCommand = activeCommand ?: return 0.0
+ val now = clock.millis()
+
+ return if (_remainingWorkFlush < now) {
+ _remainingWorkFlush = now
+ computeRemainingWork(activeCommand, now).also { _remainingWork = it }
+ } else {
+ _remainingWork
+ }
+ }
+ private var _remainingWork: Double = 0.0
+ private var _remainingWorkFlush: Long = Long.MIN_VALUE
+
+ /**
+ * A flag to indicate the state of the context.
+ */
+ public var state: SimResourceState = SimResourceState.Pending
+ private set
+
+ /**
+ * The current processing speed of the resource.
+ */
+ public var speed: Double = 0.0
+ private set
+
+ /**
+ * This method is invoked when the resource will idle until the specified [deadline].
+ */
+ public abstract fun onIdle(deadline: Long)
+
+ /**
+ * This method is invoked when the resource will be consumed until the specified [work] was processed or the
+ * [deadline] was reached.
+ */
+ public abstract fun onConsume(work: Double, limit: Double, deadline: Long)
+
+ /**
+ * This method is invoked when the resource consumer has finished.
+ */
+ public open fun onFinish(cause: Throwable?) {
+ consumer.onFinish(this, cause)
+ }
+
+ /**
+ * Get the remaining work to process after a resource consumption.
+ *
+ * @param work The size of the resource consumption.
+ * @param speed The speed of consumption.
+ * @param duration The duration from the start of the consumption until now.
+ * @return The amount of work remaining.
+ */
+ protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
+ return if (duration > 0L) {
+ val processed = duration / 1000.0 * speed
+ max(0.0, work - processed)
+ } else {
+ 0.0
+ }
+ }
+
+ /**
+ * Start the consumer.
+ */
+ public fun start() {
+ check(state == SimResourceState.Pending) { "Consumer is already started" }
+
+ val now = clock.millis()
+
+ state = SimResourceState.Active
+ isProcessing = true
+ latestFlush = now
+
+ try {
+ consumer.onStart(this)
+ activeCommand = interpret(consumer.onNext(this), now)
+ } catch (cause: Throwable) {
+ doStop(cause)
+ } finally {
+ isProcessing = false
+ }
+ }
+
+ /**
+ * Immediately stop the consumer.
+ */
+ public fun stop() {
+ try {
+ isProcessing = true
+ latestFlush = clock.millis()
+
+ flush(isIntermediate = true)
+ doStop(null)
+ } catch (cause: Throwable) {
+ doStop(cause)
+ } finally {
+ isProcessing = false
+ }
+ }
+
+ /**
+ * Flush the current active resource consumption.
+ *
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun flush(isIntermediate: Boolean = false) {
+ // Flush is no-op when the consumer is finished or not yet started
+ if (state != SimResourceState.Active) {
+ return
+ }
+
+ val now = clock.millis()
+
+ // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it.
+ if (isIntermediate && latestFlush >= now) {
+ return
+ }
+
+ try {
+ val activeCommand = activeCommand ?: return
+ val (timestamp, command) = activeCommand
+
+ // Note: accessor is reliant on activeCommand being set
+ val remainingWork = remainingWork
+
+ isProcessing = true
+
+ val duration = now - timestamp
+ assert(duration >= 0) { "Flush in the past" }
+
+ this.activeCommand = when (command) {
+ is SimResourceCommand.Idle -> {
+ // We should only continue processing the next command if:
+ // 1. The resource consumer reached its deadline.
+ // 2. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ if (command.deadline <= now || !isIntermediate) {
+ next(now)
+ } else {
+ interpret(SimResourceCommand.Idle(command.deadline), now)
+ }
+ }
+ is SimResourceCommand.Consume -> {
+ // We should only continue processing the next command if:
+ // 1. The resource consumption was finished.
+ // 2. The resource capacity cannot satisfy the demand.
+ // 4. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) {
+ next(now)
+ } else {
+ interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now)
+ }
+ }
+ SimResourceCommand.Exit ->
+ // Flush may not be called when the resource consumer has finished
+ throw IllegalStateException()
+ }
+
+ // Flush remaining work cache
+ _remainingWorkFlush = Long.MIN_VALUE
+ } catch (cause: Throwable) {
+ doStop(cause)
+ } finally {
+ latestFlush = now
+ isProcessing = false
+ }
+ }
+
+ override fun interrupt() {
+ // Prevent users from interrupting the resource while they are constructing their next command, as this will
+ // only lead to infinite recursion.
+ if (isProcessing) {
+ return
+ }
+
+ flush()
+ }
+
+ override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]"
+
+ /**
+ * A flag to indicate that the resource is currently processing a command.
+ */
+ protected var isProcessing: Boolean = false
+
+ /**
+ * The current command that is being processed.
+ */
+ private var activeCommand: CommandWrapper? = null
+
+ /**
+ * The latest timestamp at which the resource was flushed.
+ */
+ private var latestFlush: Long = Long.MIN_VALUE
+
+ /**
+ * Finish the consumer and resource provider.
+ */
+ private fun doStop(cause: Throwable?) {
+ val state = state
+ this.state = SimResourceState.Stopped
+
+ if (state == SimResourceState.Active) {
+ activeCommand = null
+ onFinish(cause)
+ }
+ }
+
+ /**
+ * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
+ */
+ private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? {
+ when (command) {
+ is SimResourceCommand.Idle -> {
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ speed = 0.0
+ consumer.onConfirm(this, 0.0)
+
+ onIdle(deadline)
+ }
+ is SimResourceCommand.Consume -> {
+ val work = command.work
+ val limit = command.limit
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ speed = min(capacity, limit)
+ consumer.onConfirm(this, speed)
+
+ onConsume(work, limit, deadline)
+ }
+ is SimResourceCommand.Exit -> {
+ speed = 0.0
+
+ doStop(null)
+
+ // No need to set the next active command
+ return null
+ }
+ }
+
+ return CommandWrapper(now, command)
+ }
+
+ /**
+ * Request the workload for more work.
+ */
+ private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now)
+
+ /**
+ * Compute the remaining work based on the specified [wrapper] and [timestamp][now].
+ */
+ private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double {
+ val (timestamp, command) = wrapper
+ val duration = now - timestamp
+ return when (command) {
+ is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration)
+ is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0
+ }
+ }
+
+ /**
+ * Indicate that the capacity of the resource has changed.
+ */
+ private fun onCapacityChange() {
+ // Do not inform the consumer if it has not been started yet
+ if (state != SimResourceState.Active) {
+ return
+ }
+
+ val isThrottled = speed > capacity
+ consumer.onCapacityChanged(this, isThrottled)
+
+ // Optimization: only flush changes if the new capacity cannot satisfy the active resource command.
+ // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush().
+ if (isThrottled) {
+ flush(isIntermediate = true)
+ }
+ }
+
+ /**
+ * This class wraps a [command] with the timestamp it was started and possibly the task associated with it.
+ */
+ private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand)
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
new file mode 100644
index 00000000..bb4e6a2c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource.
+ */
+public interface SimResourceAggregator : AutoCloseable {
+ /**
+ * The output resource provider to which resource consumers can be attached.
+ */
+ public val output: SimResourceProvider
+
+ /**
+ * The input resources that will be switched between the output providers.
+ */
+ public val inputs: Set<SimResourceProvider>
+
+ /**
+ * Add the specified [input] to the switch.
+ */
+ public fun addInput(input: SimResourceProvider)
+
+ /**
+ * End the lifecycle of the aggregator.
+ */
+ public override fun close()
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
new file mode 100644
index 00000000..08bc064e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * A [SimResourceAggregator] that distributes the load equally across the input resources.
+ */
+public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) {
+ private val consumers = mutableListOf<SimResourceContext>()
+
+ override fun doConsume(work: Double, limit: Double, deadline: Long) {
+ // Sort all consumers by their capacity
+ consumers.sortWith(compareBy { it.capacity })
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (input in consumers) {
+ val inputCapacity = input.capacity
+ val fraction = inputCapacity / outputContext.capacity
+ val grantedSpeed = limit * fraction
+ val grantedWork = fraction * work
+
+ commands[input] =
+ if (grantedWork > 0.0 && grantedSpeed > 0.0)
+ SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ }
+ }
+
+ override fun onContextStarted(ctx: SimResourceContext) {
+ super.onContextStarted(ctx)
+
+ consumers.add(ctx)
+ }
+
+ override fun onContextFinished(ctx: SimResourceContext) {
+ super.onContextFinished(ctx)
+
+ consumers.remove(ctx)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
new file mode 100644
index 00000000..f7f3fa4d
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer].
+ */
+public sealed class SimResourceCommand {
+ /**
+ * A request to the resource to perform the specified amount of work before the given [deadline].
+ *
+ * @param work The amount of work to process.
+ * @param limit The maximum amount of work to be processed per second.
+ * @param deadline The instant at which the work needs to be fulfilled.
+ */
+ public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() {
+ init {
+ require(work > 0) { "Amount of work must be positive" }
+ require(limit > 0) { "Limit must be positive" }
+ }
+ }
+
+ /**
+ * An indication to the resource that the consumer will idle until the specified [deadline] or if it is interrupted.
+ */
+ public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand()
+
+ /**
+ * An indication to the resource that the consumer has finished.
+ */
+ public object Exit : SimResourceCommand()
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
new file mode 100644
index 00000000..38672b13
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceConsumer] characterizes how a resource is consumed.
+ *
+ * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently)
+ * for multiple resource providers, unless explicitly said otherwise.
+ */
+public interface SimResourceConsumer {
+ /**
+ * This method is invoked when the consumer is started for some resource.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ */
+ public fun onStart(ctx: SimResourceContext) {}
+
+ /**
+ * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because
+ * the resource finished processing, reached its deadline or was interrupted.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @return The next command that the resource should execute.
+ */
+ public fun onNext(ctx: SimResourceContext): SimResourceCommand
+
+ /**
+ * This method is invoked when the resource provider confirms that the consumer is running at the given speed.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @param speed The speed at which the consumer runs.
+ */
+ public fun onConfirm(ctx: SimResourceContext, speed: Double) {}
+
+ /**
+ * This is method is invoked when the capacity of the resource changes.
+ *
+ * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the
+ * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly
+ * causing the active resource command to finish at a later moment than initially planned.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the
+ * capacity change.
+ */
+ public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {}
+
+ /**
+ * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit],
+ * the resource finished itself, or a failure occurred at the resource.
+ *
+ * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider.
+ *
+ * @param ctx The execution context in which the consumer ran.
+ * @param cause The cause of the finish in case the resource finished exceptionally.
+ */
+ public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {}
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
new file mode 100644
index 00000000..11dbb09f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a
+ * resource and a resource consumer.
+ */
+public interface SimResourceContext {
+ /**
+ * The virtual clock tracking simulation time.
+ */
+ public val clock: Clock
+
+ /**
+ * The resource capacity available at this instant.
+ */
+ public val capacity: Double
+
+ /**
+ * The amount of work still remaining at this instant.
+ */
+ public val remainingWork: Double
+
+ /**
+ * Ask the resource provider to interrupt its resource.
+ */
+ public fun interrupt()
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
new file mode 100644
index 00000000..b2759b7f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers.
+ */
+public interface SimResourceDistributor : AutoCloseable {
+ /**
+ * The output resource providers to which resource consumers can be attached.
+ */
+ public val outputs: Set<SimResourceProvider>
+
+ /**
+ * The input resource that will be distributed over the consumers.
+ */
+ public val input: SimResourceProvider
+
+ /**
+ * Add an output to the switch with the specified [capacity].
+ */
+ public fun addOutput(capacity: Double): SimResourceProvider
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
new file mode 100644
index 00000000..dfdd2c2e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -0,0 +1,420 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing.
+ */
+public class SimResourceDistributorMaxMin(
+ override val input: SimResourceProvider,
+ private val clock: Clock,
+ private val listener: Listener? = null
+) : SimResourceDistributor {
+ override val outputs: Set<SimResourceProvider>
+ get() = _outputs
+ private val _outputs = mutableSetOf<OutputProvider>()
+
+ /**
+ * The active output contexts.
+ */
+ private val outputContexts: MutableList<OutputContext> = mutableListOf()
+
+ /**
+ * The total speed requested by the output resources.
+ */
+ private var totalRequestedSpeed = 0.0
+
+ /**
+ * The total amount of work requested by the output resources.
+ */
+ private var totalRequestedWork = 0.0
+
+ /**
+ * The total allocated speed for the output resources.
+ */
+ private var totalAllocatedSpeed = 0.0
+
+ /**
+ * The total allocated work requested for the output resources.
+ */
+ private var totalAllocatedWork = 0.0
+
+ /**
+ * The amount of work that could not be performed due to over-committing resources.
+ */
+ private var totalOvercommittedWork = 0.0
+
+ /**
+ * The amount of work that was lost due to interference.
+ */
+ private var totalInterferedWork = 0.0
+
+ /**
+ * A flag to indicate that the switch is closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
+ * An internal [SimResourceConsumer] implementation for switch inputs.
+ */
+ private val consumer = object : SimResourceConsumer {
+ /**
+ * The resource context of the consumer.
+ */
+ private lateinit var ctx: SimResourceContext
+
+ val remainingWork: Double
+ get() = ctx.remainingWork
+
+ override fun onStart(ctx: SimResourceContext) {
+ this.ctx = ctx
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return doNext(ctx.capacity)
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ super.onFinish(ctx, cause)
+
+ val iterator = _outputs.iterator()
+ while (iterator.hasNext()) {
+ val output = iterator.next()
+
+ // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
+ // during the call to output.close()
+ iterator.remove()
+
+ output.close()
+ }
+ }
+ }
+
+ /**
+ * The total amount of remaining work.
+ */
+ private val totalRemainingWork: Double
+ get() = consumer.remainingWork
+
+ override fun addOutput(capacity: Double): SimResourceProvider {
+ check(!isClosed) { "Distributor has been closed" }
+
+ val provider = OutputProvider(capacity)
+ _outputs.add(provider)
+ return provider
+ }
+
+ override fun close() {
+ if (!isClosed) {
+ isClosed = true
+ input.cancel()
+ }
+ }
+
+ init {
+ input.startConsumer(consumer)
+ }
+
+ /**
+ * Indicate that the workloads should be re-scheduled.
+ */
+ private fun schedule() {
+ input.interrupt()
+ }
+
+ /**
+ * Schedule the work over the physical CPUs.
+ */
+ private fun doSchedule(capacity: Double): SimResourceCommand {
+ // If there is no work yet, mark all inputs as idle.
+ if (outputContexts.isEmpty()) {
+ return SimResourceCommand.Idle()
+ }
+
+ val maxUsage = capacity
+ var duration: Double = Double.MAX_VALUE
+ var deadline: Long = Long.MAX_VALUE
+ var availableSpeed = maxUsage
+ var totalRequestedSpeed = 0.0
+ var totalRequestedWork = 0.0
+
+ // Flush the work of the outputs
+ var outputIterator = outputContexts.listIterator()
+ while (outputIterator.hasNext()) {
+ val output = outputIterator.next()
+
+ output.flush(isIntermediate = true)
+
+ if (output.activeCommand == SimResourceCommand.Exit) {
+ // Apparently the output consumer has exited, so remove it from the scheduling queue.
+ outputIterator.remove()
+ }
+ }
+
+ // Sort the outputs based on their requested usage
+ // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
+ outputContexts.sort()
+
+ // Divide the available input capacity fairly across the outputs using max-min fair sharing
+ outputIterator = outputContexts.listIterator()
+ var remaining = outputContexts.size
+ while (outputIterator.hasNext()) {
+ val output = outputIterator.next()
+ val availableShare = availableSpeed / remaining--
+
+ when (val command = output.activeCommand) {
+ is SimResourceCommand.Idle -> {
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ output.actualSpeed = 0.0
+ }
+ is SimResourceCommand.Consume -> {
+ val grantedSpeed = min(output.allowedSpeed, availableShare)
+
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ // Ignore idle computation
+ if (grantedSpeed <= 0.0 || command.work <= 0.0) {
+ output.actualSpeed = 0.0
+ continue
+ }
+
+ totalRequestedSpeed += command.limit
+ totalRequestedWork += command.work
+
+ output.actualSpeed = grantedSpeed
+ availableSpeed -= grantedSpeed
+
+ // The duration that we want to run is that of the shortest request from an output
+ duration = min(duration, command.work / grantedSpeed)
+ }
+ SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" }
+ }
+ }
+
+ assert(deadline >= clock.millis()) { "Deadline already passed" }
+
+ this.totalRequestedSpeed = totalRequestedSpeed
+ this.totalRequestedWork = totalRequestedWork
+ this.totalAllocatedSpeed = maxUsage - availableSpeed
+ this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration)
+
+ return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0)
+ SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ }
+
+ /**
+ * Obtain the next command to perform.
+ */
+ private fun doNext(capacity: Double): SimResourceCommand {
+ val totalRequestedWork = totalRequestedWork.toLong()
+ val totalRemainingWork = totalRemainingWork.toLong()
+ val totalAllocatedWork = totalAllocatedWork.toLong()
+ val totalRequestedSpeed = totalRequestedSpeed
+ val totalAllocatedSpeed = totalAllocatedSpeed
+
+ // Force all inputs to re-schedule their work.
+ val command = doSchedule(capacity)
+
+ // Report metrics
+ listener?.onSliceFinish(
+ this,
+ totalRequestedWork,
+ totalAllocatedWork - totalRemainingWork,
+ totalOvercommittedWork.toLong(),
+ totalInterferedWork.toLong(),
+ totalAllocatedSpeed,
+ totalRequestedSpeed
+ )
+
+ totalInterferedWork = 0.0
+ totalOvercommittedWork = 0.0
+
+ return command
+ }
+
+ /**
+ * Event listener for hypervisor events.
+ */
+ public interface Listener {
+ /**
+ * This method is invoked when a slice is finished.
+ */
+ public fun onSliceFinish(
+ switch: SimResourceDistributor,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ )
+ }
+
+ /**
+ * An internal [SimResourceProvider] implementation for switch outputs.
+ */
+ private inner class OutputProvider(val capacity: Double) : SimResourceProvider {
+ /**
+ * The [OutputContext] that is currently running.
+ */
+ private var ctx: OutputContext? = null
+
+ override var state: SimResourceState = SimResourceState.Pending
+ internal set
+
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ check(state == SimResourceState.Pending) { "Resource cannot be consumed" }
+
+ val ctx = OutputContext(this, consumer)
+ this.ctx = ctx
+ this.state = SimResourceState.Active
+ outputContexts += ctx
+
+ ctx.start()
+ schedule()
+ }
+
+ override fun close() {
+ cancel()
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Stopped
+ _outputs.remove(this)
+ }
+ }
+
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
+
+ override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.stop()
+ }
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
+ }
+
+ /**
+ * A [SimAbstractResourceContext] for the output resources.
+ */
+ private inner class OutputContext(
+ private val provider: OutputProvider,
+ consumer: SimResourceConsumer
+ ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> {
+ /**
+ * The current command that is processed by the vCPU.
+ */
+ var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
+
+ /**
+ * The processing speed that is allowed by the model constraints.
+ */
+ var allowedSpeed: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ var actualSpeed: Double = 0.0
+
+ private fun reportOvercommit() {
+ val remainingWork = remainingWork
+ totalOvercommittedWork += remainingWork
+ }
+
+ override fun onIdle(deadline: Long) {
+ reportOvercommit()
+
+ allowedSpeed = 0.0
+ activeCommand = SimResourceCommand.Idle(deadline)
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ reportOvercommit()
+
+ allowedSpeed = speed
+ activeCommand = SimResourceCommand.Consume(work, limit, deadline)
+ }
+
+ override fun onFinish(cause: Throwable?) {
+ reportOvercommit()
+
+ activeCommand = SimResourceCommand.Exit
+ provider.cancel()
+
+ super.onFinish(cause)
+ }
+
+ override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
+ // Apply performance interference model
+ val performanceScore = 1.0
+
+ // Compute the remaining amount of work
+ return if (work > 0.0) {
+ // Compute the fraction of compute time allocated to the VM
+ val fraction = actualSpeed / totalAllocatedSpeed
+
+ // Compute the work that was actually granted to the VM.
+ val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
+ val processed = processingAvailable * performanceScore
+
+ val interferedWork = processingAvailable - processed
+
+ totalInterferedWork += interferedWork
+
+ max(0.0, work - processed)
+ } else {
+ 0.0
+ }
+ }
+
+ override fun interrupt() {
+ // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
+ // to infinite recursion.
+ if (isProcessing) {
+ return
+ }
+
+ super.interrupt()
+
+ // Force the scheduler to re-schedule
+ schedule()
+ }
+
+ override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt
new file mode 100644
index 00000000..bbf6ad44
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceFlow] acts as both a resource consumer and resource provider at the same time, simplifying bridging
+ * between different components.
+ */
+public interface SimResourceFlow : SimResourceConsumer, SimResourceProvider
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
new file mode 100644
index 00000000..52b13c5c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.suspendCancellableCoroutine
+
+/**
+ * A [SimResourceProvider] provides some resource of type [R].
+ */
+public interface SimResourceProvider : AutoCloseable {
+ /**
+ * The state of the resource.
+ */
+ public val state: SimResourceState
+
+ /**
+ * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously.
+ *
+ * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended.
+ */
+ public fun startConsumer(consumer: SimResourceConsumer)
+
+ /**
+ * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op.
+ */
+ public fun interrupt()
+
+ /**
+ * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op.
+ */
+ public fun cancel()
+
+ /**
+ * End the lifetime of the resource.
+ *
+ * This operation terminates the existing resource consumer.
+ */
+ public override fun close()
+}
+
+/**
+ * Consume the resource provided by this provider using the specified [consumer] and suspend execution until
+ * the consumer has finished.
+ */
+public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) {
+ return suspendCancellableCoroutine { cont ->
+ startConsumer(object : SimResourceConsumer by consumer {
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ assert(!cont.isCompleted) { "Coroutine already completed" }
+
+ consumer.onFinish(ctx, cause)
+
+ cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit))
+ }
+
+ override fun toString(): String = "SimSuspendingResourceConsumer"
+ })
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
new file mode 100644
index 00000000..025b0406
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.opendc.utils.TimerScheduler
+import java.time.Clock
+import kotlin.math.ceil
+import kotlin.math.min
+
+/**
+ * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity.
+ *
+ * @param initialCapacity The initial capacity of the resource.
+ * @param clock The virtual clock to track simulation time.
+ * @param scheduler The scheduler to schedule the interrupts.
+ */
+public class SimResourceSource(
+ initialCapacity: Double,
+ private val clock: Clock,
+ private val scheduler: TimerScheduler<Any>
+) : SimResourceProvider {
+ /**
+ * The current processing speed of the resource.
+ */
+ public val speed: Double
+ get() = ctx?.speed ?: 0.0
+
+ /**
+ * The capacity of the resource.
+ */
+ public var capacity: Double = initialCapacity
+ set(value) {
+ field = value
+ ctx?.capacity = value
+ }
+
+ /**
+ * The [Context] that is currently running.
+ */
+ private var ctx: Context? = null
+
+ override var state: SimResourceState = SimResourceState.Pending
+ private set
+
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ check(state == SimResourceState.Pending) { "Resource is in invalid state" }
+ val ctx = Context(consumer)
+
+ this.ctx = ctx
+ this.state = SimResourceState.Active
+
+ ctx.start()
+ }
+
+ override fun close() {
+ cancel()
+ state = SimResourceState.Stopped
+ }
+
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
+
+ override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.stop()
+ }
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
+
+ /**
+ * Internal implementation of [SimResourceContext] for this class.
+ */
+ private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) {
+ override fun onIdle(deadline: Long) {
+ // Do not resume if deadline is "infinite"
+ if (deadline != Long.MAX_VALUE) {
+ scheduler.startSingleTimerTo(this, deadline) { flush() }
+ }
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ val until = min(deadline, clock.millis() + getDuration(work, speed))
+
+ scheduler.startSingleTimerTo(this, until, ::flush)
+ }
+
+ override fun onFinish(cause: Throwable?) {
+ scheduler.cancel(this)
+ cancel()
+
+ super.onFinish(cause)
+ }
+
+ override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]"
+ }
+
+ /**
+ * Compute the duration that a resource consumption will take with the specified [speed].
+ */
+ private fun getDuration(work: Double, speed: Double): Long {
+ return ceil(work / speed * 1000).toLong()
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt
new file mode 100644
index 00000000..c72951d0
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * The state of a resource provider.
+ */
+public enum class SimResourceState {
+ /**
+ * The resource provider is pending and the resource is waiting to be consumed.
+ */
+ Pending,
+
+ /**
+ * The resource provider is active and the resource is currently being consumed.
+ */
+ Active,
+
+ /**
+ * The resource provider is stopped and the resource cannot be consumed anymore.
+ */
+ Stopped
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
new file mode 100644
index 00000000..53fec16a
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers.
+ */
+public interface SimResourceSwitch : AutoCloseable {
+ /**
+ * The output resource providers to which resource consumers can be attached.
+ */
+ public val outputs: Set<SimResourceProvider>
+
+ /**
+ * The input resources that will be switched between the output providers.
+ */
+ public val inputs: Set<SimResourceProvider>
+
+ /**
+ * Add an output to the switch with the specified [capacity].
+ */
+ public fun addOutput(capacity: Double): SimResourceProvider
+
+ /**
+ * Add the specified [input] to the switch.
+ */
+ public fun addInput(input: SimResourceProvider)
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
new file mode 100644
index 00000000..45e4c220
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.util.ArrayDeque
+
+/**
+ * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that
+ * a single output is directly connected to an input and that the switch can only support as much outputs as inputs.
+ */
+public class SimResourceSwitchExclusive : SimResourceSwitch {
+ /**
+ * A flag to indicate that the switch is closed.
+ */
+ private var isClosed: Boolean = false
+
+ private val _outputs = mutableSetOf<Provider>()
+ override val outputs: Set<SimResourceProvider>
+ get() = _outputs
+
+ private val availableResources = ArrayDeque<SimResourceTransformer>()
+
+ private val _inputs = mutableSetOf<SimResourceProvider>()
+ override val inputs: Set<SimResourceProvider>
+ get() = _inputs
+
+ override fun addOutput(capacity: Double): SimResourceProvider {
+ check(!isClosed) { "Switch has been closed" }
+ check(availableResources.isNotEmpty()) { "No capacity to serve request" }
+ val forwarder = availableResources.poll()
+ val output = Provider(capacity, forwarder)
+ _outputs += output
+ return output
+ }
+
+ override fun addInput(input: SimResourceProvider) {
+ check(!isClosed) { "Switch has been closed" }
+
+ if (input in inputs) {
+ return
+ }
+
+ val forwarder = SimResourceForwarder()
+
+ _inputs += input
+ availableResources += forwarder
+
+ input.startConsumer(object : SimResourceConsumer by forwarder {
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ // De-register the input after it has finished
+ _inputs -= input
+ forwarder.onFinish(ctx, cause)
+ }
+ })
+ }
+
+ override fun close() {
+ isClosed = true
+
+ // Cancel all upstream subscriptions
+ _inputs.forEach(SimResourceProvider::cancel)
+ }
+
+ private inner class Provider(
+ private val capacity: Double,
+ private val forwarder: SimResourceTransformer
+ ) : SimResourceProvider by forwarder {
+ override fun close() {
+ // We explicitly do not close the forwarder here in order to re-use it across output resources.
+
+ _outputs -= this
+ availableResources += forwarder
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
new file mode 100644
index 00000000..c796c251
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.*
+import java.time.Clock
+
+/**
+ * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
+ * fair sharing.
+ */
+public class SimResourceSwitchMaxMin(
+ clock: Clock,
+ private val listener: Listener? = null
+) : SimResourceSwitch {
+ private val _outputs = mutableSetOf<SimResourceProvider>()
+ override val outputs: Set<SimResourceProvider>
+ get() = _outputs
+
+ private val _inputs = mutableSetOf<SimResourceProvider>()
+ override val inputs: Set<SimResourceProvider>
+ get() = _inputs
+
+ /**
+ * A flag to indicate that the switch was closed.
+ */
+ private var isClosed = false
+
+ /**
+ * The aggregator to aggregate the resources.
+ */
+ private val aggregator = SimResourceAggregatorMaxMin(clock)
+
+ /**
+ * The distributor to distribute the aggregated resources.
+ */
+ private val distributor = SimResourceDistributorMaxMin(
+ aggregator.output, clock,
+ object : SimResourceDistributorMaxMin.Listener {
+ override fun onSliceFinish(
+ switch: SimResourceDistributor,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand)
+ }
+ }
+ )
+
+ /**
+ * Add an output to the switch represented by [resource].
+ */
+ override fun addOutput(capacity: Double): SimResourceProvider {
+ check(!isClosed) { "Switch has been closed" }
+
+ val provider = distributor.addOutput(capacity)
+ _outputs.add(provider)
+ return provider
+ }
+
+ /**
+ * Add the specified [input] to the switch.
+ */
+ override fun addInput(input: SimResourceProvider) {
+ check(!isClosed) { "Switch has been closed" }
+
+ aggregator.addInput(input)
+ }
+
+ override fun close() {
+ if (!isClosed) {
+ isClosed = true
+ distributor.close()
+ aggregator.close()
+ }
+ }
+
+ /**
+ * Event listener for hypervisor events.
+ */
+ public interface Listener {
+ /**
+ * This method is invoked when a slice is finished.
+ */
+ public fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
new file mode 100644
index 00000000..de455021
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -0,0 +1,175 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider.
+ *
+ * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits.
+ * @param transform The function to transform the received resource command.
+ */
+public class SimResourceTransformer(
+ private val isCoupled: Boolean = false,
+ private val transform: (SimResourceContext, SimResourceCommand) -> SimResourceCommand
+) : SimResourceFlow {
+ /**
+ * The [SimResourceContext] in which the forwarder runs.
+ */
+ private var ctx: SimResourceContext? = null
+
+ /**
+ * The delegate [SimResourceConsumer].
+ */
+ private var delegate: SimResourceConsumer? = null
+
+ /**
+ * A flag to indicate that the delegate was started.
+ */
+ private var hasDelegateStarted: Boolean = false
+
+ /**
+ * The state of the forwarder.
+ */
+ override var state: SimResourceState = SimResourceState.Pending
+ private set
+
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ check(state == SimResourceState.Pending) { "Resource is in invalid state" }
+
+ state = SimResourceState.Active
+ delegate = consumer
+
+ // Interrupt the provider to replace the consumer
+ interrupt()
+ }
+
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
+
+ override fun cancel() {
+ val delegate = delegate
+ val ctx = ctx
+
+ state = SimResourceState.Pending
+
+ if (delegate != null && ctx != null) {
+ this.delegate = null
+ delegate.onFinish(ctx)
+ }
+ }
+
+ override fun close() {
+ val ctx = ctx
+
+ state = SimResourceState.Stopped
+
+ if (ctx != null) {
+ this.ctx = null
+ ctx.interrupt()
+ }
+ }
+
+ override fun onStart(ctx: SimResourceContext) {
+ this.ctx = ctx
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val delegate = delegate
+
+ if (!hasDelegateStarted) {
+ start()
+ }
+
+ return if (state == SimResourceState.Stopped) {
+ SimResourceCommand.Exit
+ } else if (delegate != null) {
+ val command = transform(ctx, delegate.onNext(ctx))
+ if (command == SimResourceCommand.Exit) {
+ // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
+ // reset beforehand the existing state and check whether it has been updated afterwards
+ reset()
+
+ delegate.onFinish(ctx)
+
+ if (isCoupled || state == SimResourceState.Stopped)
+ SimResourceCommand.Exit
+ else
+ onNext(ctx)
+ } else {
+ command
+ }
+ } else {
+ SimResourceCommand.Idle()
+ }
+ }
+
+ override fun onConfirm(ctx: SimResourceContext, speed: Double) {
+ delegate?.onConfirm(ctx, speed)
+ }
+
+ override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
+ delegate?.onCapacityChanged(ctx, isThrottled)
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ this.ctx = null
+
+ val delegate = delegate
+ if (delegate != null) {
+ reset()
+ delegate.onFinish(ctx, cause)
+ }
+ }
+
+ /**
+ * Start the delegate.
+ */
+ private fun start() {
+ val delegate = delegate ?: return
+ delegate.onStart(checkNotNull(ctx))
+
+ hasDelegateStarted = true
+ }
+
+ /**
+ * Reset the delegate.
+ */
+ private fun reset() {
+ delegate = null
+ hasDelegateStarted = false
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
+}
+
+/**
+ * Constructs a [SimResourceTransformer] that forwards the received resource command with an identity transform.
+ *
+ * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits.
+ */
+public fun SimResourceForwarder(isCoupled: Boolean = false): SimResourceTransformer {
+ return SimResourceTransformer(isCoupled) { _, command -> command }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
new file mode 100644
index 00000000..52a42241
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+/**
+ * The [SimConsumerBarrier] is a barrier that allows consumers to wait for a select number of other consumers to
+ * complete, before proceeding its operation.
+ */
+public class SimConsumerBarrier(public val parties: Int) {
+ private var counter = 0
+
+ /**
+ * Enter the barrier and determine whether the caller is the last to reach the barrier.
+ *
+ * @return `true` if the caller is the last to reach the barrier, `false` otherwise.
+ */
+ public fun enter(): Boolean {
+ val last = ++counter == parties
+ if (last) {
+ counter = 0
+ return true
+ }
+ return false
+ }
+
+ /**
+ * Reset the barrier.
+ */
+ public fun reset() {
+ counter = 0
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
new file mode 100644
index 00000000..114c7312
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+import kotlin.math.min
+
+/**
+ * Helper class to expose an observable [speed] field describing the speed of the consumer.
+ */
+public class SimSpeedConsumerAdapter(
+ private val delegate: SimResourceConsumer,
+ private val callback: (Double) -> Unit = {}
+) : SimResourceConsumer by delegate {
+ /**
+ * The resource processing speed at this instant.
+ */
+ public var speed: Double = 0.0
+ private set(value) {
+ if (field != value) {
+ callback(value)
+ field = value
+ }
+ }
+
+ init {
+ callback(0.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return delegate.onNext(ctx)
+ }
+
+ override fun onConfirm(ctx: SimResourceContext, speed: Double) {
+ delegate.onConfirm(ctx, speed)
+
+ this.speed = speed
+ }
+
+ override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
+ val oldSpeed = speed
+
+ delegate.onCapacityChanged(ctx, isThrottled)
+
+ // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
+ // need to update the current speed.
+ if (oldSpeed == speed) {
+ speed = min(ctx.capacity, speed)
+ }
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ super.onFinish(ctx, cause)
+
+ speed = 0.0
+ }
+
+ override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]"
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
new file mode 100644
index 00000000..a52d1d5d
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+
+/**
+ * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource
+ * consumption for some period of time.
+ */
+public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer {
+ private var iterator: Iterator<Fragment>? = null
+
+ override fun onStart(ctx: SimResourceContext) {
+ check(iterator == null) { "Consumer already running" }
+ iterator = trace.iterator()
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val iterator = checkNotNull(iterator)
+ return if (iterator.hasNext()) {
+ val now = ctx.clock.millis()
+ val fragment = iterator.next()
+ val work = (fragment.duration / 1000) * fragment.usage
+ val deadline = now + fragment.duration
+
+ assert(deadline >= now) { "Deadline already passed" }
+
+ if (work > 0.0)
+ SimResourceCommand.Consume(work, fragment.usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ iterator = null
+ }
+
+ /**
+ * A fragment of the workload.
+ */
+ public data class Fragment(val duration: Long, val usage: Double)
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
new file mode 100644
index 00000000..faa693c4
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+
+/**
+ * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization.
+ */
+public class SimWorkConsumer(
+ private val work: Double,
+ private val utilization: Double
+) : SimResourceConsumer {
+
+ init {
+ require(work >= 0.0) { "Work must be positive" }
+ require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
+ }
+
+ private var isFirst = true
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val limit = ctx.capacity * utilization
+ val work = if (isFirst) {
+ isFirst = false
+ work
+ } else {
+ ctx.remainingWork
+ }
+ return if (work > 0.0) {
+ SimResourceCommand.Consume(work, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
new file mode 100644
index 00000000..e272abb8
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -0,0 +1,202 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * Test suite for the [SimResourceAggregatorMaxMin] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceAggregatorMaxMinTest {
+ @Test
+ fun testSingleCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val forwarder = SimResourceForwarder()
+ val sources = listOf(
+ forwarder,
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(1.0, 0.5)
+ val usage = mutableListOf<Double>()
+ val source = SimResourceSource(1.0, clock, scheduler)
+ val adapter = SimSpeedConsumerAdapter(forwarder, usage::add)
+ source.startConsumer(adapter)
+
+ try {
+ aggregator.output.consume(consumer)
+ yield()
+
+ assertAll(
+ { assertEquals(1000, clock.millis()) },
+ { assertEquals(listOf(0.0, 0.5, 0.0), usage) }
+ )
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testDoubleCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(2.0, 1.0)
+ val usage = mutableListOf<Double>()
+ val adapter = SimSpeedConsumerAdapter(consumer, usage::add)
+
+ try {
+ aggregator.output.consume(adapter)
+ yield()
+ assertAll(
+ { assertEquals(1000, clock.millis()) },
+ { assertEquals(listOf(0.0, 2.0, 0.0), usage) }
+ )
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testOvercommit() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(4.0, 4.0, 1000))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ aggregator.output.consume(consumer)
+ yield()
+ assertEquals(1000, clock.millis())
+
+ verify(exactly = 2) { consumer.onNext(any()) }
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testException() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> { aggregator.output.consume(consumer) }
+ yield()
+ assertEquals(SimResourceState.Pending, sources[0].state)
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(4.0, 1.0)
+ try {
+ coroutineScope {
+ launch { aggregator.output.consume(consumer) }
+ delay(1000)
+ sources[0].capacity = 0.5
+ }
+ yield()
+ assertEquals(2334, clock.millis())
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testFailOverCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(1.0, 0.5)
+ try {
+ coroutineScope {
+ launch { aggregator.output.consume(consumer) }
+ delay(500)
+ sources[0].capacity = 0.5
+ }
+ yield()
+ assertEquals(1000, clock.millis())
+ } finally {
+ aggregator.output.close()
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
new file mode 100644
index 00000000..02d456ff
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
+
+/**
+ * Test suite for [SimResourceCommand].
+ */
+class SimResourceCommandTest {
+ @Test
+ fun testZeroWork() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(0.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testNegativeWork() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(-1.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testZeroLimit() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(1.0, 0.0)
+ }
+ }
+
+ @Test
+ fun testNegativeLimit() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(1.0, -1.0, 1)
+ }
+ }
+
+ @Test
+ fun testConsumeCorrect() {
+ assertDoesNotThrow {
+ SimResourceCommand.Consume(1.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testIdleCorrect() {
+ assertDoesNotThrow {
+ SimResourceCommand.Idle(1)
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
new file mode 100644
index 00000000..be909556
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.*
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.*
+import org.opendc.simulator.core.runBlockingSimulation
+
+/**
+ * A test suite for the [SimAbstractResourceContext] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceContextTest {
+ @Test
+ fun testFlushWithoutCommand() = runBlockingSimulation {
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
+
+ val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ }
+
+ context.flush()
+ }
+
+ @Test
+ fun testIntermediateFlush() = runBlockingSimulation {
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
+
+ val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ })
+
+ context.start()
+ delay(1) // Delay 1 ms to prevent hitting the fast path
+ context.flush(isIntermediate = true)
+
+ verify(exactly = 2) { context.onConsume(any(), any(), any()) }
+ }
+
+ @Test
+ fun testIntermediateFlushIdle() = runBlockingSimulation {
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
+
+ val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ })
+
+ context.start()
+ delay(5)
+ context.flush(isIntermediate = true)
+ delay(5)
+ context.flush(isIntermediate = true)
+
+ assertAll(
+ { verify(exactly = 2) { context.onIdle(any()) } },
+ { verify(exactly = 1) { context.onFinish(null) } }
+ )
+ }
+
+ @Test
+ fun testDoubleStart() = runBlockingSimulation {
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
+
+ val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ }
+
+ context.start()
+ assertThrows<IllegalStateException> { context.start() }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
new file mode 100644
index 00000000..39f74481
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -0,0 +1,351 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimResourceSource] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceSourceTest {
+ @Test
+ fun testSpeed() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1000 * capacity, capacity))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ val res = mutableListOf<Double>()
+ val adapter = SimSpeedConsumerAdapter(consumer, res::add)
+
+ provider.consume(adapter)
+
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+
+ try {
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ delay(1000)
+ provider.capacity = 0.5
+ }
+ assertEquals(3000, clock.millis())
+ verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testSpeedLimit() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ val res = mutableListOf<Double>()
+ val adapter = SimSpeedConsumerAdapter(consumer, res::add)
+
+ provider.consume(adapter)
+
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ /**
+ * Test to see whether no infinite recursion occurs when interrupting during [SimResourceConsumer.onStart] or
+ * [SimResourceConsumer.onNext].
+ */
+ @Test
+ fun testIntermediateInterrupt() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = object : SimResourceConsumer {
+ override fun onStart(ctx: SimResourceContext) {
+ ctx.interrupt()
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ try {
+ provider.consume(consumer)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testInterrupt() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+ lateinit var resCtx: SimResourceContext
+
+ val consumer = object : SimResourceConsumer {
+ var isFirst = true
+ override fun onStart(ctx: SimResourceContext) {
+ resCtx = ctx
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ assertEquals(0.0, ctx.remainingWork)
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(4.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+ }
+
+ try {
+ launch {
+ yield()
+ resCtx.interrupt()
+ }
+ provider.consume(consumer)
+
+ assertEquals(0, clock.millis())
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testFailure() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onStart(any()) }
+ .throws(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testExceptionPropagationOnNext() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testConcurrentConsumption() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> {
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ provider.consume(consumer)
+ }
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testClosedConsumption() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> {
+ provider.close()
+ provider.consume(consumer)
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testCloseDuringConsumption() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ launch { provider.consume(consumer) }
+ delay(500)
+ provider.close()
+
+ assertEquals(500, clock.millis())
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testIdle() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Idle(clock.millis() + 500))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ provider.consume(consumer)
+
+ assertEquals(500, clock.millis())
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testInfiniteSleep() {
+ assertThrows<IllegalStateException> {
+ runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Idle())
+ .andThenThrows(IllegalStateException())
+
+ try {
+ provider.consume(consumer)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+ }
+ }
+
+ @Test
+ fun testIncorrectDeadline() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Idle(2))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ delay(10)
+
+ assertThrows<IllegalArgumentException> { provider.consume(consumer) }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
new file mode 100644
index 00000000..f7d17867
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -0,0 +1,173 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * Test suite for the [SimResourceSwitchExclusive] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceSwitchExclusiveTest {
+ /**
+ * Test a trace workload.
+ */
+ @Test
+ fun testTrace() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val speed = mutableListOf<Double>()
+
+ val duration = 5 * 60L
+ val workload =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
+ val forwarder = SimResourceForwarder()
+ val adapter = SimSpeedConsumerAdapter(forwarder, speed::add)
+ source.startConsumer(adapter)
+ switch.addInput(forwarder)
+
+ val provider = switch.addOutput(3200.0)
+
+ try {
+ provider.consume(workload)
+ yield()
+ } finally {
+ provider.close()
+ }
+
+ assertAll(
+ { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } },
+ { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } }
+ )
+ }
+
+ /**
+ * Test runtime workload on hypervisor.
+ */
+ @Test
+ fun testRuntimeWorkload() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
+
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
+
+ switch.addInput(source)
+
+ val provider = switch.addOutput(3200.0)
+
+ try {
+ provider.consume(workload)
+ yield()
+ } finally {
+ provider.close()
+ }
+ assertEquals(duration, clock.millis()) { "Took enough time" }
+ }
+
+ /**
+ * Test two workloads running sequentially.
+ */
+ @Test
+ fun testTwoWorkloads() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = object : SimResourceConsumer {
+ var isFirst = true
+
+ override fun onStart(ctx: SimResourceContext) {
+ isFirst = true
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+ }
+
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
+
+ switch.addInput(source)
+
+ val provider = switch.addOutput(3200.0)
+
+ try {
+ provider.consume(workload)
+ yield()
+ provider.consume(workload)
+ } finally {
+ provider.close()
+ }
+ assertEquals(duration * 2, clock.millis()) { "Took enough time" }
+ }
+
+ /**
+ * Test concurrent workloads on the machine.
+ */
+ @Test
+ fun testConcurrentWorkloadFails() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
+
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
+
+ switch.addInput(source)
+
+ switch.addOutput(3200.0)
+ assertThrows<IllegalStateException> { switch.addOutput(3200.0) }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
new file mode 100644
index 00000000..7416f277
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -0,0 +1,193 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * Test suite for the [SimResourceSwitch] implementations
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceSwitchMaxMinTest {
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val switch = SimResourceSwitchMaxMin(clock)
+
+ val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) }
+ sources.forEach { switch.addInput(it) }
+
+ val provider = switch.addOutput(1000.0)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit
+
+ try {
+ provider.consume(consumer)
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with a single VM.
+ */
+ @Test
+ fun testOvercommittedSingle() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val listener = object : SimResourceSwitchMaxMin.Listener {
+ var totalRequestedWork = 0L
+ var totalGrantedWork = 0L
+ var totalOvercommittedWork = 0L
+
+ override fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ totalRequestedWork += requestedWork
+ totalGrantedWork += grantedWork
+ totalOvercommittedWork += overcommittedWork
+ }
+ }
+
+ val duration = 5 * 60L
+ val workload =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = SimResourceSwitchMaxMin(clock, listener)
+ val provider = switch.addOutput(3200.0)
+
+ try {
+ switch.addInput(SimResourceSource(3200.0, clock, scheduler))
+ provider.consume(workload)
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+
+ assertAll(
+ { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1200000, clock.millis()) }
+ )
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with two VMs.
+ */
+ @Test
+ fun testOvercommittedDual() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val listener = object : SimResourceSwitchMaxMin.Listener {
+ var totalRequestedWork = 0L
+ var totalGrantedWork = 0L
+ var totalOvercommittedWork = 0L
+
+ override fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ totalRequestedWork += requestedWork
+ totalGrantedWork += grantedWork
+ totalOvercommittedWork += overcommittedWork
+ }
+ }
+
+ val duration = 5 * 60L
+ val workloadA =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+ val workloadB =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3100.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 73.0)
+ )
+ )
+
+ val switch = SimResourceSwitchMaxMin(clock, listener)
+ val providerA = switch.addOutput(3200.0)
+ val providerB = switch.addOutput(3200.0)
+
+ try {
+ switch.addInput(SimResourceSource(3200.0, clock, scheduler))
+
+ coroutineScope {
+ launch { providerA.consume(workloadA) }
+ providerB.consume(workloadB)
+ }
+
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+ assertAll(
+ { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1200000, clock.millis()) }
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
new file mode 100644
index 00000000..d2ad73bc
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimResourceTransformer] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceTransformerTest {
+ @Test
+ fun testExitImmediately() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ launch {
+ source.consume(forwarder)
+ source.close()
+ }
+
+ forwarder.consume(object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ })
+
+ forwarder.close()
+ scheduler.close()
+ }
+
+ @Test
+ fun testExit() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ launch {
+ source.consume(forwarder)
+ source.close()
+ }
+
+ forwarder.consume(object : SimResourceConsumer {
+ var isFirst = true
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(10.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+ })
+
+ forwarder.close()
+ }
+
+ @Test
+ fun testState() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val consumer = object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit
+ }
+
+ assertEquals(SimResourceState.Pending, forwarder.state)
+
+ forwarder.startConsumer(consumer)
+ assertEquals(SimResourceState.Active, forwarder.state)
+
+ assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
+
+ forwarder.cancel()
+ assertEquals(SimResourceState.Pending, forwarder.state)
+
+ forwarder.close()
+ assertEquals(SimResourceState.Stopped, forwarder.state)
+ }
+
+ @Test
+ fun testCancelPendingDelegate() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Exit
+
+ forwarder.startConsumer(consumer)
+ forwarder.cancel()
+
+ verify(exactly = 0) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testCancelStartedDelegate() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ forwarder.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any()) }
+ verify(exactly = 1) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testCancelPropagation() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ source.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any()) }
+ verify(exactly = 1) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testExitPropagation() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder(isCoupled = true)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Exit
+
+ source.startConsumer(forwarder)
+ forwarder.consume(consumer)
+ yield()
+
+ assertEquals(SimResourceState.Pending, source.state)
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+ source.startConsumer(forwarder)
+
+ coroutineScope {
+ launch { forwarder.consume(consumer) }
+ delay(1000)
+ source.capacity = 0.5
+ }
+
+ assertEquals(3000, clock.millis())
+ verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ }
+
+ @Test
+ fun testTransformExit() = runBlockingSimulation {
+ val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit }
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+ source.startConsumer(forwarder)
+ forwarder.consume(consumer)
+
+ assertEquals(0, clock.millis())
+ verify(exactly = 1) { consumer.onNext(any()) }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
new file mode 100644
index 00000000..bf58b1b6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimWorkConsumer] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimWorkConsumerTest {
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = SimWorkConsumer(1.0, 1.0)
+
+ try {
+ provider.consume(consumer)
+ assertEquals(1000, clock.millis())
+ } finally {
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testUtilization() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = SimWorkConsumer(1.0, 0.5)
+
+ try {
+ provider.consume(consumer)
+ assertEquals(2000, clock.millis())
+ } finally {
+ provider.close()
+ }
+ }
+}