diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources/src')
31 files changed, 3835 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt new file mode 100644 index 00000000..8d2587b1 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.simulator.resources.consumer.SimTraceConsumer + +/** + * Helper function to create simple consumer workload. + */ +fun createSimpleConsumer(): SimResourceConsumer { + return SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(1000, 28.0), + SimTraceConsumer.Fragment(1000, 3500.0), + SimTraceConsumer.Fragment(1000, 0.0), + SimTraceConsumer.Fragment(1000, 183.0), + SimTraceConsumer.Fragment(1000, 400.0), + SimTraceConsumer.Fragment(1000, 100.0), + SimTraceConsumer.Fragment(1000, 3000.0), + SimTraceConsumer.Fragment(1000, 4500.0), + ), + ) +} diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt new file mode 100644 index 00000000..beda3eaa --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.launch +import org.opendc.simulator.core.SimulationCoroutineScope +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.utils.TimerScheduler +import org.openjdk.jmh.annotations.* +import java.util.concurrent.TimeUnit + +@State(Scope.Thread) +@Fork(1) +@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +@OptIn(ExperimentalCoroutinesApi::class) +class SimResourceBenchmarks { + private lateinit var scope: SimulationCoroutineScope + private lateinit var scheduler: TimerScheduler<Any> + + @Setup + fun setUp() { + scope = SimulationCoroutineScope() + scheduler = TimerScheduler(scope.coroutineContext, scope.clock) + } + + @State(Scope.Thread) + class Workload { + lateinit var consumers: Array<SimResourceConsumer> + + @Setup + fun setUp() { + consumers = Array(3) { createSimpleConsumer() } + } + } + + @Benchmark + fun benchmarkSource(state: Workload) { + return scope.runBlockingSimulation { + val provider = SimResourceSource(4200.0, clock, scheduler) + return@runBlockingSimulation provider.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkForwardOverhead(state: Workload) { + return scope.runBlockingSimulation { + val provider = SimResourceSource(4200.0, clock, scheduler) + val forwarder = SimResourceForwarder() + provider.startConsumer(forwarder) + return@runBlockingSimulation forwarder.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) { + return scope.runBlockingSimulation { + val switch = SimResourceSwitchMaxMin(clock) + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + val provider = switch.addOutput(3500.0) + return@runBlockingSimulation provider.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) { + return scope.runBlockingSimulation { + val switch = SimResourceSwitchMaxMin(clock) + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + repeat(3) { i -> + launch { + val provider = switch.addOutput(3500.0) + provider.consume(state.consumers[i]) + } + } + } + } + + @Benchmark + fun benchmarkSwitchExclusiveSingleConsumer(state: Workload) { + return scope.runBlockingSimulation { + val switch = SimResourceSwitchExclusive() + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + val provider = switch.addOutput(3500.0) + return@runBlockingSimulation provider.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkSwitchExclusiveTripleConsumer(state: Workload) { + return scope.runBlockingSimulation { + val switch = SimResourceSwitchExclusive() + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + repeat(2) { i -> + launch { + val provider = switch.addOutput(3500.0) + provider.consume(state.consumers[i]) + } + } + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt new file mode 100644 index 00000000..c7fa6a17 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * Abstract implementation of [SimResourceAggregator]. + */ +public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator { + /** + * The available resource provider contexts. + */ + protected val inputContexts: Set<SimResourceContext> + get() = _inputContexts + private val _inputContexts = mutableSetOf<SimResourceContext>() + + /** + * The output context. + */ + protected val outputContext: SimResourceContext + get() = context + + /** + * The commands to submit to the underlying input resources. + */ + protected val commands: MutableMap<SimResourceContext, SimResourceCommand> = mutableMapOf() + + /** + * This method is invoked when the resource consumer consumes resources. + */ + protected abstract fun doConsume(work: Double, limit: Double, deadline: Long) + + /** + * This method is invoked when the resource consumer enters an idle state. + */ + protected open fun doIdle(deadline: Long) { + for (input in inputContexts) { + commands[input] = SimResourceCommand.Idle(deadline) + } + } + + /** + * This method is invoked when the resource consumer finishes processing. + */ + protected open fun doFinish(cause: Throwable?) { + for (input in inputContexts) { + commands[input] = SimResourceCommand.Exit + } + } + + /** + * This method is invoked when an input context is started. + */ + protected open fun onContextStarted(ctx: SimResourceContext) { + _inputContexts.add(ctx) + } + + protected open fun onContextFinished(ctx: SimResourceContext) { + assert(_inputContexts.remove(ctx)) { "Lost context" } + } + + override fun addInput(input: SimResourceProvider) { + check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" } + + val consumer = Consumer() + _inputs.add(input) + input.startConsumer(consumer) + } + + override fun close() { + output.close() + } + + override val output: SimResourceProvider + get() = _output + private val _output = SimResourceForwarder() + + override val inputs: Set<SimResourceProvider> + get() = _inputs + private val _inputs = mutableSetOf<SimResourceProvider>() + + private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { + override val remainingWork: Double + get() { + val now = clock.millis() + + return if (_remainingWorkFlush < now) { + _remainingWorkFlush = now + _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it } + } else { + _remainingWork + } + } + private var _remainingWork: Double = 0.0 + private var _remainingWorkFlush: Long = Long.MIN_VALUE + + override fun interrupt() { + super.interrupt() + + interruptAll() + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) + + override fun onIdle(deadline: Long) = doIdle(deadline) + + override fun onFinish(cause: Throwable?) { + doFinish(cause) + + super.onFinish(cause) + + interruptAll() + } + } + + /** + * A flag to indicate that an interrupt is active. + */ + private var isInterrupting: Boolean = false + + /** + * Schedule the work over the input resources. + */ + private fun doSchedule() { + context.flush(isIntermediate = true) + interruptAll() + } + + /** + * Interrupt all inputs. + */ + private fun interruptAll() { + // Prevent users from interrupting the resource while they are constructing their next command, as this will + // only lead to infinite recursion. + if (isInterrupting) { + return + } + + try { + isInterrupting = true + + val iterator = _inputs.iterator() + while (iterator.hasNext()) { + val input = iterator.next() + input.interrupt() + + if (input.state != SimResourceState.Active) { + iterator.remove() + } + } + } finally { + isInterrupting = false + } + } + + /** + * An internal [SimResourceConsumer] implementation for aggregator inputs. + */ + private inner class Consumer : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext) { + onContextStarted(ctx) + onCapacityChanged(ctx, false) + + // Make sure we initialize the output if we have not done so yet + if (context.state == SimResourceState.Pending) { + context.start() + } + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + doSchedule() + + return commands[ctx] ?: SimResourceCommand.Idle() + } + + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + // Adjust capacity of output resource + context.capacity = inputContexts.sumByDouble { it.capacity } + } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + onContextFinished(ctx) + + super.onFinish(ctx, cause) + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt new file mode 100644 index 00000000..05ed0714 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -0,0 +1,344 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock +import kotlin.math.max +import kotlin.math.min + +/** + * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. + */ +public abstract class SimAbstractResourceContext( + initialCapacity: Double, + override val clock: Clock, + private val consumer: SimResourceConsumer +) : SimResourceContext { + /** + * The capacity of the resource. + */ + public final override var capacity: Double = initialCapacity + set(value) { + val oldValue = field + + // Only changes will be propagated + if (value != oldValue) { + field = value + onCapacityChange() + } + } + + /** + * The amount of work still remaining at this instant. + */ + override val remainingWork: Double + get() { + val activeCommand = activeCommand ?: return 0.0 + val now = clock.millis() + + return if (_remainingWorkFlush < now) { + _remainingWorkFlush = now + computeRemainingWork(activeCommand, now).also { _remainingWork = it } + } else { + _remainingWork + } + } + private var _remainingWork: Double = 0.0 + private var _remainingWorkFlush: Long = Long.MIN_VALUE + + /** + * A flag to indicate the state of the context. + */ + public var state: SimResourceState = SimResourceState.Pending + private set + + /** + * The current processing speed of the resource. + */ + public var speed: Double = 0.0 + private set + + /** + * This method is invoked when the resource will idle until the specified [deadline]. + */ + public abstract fun onIdle(deadline: Long) + + /** + * This method is invoked when the resource will be consumed until the specified [work] was processed or the + * [deadline] was reached. + */ + public abstract fun onConsume(work: Double, limit: Double, deadline: Long) + + /** + * This method is invoked when the resource consumer has finished. + */ + public open fun onFinish(cause: Throwable?) { + consumer.onFinish(this, cause) + } + + /** + * Get the remaining work to process after a resource consumption. + * + * @param work The size of the resource consumption. + * @param speed The speed of consumption. + * @param duration The duration from the start of the consumption until now. + * @return The amount of work remaining. + */ + protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { + return if (duration > 0L) { + val processed = duration / 1000.0 * speed + max(0.0, work - processed) + } else { + 0.0 + } + } + + /** + * Start the consumer. + */ + public fun start() { + check(state == SimResourceState.Pending) { "Consumer is already started" } + + val now = clock.millis() + + state = SimResourceState.Active + isProcessing = true + latestFlush = now + + try { + consumer.onStart(this) + activeCommand = interpret(consumer.onNext(this), now) + } catch (cause: Throwable) { + doStop(cause) + } finally { + isProcessing = false + } + } + + /** + * Immediately stop the consumer. + */ + public fun stop() { + try { + isProcessing = true + latestFlush = clock.millis() + + flush(isIntermediate = true) + doStop(null) + } catch (cause: Throwable) { + doStop(cause) + } finally { + isProcessing = false + } + } + + /** + * Flush the current active resource consumption. + * + * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be + * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer + * will be asked to deliver a new command and is essentially interrupted. + */ + public fun flush(isIntermediate: Boolean = false) { + // Flush is no-op when the consumer is finished or not yet started + if (state != SimResourceState.Active) { + return + } + + val now = clock.millis() + + // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it. + if (isIntermediate && latestFlush >= now) { + return + } + + try { + val activeCommand = activeCommand ?: return + val (timestamp, command) = activeCommand + + // Note: accessor is reliant on activeCommand being set + val remainingWork = remainingWork + + isProcessing = true + + val duration = now - timestamp + assert(duration >= 0) { "Flush in the past" } + + this.activeCommand = when (command) { + is SimResourceCommand.Idle -> { + // We should only continue processing the next command if: + // 1. The resource consumer reached its deadline. + // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) + if (command.deadline <= now || !isIntermediate) { + next(now) + } else { + interpret(SimResourceCommand.Idle(command.deadline), now) + } + } + is SimResourceCommand.Consume -> { + // We should only continue processing the next command if: + // 1. The resource consumption was finished. + // 2. The resource capacity cannot satisfy the demand. + // 4. The resource consumer should be interrupted (e.g., someone called .interrupt()) + if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { + next(now) + } else { + interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now) + } + } + SimResourceCommand.Exit -> + // Flush may not be called when the resource consumer has finished + throw IllegalStateException() + } + + // Flush remaining work cache + _remainingWorkFlush = Long.MIN_VALUE + } catch (cause: Throwable) { + doStop(cause) + } finally { + latestFlush = now + isProcessing = false + } + } + + override fun interrupt() { + // Prevent users from interrupting the resource while they are constructing their next command, as this will + // only lead to infinite recursion. + if (isProcessing) { + return + } + + flush() + } + + override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]" + + /** + * A flag to indicate that the resource is currently processing a command. + */ + protected var isProcessing: Boolean = false + + /** + * The current command that is being processed. + */ + private var activeCommand: CommandWrapper? = null + + /** + * The latest timestamp at which the resource was flushed. + */ + private var latestFlush: Long = Long.MIN_VALUE + + /** + * Finish the consumer and resource provider. + */ + private fun doStop(cause: Throwable?) { + val state = state + this.state = SimResourceState.Stopped + + if (state == SimResourceState.Active) { + activeCommand = null + onFinish(cause) + } + } + + /** + * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. + */ + private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? { + when (command) { + is SimResourceCommand.Idle -> { + val deadline = command.deadline + + require(deadline >= now) { "Deadline already passed" } + + speed = 0.0 + consumer.onConfirm(this, 0.0) + + onIdle(deadline) + } + is SimResourceCommand.Consume -> { + val work = command.work + val limit = command.limit + val deadline = command.deadline + + require(deadline >= now) { "Deadline already passed" } + + speed = min(capacity, limit) + consumer.onConfirm(this, speed) + + onConsume(work, limit, deadline) + } + is SimResourceCommand.Exit -> { + speed = 0.0 + + doStop(null) + + // No need to set the next active command + return null + } + } + + return CommandWrapper(now, command) + } + + /** + * Request the workload for more work. + */ + private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now) + + /** + * Compute the remaining work based on the specified [wrapper] and [timestamp][now]. + */ + private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double { + val (timestamp, command) = wrapper + val duration = now - timestamp + return when (command) { + is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration) + is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0 + } + } + + /** + * Indicate that the capacity of the resource has changed. + */ + private fun onCapacityChange() { + // Do not inform the consumer if it has not been started yet + if (state != SimResourceState.Active) { + return + } + + val isThrottled = speed > capacity + consumer.onCapacityChanged(this, isThrottled) + + // Optimization: only flush changes if the new capacity cannot satisfy the active resource command. + // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush(). + if (isThrottled) { + flush(isIntermediate = true) + } + } + + /** + * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. + */ + private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt new file mode 100644 index 00000000..bb4e6a2c --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource. + */ +public interface SimResourceAggregator : AutoCloseable { + /** + * The output resource provider to which resource consumers can be attached. + */ + public val output: SimResourceProvider + + /** + * The input resources that will be switched between the output providers. + */ + public val inputs: Set<SimResourceProvider> + + /** + * Add the specified [input] to the switch. + */ + public fun addInput(input: SimResourceProvider) + + /** + * End the lifecycle of the aggregator. + */ + public override fun close() +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt new file mode 100644 index 00000000..08bc064e --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * A [SimResourceAggregator] that distributes the load equally across the input resources. + */ +public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) { + private val consumers = mutableListOf<SimResourceContext>() + + override fun doConsume(work: Double, limit: Double, deadline: Long) { + // Sort all consumers by their capacity + consumers.sortWith(compareBy { it.capacity }) + + // Divide the requests over the available capacity of the input resources fairly + for (input in consumers) { + val inputCapacity = input.capacity + val fraction = inputCapacity / outputContext.capacity + val grantedSpeed = limit * fraction + val grantedWork = fraction * work + + commands[input] = + if (grantedWork > 0.0 && grantedSpeed > 0.0) + SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + } + } + + override fun onContextStarted(ctx: SimResourceContext) { + super.onContextStarted(ctx) + + consumers.add(ctx) + } + + override fun onContextFinished(ctx: SimResourceContext) { + super.onContextFinished(ctx) + + consumers.remove(ctx) + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt new file mode 100644 index 00000000..f7f3fa4d --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer]. + */ +public sealed class SimResourceCommand { + /** + * A request to the resource to perform the specified amount of work before the given [deadline]. + * + * @param work The amount of work to process. + * @param limit The maximum amount of work to be processed per second. + * @param deadline The instant at which the work needs to be fulfilled. + */ + public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() { + init { + require(work > 0) { "Amount of work must be positive" } + require(limit > 0) { "Limit must be positive" } + } + } + + /** + * An indication to the resource that the consumer will idle until the specified [deadline] or if it is interrupted. + */ + public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() + + /** + * An indication to the resource that the consumer has finished. + */ + public object Exit : SimResourceCommand() +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt new file mode 100644 index 00000000..38672b13 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceConsumer] characterizes how a resource is consumed. + * + * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently) + * for multiple resource providers, unless explicitly said otherwise. + */ +public interface SimResourceConsumer { + /** + * This method is invoked when the consumer is started for some resource. + * + * @param ctx The execution context in which the consumer runs. + */ + public fun onStart(ctx: SimResourceContext) {} + + /** + * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because + * the resource finished processing, reached its deadline or was interrupted. + * + * @param ctx The execution context in which the consumer runs. + * @return The next command that the resource should execute. + */ + public fun onNext(ctx: SimResourceContext): SimResourceCommand + + /** + * This method is invoked when the resource provider confirms that the consumer is running at the given speed. + * + * @param ctx The execution context in which the consumer runs. + * @param speed The speed at which the consumer runs. + */ + public fun onConfirm(ctx: SimResourceContext, speed: Double) {} + + /** + * This is method is invoked when the capacity of the resource changes. + * + * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the + * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly + * causing the active resource command to finish at a later moment than initially planned. + * + * @param ctx The execution context in which the consumer runs. + * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the + * capacity change. + */ + public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {} + + /** + * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], + * the resource finished itself, or a failure occurred at the resource. + * + * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider. + * + * @param ctx The execution context in which the consumer ran. + * @param cause The cause of the finish in case the resource finished exceptionally. + */ + public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {} +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt new file mode 100644 index 00000000..11dbb09f --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a + * resource and a resource consumer. + */ +public interface SimResourceContext { + /** + * The virtual clock tracking simulation time. + */ + public val clock: Clock + + /** + * The resource capacity available at this instant. + */ + public val capacity: Double + + /** + * The amount of work still remaining at this instant. + */ + public val remainingWork: Double + + /** + * Ask the resource provider to interrupt its resource. + */ + public fun interrupt() +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt new file mode 100644 index 00000000..b2759b7f --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers. + */ +public interface SimResourceDistributor : AutoCloseable { + /** + * The output resource providers to which resource consumers can be attached. + */ + public val outputs: Set<SimResourceProvider> + + /** + * The input resource that will be distributed over the consumers. + */ + public val input: SimResourceProvider + + /** + * Add an output to the switch with the specified [capacity]. + */ + public fun addOutput(capacity: Double): SimResourceProvider +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt new file mode 100644 index 00000000..dfdd2c2e --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -0,0 +1,420 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock +import kotlin.math.max +import kotlin.math.min + +/** + * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing. + */ +public class SimResourceDistributorMaxMin( + override val input: SimResourceProvider, + private val clock: Clock, + private val listener: Listener? = null +) : SimResourceDistributor { + override val outputs: Set<SimResourceProvider> + get() = _outputs + private val _outputs = mutableSetOf<OutputProvider>() + + /** + * The active output contexts. + */ + private val outputContexts: MutableList<OutputContext> = mutableListOf() + + /** + * The total speed requested by the output resources. + */ + private var totalRequestedSpeed = 0.0 + + /** + * The total amount of work requested by the output resources. + */ + private var totalRequestedWork = 0.0 + + /** + * The total allocated speed for the output resources. + */ + private var totalAllocatedSpeed = 0.0 + + /** + * The total allocated work requested for the output resources. + */ + private var totalAllocatedWork = 0.0 + + /** + * The amount of work that could not be performed due to over-committing resources. + */ + private var totalOvercommittedWork = 0.0 + + /** + * The amount of work that was lost due to interference. + */ + private var totalInterferedWork = 0.0 + + /** + * A flag to indicate that the switch is closed. + */ + private var isClosed: Boolean = false + + /** + * An internal [SimResourceConsumer] implementation for switch inputs. + */ + private val consumer = object : SimResourceConsumer { + /** + * The resource context of the consumer. + */ + private lateinit var ctx: SimResourceContext + + val remainingWork: Double + get() = ctx.remainingWork + + override fun onStart(ctx: SimResourceContext) { + this.ctx = ctx + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return doNext(ctx.capacity) + } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + super.onFinish(ctx, cause) + + val iterator = _outputs.iterator() + while (iterator.hasNext()) { + val output = iterator.next() + + // Remove the output from the outputs to prevent ConcurrentModificationException when removing it + // during the call to output.close() + iterator.remove() + + output.close() + } + } + } + + /** + * The total amount of remaining work. + */ + private val totalRemainingWork: Double + get() = consumer.remainingWork + + override fun addOutput(capacity: Double): SimResourceProvider { + check(!isClosed) { "Distributor has been closed" } + + val provider = OutputProvider(capacity) + _outputs.add(provider) + return provider + } + + override fun close() { + if (!isClosed) { + isClosed = true + input.cancel() + } + } + + init { + input.startConsumer(consumer) + } + + /** + * Indicate that the workloads should be re-scheduled. + */ + private fun schedule() { + input.interrupt() + } + + /** + * Schedule the work over the physical CPUs. + */ + private fun doSchedule(capacity: Double): SimResourceCommand { + // If there is no work yet, mark all inputs as idle. + if (outputContexts.isEmpty()) { + return SimResourceCommand.Idle() + } + + val maxUsage = capacity + var duration: Double = Double.MAX_VALUE + var deadline: Long = Long.MAX_VALUE + var availableSpeed = maxUsage + var totalRequestedSpeed = 0.0 + var totalRequestedWork = 0.0 + + // Flush the work of the outputs + var outputIterator = outputContexts.listIterator() + while (outputIterator.hasNext()) { + val output = outputIterator.next() + + output.flush(isIntermediate = true) + + if (output.activeCommand == SimResourceCommand.Exit) { + // Apparently the output consumer has exited, so remove it from the scheduling queue. + outputIterator.remove() + } + } + + // Sort the outputs based on their requested usage + // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + outputContexts.sort() + + // Divide the available input capacity fairly across the outputs using max-min fair sharing + outputIterator = outputContexts.listIterator() + var remaining = outputContexts.size + while (outputIterator.hasNext()) { + val output = outputIterator.next() + val availableShare = availableSpeed / remaining-- + + when (val command = output.activeCommand) { + is SimResourceCommand.Idle -> { + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + output.actualSpeed = 0.0 + } + is SimResourceCommand.Consume -> { + val grantedSpeed = min(output.allowedSpeed, availableShare) + + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + // Ignore idle computation + if (grantedSpeed <= 0.0 || command.work <= 0.0) { + output.actualSpeed = 0.0 + continue + } + + totalRequestedSpeed += command.limit + totalRequestedWork += command.work + + output.actualSpeed = grantedSpeed + availableSpeed -= grantedSpeed + + // The duration that we want to run is that of the shortest request from an output + duration = min(duration, command.work / grantedSpeed) + } + SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" } + } + } + + assert(deadline >= clock.millis()) { "Deadline already passed" } + + this.totalRequestedSpeed = totalRequestedSpeed + this.totalRequestedWork = totalRequestedWork + this.totalAllocatedSpeed = maxUsage - availableSpeed + this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration) + + return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) + SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + } + + /** + * Obtain the next command to perform. + */ + private fun doNext(capacity: Double): SimResourceCommand { + val totalRequestedWork = totalRequestedWork.toLong() + val totalRemainingWork = totalRemainingWork.toLong() + val totalAllocatedWork = totalAllocatedWork.toLong() + val totalRequestedSpeed = totalRequestedSpeed + val totalAllocatedSpeed = totalAllocatedSpeed + + // Force all inputs to re-schedule their work. + val command = doSchedule(capacity) + + // Report metrics + listener?.onSliceFinish( + this, + totalRequestedWork, + totalAllocatedWork - totalRemainingWork, + totalOvercommittedWork.toLong(), + totalInterferedWork.toLong(), + totalAllocatedSpeed, + totalRequestedSpeed + ) + + totalInterferedWork = 0.0 + totalOvercommittedWork = 0.0 + + return command + } + + /** + * Event listener for hypervisor events. + */ + public interface Listener { + /** + * This method is invoked when a slice is finished. + */ + public fun onSliceFinish( + switch: SimResourceDistributor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) + } + + /** + * An internal [SimResourceProvider] implementation for switch outputs. + */ + private inner class OutputProvider(val capacity: Double) : SimResourceProvider { + /** + * The [OutputContext] that is currently running. + */ + private var ctx: OutputContext? = null + + override var state: SimResourceState = SimResourceState.Pending + internal set + + override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource cannot be consumed" } + + val ctx = OutputContext(this, consumer) + this.ctx = ctx + this.state = SimResourceState.Active + outputContexts += ctx + + ctx.start() + schedule() + } + + override fun close() { + cancel() + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Stopped + _outputs.remove(this) + } + } + + override fun interrupt() { + ctx?.interrupt() + } + + override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.stop() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } + } + + /** + * A [SimAbstractResourceContext] for the output resources. + */ + private inner class OutputContext( + private val provider: OutputProvider, + consumer: SimResourceConsumer + ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> { + /** + * The current command that is processed by the vCPU. + */ + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() + + /** + * The processing speed that is allowed by the model constraints. + */ + var allowedSpeed: Double = 0.0 + + /** + * The actual processing speed. + */ + var actualSpeed: Double = 0.0 + + private fun reportOvercommit() { + val remainingWork = remainingWork + totalOvercommittedWork += remainingWork + } + + override fun onIdle(deadline: Long) { + reportOvercommit() + + allowedSpeed = 0.0 + activeCommand = SimResourceCommand.Idle(deadline) + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + reportOvercommit() + + allowedSpeed = speed + activeCommand = SimResourceCommand.Consume(work, limit, deadline) + } + + override fun onFinish(cause: Throwable?) { + reportOvercommit() + + activeCommand = SimResourceCommand.Exit + provider.cancel() + + super.onFinish(cause) + } + + override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { + // Apply performance interference model + val performanceScore = 1.0 + + // Compute the remaining amount of work + return if (work > 0.0) { + // Compute the fraction of compute time allocated to the VM + val fraction = actualSpeed / totalAllocatedSpeed + + // Compute the work that was actually granted to the VM. + val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction + val processed = processingAvailable * performanceScore + + val interferedWork = processingAvailable - processed + + totalInterferedWork += interferedWork + + max(0.0, work - processed) + } else { + 0.0 + } + } + + override fun interrupt() { + // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead + // to infinite recursion. + if (isProcessing) { + return + } + + super.interrupt() + + // Force the scheduler to re-schedule + schedule() + } + + override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt new file mode 100644 index 00000000..bbf6ad44 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceFlow] acts as both a resource consumer and resource provider at the same time, simplifying bridging + * between different components. + */ +public interface SimResourceFlow : SimResourceConsumer, SimResourceProvider diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt new file mode 100644 index 00000000..52b13c5c --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.suspendCancellableCoroutine + +/** + * A [SimResourceProvider] provides some resource of type [R]. + */ +public interface SimResourceProvider : AutoCloseable { + /** + * The state of the resource. + */ + public val state: SimResourceState + + /** + * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously. + * + * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended. + */ + public fun startConsumer(consumer: SimResourceConsumer) + + /** + * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op. + */ + public fun interrupt() + + /** + * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op. + */ + public fun cancel() + + /** + * End the lifetime of the resource. + * + * This operation terminates the existing resource consumer. + */ + public override fun close() +} + +/** + * Consume the resource provided by this provider using the specified [consumer] and suspend execution until + * the consumer has finished. + */ +public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { + return suspendCancellableCoroutine { cont -> + startConsumer(object : SimResourceConsumer by consumer { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + assert(!cont.isCompleted) { "Coroutine already completed" } + + consumer.onFinish(ctx, cause) + + cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit)) + } + + override fun toString(): String = "SimSuspendingResourceConsumer" + }) + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt new file mode 100644 index 00000000..025b0406 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.utils.TimerScheduler +import java.time.Clock +import kotlin.math.ceil +import kotlin.math.min + +/** + * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. + * + * @param initialCapacity The initial capacity of the resource. + * @param clock The virtual clock to track simulation time. + * @param scheduler The scheduler to schedule the interrupts. + */ +public class SimResourceSource( + initialCapacity: Double, + private val clock: Clock, + private val scheduler: TimerScheduler<Any> +) : SimResourceProvider { + /** + * The current processing speed of the resource. + */ + public val speed: Double + get() = ctx?.speed ?: 0.0 + + /** + * The capacity of the resource. + */ + public var capacity: Double = initialCapacity + set(value) { + field = value + ctx?.capacity = value + } + + /** + * The [Context] that is currently running. + */ + private var ctx: Context? = null + + override var state: SimResourceState = SimResourceState.Pending + private set + + override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource is in invalid state" } + val ctx = Context(consumer) + + this.ctx = ctx + this.state = SimResourceState.Active + + ctx.start() + } + + override fun close() { + cancel() + state = SimResourceState.Stopped + } + + override fun interrupt() { + ctx?.interrupt() + } + + override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.stop() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } + + /** + * Internal implementation of [SimResourceContext] for this class. + */ + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { + override fun onIdle(deadline: Long) { + // Do not resume if deadline is "infinite" + if (deadline != Long.MAX_VALUE) { + scheduler.startSingleTimerTo(this, deadline) { flush() } + } + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + val until = min(deadline, clock.millis() + getDuration(work, speed)) + + scheduler.startSingleTimerTo(this, until, ::flush) + } + + override fun onFinish(cause: Throwable?) { + scheduler.cancel(this) + cancel() + + super.onFinish(cause) + } + + override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" + } + + /** + * Compute the duration that a resource consumption will take with the specified [speed]. + */ + private fun getDuration(work: Double, speed: Double): Long { + return ceil(work / speed * 1000).toLong() + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt new file mode 100644 index 00000000..c72951d0 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * The state of a resource provider. + */ +public enum class SimResourceState { + /** + * The resource provider is pending and the resource is waiting to be consumed. + */ + Pending, + + /** + * The resource provider is active and the resource is currently being consumed. + */ + Active, + + /** + * The resource provider is stopped and the resource cannot be consumed anymore. + */ + Stopped +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt new file mode 100644 index 00000000..53fec16a --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers. + */ +public interface SimResourceSwitch : AutoCloseable { + /** + * The output resource providers to which resource consumers can be attached. + */ + public val outputs: Set<SimResourceProvider> + + /** + * The input resources that will be switched between the output providers. + */ + public val inputs: Set<SimResourceProvider> + + /** + * Add an output to the switch with the specified [capacity]. + */ + public fun addOutput(capacity: Double): SimResourceProvider + + /** + * Add the specified [input] to the switch. + */ + public fun addInput(input: SimResourceProvider) +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt new file mode 100644 index 00000000..45e4c220 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.util.ArrayDeque + +/** + * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that + * a single output is directly connected to an input and that the switch can only support as much outputs as inputs. + */ +public class SimResourceSwitchExclusive : SimResourceSwitch { + /** + * A flag to indicate that the switch is closed. + */ + private var isClosed: Boolean = false + + private val _outputs = mutableSetOf<Provider>() + override val outputs: Set<SimResourceProvider> + get() = _outputs + + private val availableResources = ArrayDeque<SimResourceTransformer>() + + private val _inputs = mutableSetOf<SimResourceProvider>() + override val inputs: Set<SimResourceProvider> + get() = _inputs + + override fun addOutput(capacity: Double): SimResourceProvider { + check(!isClosed) { "Switch has been closed" } + check(availableResources.isNotEmpty()) { "No capacity to serve request" } + val forwarder = availableResources.poll() + val output = Provider(capacity, forwarder) + _outputs += output + return output + } + + override fun addInput(input: SimResourceProvider) { + check(!isClosed) { "Switch has been closed" } + + if (input in inputs) { + return + } + + val forwarder = SimResourceForwarder() + + _inputs += input + availableResources += forwarder + + input.startConsumer(object : SimResourceConsumer by forwarder { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + // De-register the input after it has finished + _inputs -= input + forwarder.onFinish(ctx, cause) + } + }) + } + + override fun close() { + isClosed = true + + // Cancel all upstream subscriptions + _inputs.forEach(SimResourceProvider::cancel) + } + + private inner class Provider( + private val capacity: Double, + private val forwarder: SimResourceTransformer + ) : SimResourceProvider by forwarder { + override fun close() { + // We explicitly do not close the forwarder here in order to re-use it across output resources. + + _outputs -= this + availableResources += forwarder + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt new file mode 100644 index 00000000..c796c251 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.* +import java.time.Clock + +/** + * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min + * fair sharing. + */ +public class SimResourceSwitchMaxMin( + clock: Clock, + private val listener: Listener? = null +) : SimResourceSwitch { + private val _outputs = mutableSetOf<SimResourceProvider>() + override val outputs: Set<SimResourceProvider> + get() = _outputs + + private val _inputs = mutableSetOf<SimResourceProvider>() + override val inputs: Set<SimResourceProvider> + get() = _inputs + + /** + * A flag to indicate that the switch was closed. + */ + private var isClosed = false + + /** + * The aggregator to aggregate the resources. + */ + private val aggregator = SimResourceAggregatorMaxMin(clock) + + /** + * The distributor to distribute the aggregated resources. + */ + private val distributor = SimResourceDistributorMaxMin( + aggregator.output, clock, + object : SimResourceDistributorMaxMin.Listener { + override fun onSliceFinish( + switch: SimResourceDistributor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) + } + } + ) + + /** + * Add an output to the switch represented by [resource]. + */ + override fun addOutput(capacity: Double): SimResourceProvider { + check(!isClosed) { "Switch has been closed" } + + val provider = distributor.addOutput(capacity) + _outputs.add(provider) + return provider + } + + /** + * Add the specified [input] to the switch. + */ + override fun addInput(input: SimResourceProvider) { + check(!isClosed) { "Switch has been closed" } + + aggregator.addInput(input) + } + + override fun close() { + if (!isClosed) { + isClosed = true + distributor.close() + aggregator.close() + } + } + + /** + * Event listener for hypervisor events. + */ + public interface Listener { + /** + * This method is invoked when a slice is finished. + */ + public fun onSliceFinish( + switch: SimResourceSwitchMaxMin, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt new file mode 100644 index 00000000..de455021 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider. + * + * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. + * @param transform The function to transform the received resource command. + */ +public class SimResourceTransformer( + private val isCoupled: Boolean = false, + private val transform: (SimResourceContext, SimResourceCommand) -> SimResourceCommand +) : SimResourceFlow { + /** + * The [SimResourceContext] in which the forwarder runs. + */ + private var ctx: SimResourceContext? = null + + /** + * The delegate [SimResourceConsumer]. + */ + private var delegate: SimResourceConsumer? = null + + /** + * A flag to indicate that the delegate was started. + */ + private var hasDelegateStarted: Boolean = false + + /** + * The state of the forwarder. + */ + override var state: SimResourceState = SimResourceState.Pending + private set + + override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource is in invalid state" } + + state = SimResourceState.Active + delegate = consumer + + // Interrupt the provider to replace the consumer + interrupt() + } + + override fun interrupt() { + ctx?.interrupt() + } + + override fun cancel() { + val delegate = delegate + val ctx = ctx + + state = SimResourceState.Pending + + if (delegate != null && ctx != null) { + this.delegate = null + delegate.onFinish(ctx) + } + } + + override fun close() { + val ctx = ctx + + state = SimResourceState.Stopped + + if (ctx != null) { + this.ctx = null + ctx.interrupt() + } + } + + override fun onStart(ctx: SimResourceContext) { + this.ctx = ctx + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val delegate = delegate + + if (!hasDelegateStarted) { + start() + } + + return if (state == SimResourceState.Stopped) { + SimResourceCommand.Exit + } else if (delegate != null) { + val command = transform(ctx, delegate.onNext(ctx)) + if (command == SimResourceCommand.Exit) { + // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we + // reset beforehand the existing state and check whether it has been updated afterwards + reset() + + delegate.onFinish(ctx) + + if (isCoupled || state == SimResourceState.Stopped) + SimResourceCommand.Exit + else + onNext(ctx) + } else { + command + } + } else { + SimResourceCommand.Idle() + } + } + + override fun onConfirm(ctx: SimResourceContext, speed: Double) { + delegate?.onConfirm(ctx, speed) + } + + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + delegate?.onCapacityChanged(ctx, isThrottled) + } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + this.ctx = null + + val delegate = delegate + if (delegate != null) { + reset() + delegate.onFinish(ctx, cause) + } + } + + /** + * Start the delegate. + */ + private fun start() { + val delegate = delegate ?: return + delegate.onStart(checkNotNull(ctx)) + + hasDelegateStarted = true + } + + /** + * Reset the delegate. + */ + private fun reset() { + delegate = null + hasDelegateStarted = false + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } +} + +/** + * Constructs a [SimResourceTransformer] that forwards the received resource command with an identity transform. + * + * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. + */ +public fun SimResourceForwarder(isCoupled: Boolean = false): SimResourceTransformer { + return SimResourceTransformer(isCoupled) { _, command -> command } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt new file mode 100644 index 00000000..52a42241 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.consumer + +/** + * The [SimConsumerBarrier] is a barrier that allows consumers to wait for a select number of other consumers to + * complete, before proceeding its operation. + */ +public class SimConsumerBarrier(public val parties: Int) { + private var counter = 0 + + /** + * Enter the barrier and determine whether the caller is the last to reach the barrier. + * + * @return `true` if the caller is the last to reach the barrier, `false` otherwise. + */ + public fun enter(): Boolean { + val last = ++counter == parties + if (last) { + counter = 0 + return true + } + return false + } + + /** + * Reset the barrier. + */ + public fun reset() { + counter = 0 + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt new file mode 100644 index 00000000..114c7312 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.consumer + +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext +import kotlin.math.min + +/** + * Helper class to expose an observable [speed] field describing the speed of the consumer. + */ +public class SimSpeedConsumerAdapter( + private val delegate: SimResourceConsumer, + private val callback: (Double) -> Unit = {} +) : SimResourceConsumer by delegate { + /** + * The resource processing speed at this instant. + */ + public var speed: Double = 0.0 + private set(value) { + if (field != value) { + callback(value) + field = value + } + } + + init { + callback(0.0) + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return delegate.onNext(ctx) + } + + override fun onConfirm(ctx: SimResourceContext, speed: Double) { + delegate.onConfirm(ctx, speed) + + this.speed = speed + } + + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + val oldSpeed = speed + + delegate.onCapacityChanged(ctx, isThrottled) + + // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might + // need to update the current speed. + if (oldSpeed == speed) { + speed = min(ctx.capacity, speed) + } + } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + super.onFinish(ctx, cause) + + speed = 0.0 + } + + override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]" +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt new file mode 100644 index 00000000..a52d1d5d --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.consumer + +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext + +/** + * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource + * consumption for some period of time. + */ +public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer { + private var iterator: Iterator<Fragment>? = null + + override fun onStart(ctx: SimResourceContext) { + check(iterator == null) { "Consumer already running" } + iterator = trace.iterator() + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val iterator = checkNotNull(iterator) + return if (iterator.hasNext()) { + val now = ctx.clock.millis() + val fragment = iterator.next() + val work = (fragment.duration / 1000) * fragment.usage + val deadline = now + fragment.duration + + assert(deadline >= now) { "Deadline already passed" } + + if (work > 0.0) + SimResourceCommand.Consume(work, fragment.usage, deadline) + else + SimResourceCommand.Idle(deadline) + } else { + SimResourceCommand.Exit + } + } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + iterator = null + } + + /** + * A fragment of the workload. + */ + public data class Fragment(val duration: Long, val usage: Double) +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt new file mode 100644 index 00000000..faa693c4 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.consumer + +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext + +/** + * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization. + */ +public class SimWorkConsumer( + private val work: Double, + private val utilization: Double +) : SimResourceConsumer { + + init { + require(work >= 0.0) { "Work must be positive" } + require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } + } + + private var isFirst = true + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val limit = ctx.capacity * utilization + val work = if (isFirst) { + isFirst = false + work + } else { + ctx.remainingWork + } + return if (work > 0.0) { + SimResourceCommand.Consume(work, limit) + } else { + SimResourceCommand.Exit + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt new file mode 100644 index 00000000..e272abb8 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -0,0 +1,202 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter +import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.utils.TimerScheduler + +/** + * Test suite for the [SimResourceAggregatorMaxMin] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimResourceAggregatorMaxMinTest { + @Test + fun testSingleCapacity() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val forwarder = SimResourceForwarder() + val sources = listOf( + forwarder, + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(1.0, 0.5) + val usage = mutableListOf<Double>() + val source = SimResourceSource(1.0, clock, scheduler) + val adapter = SimSpeedConsumerAdapter(forwarder, usage::add) + source.startConsumer(adapter) + + try { + aggregator.output.consume(consumer) + yield() + + assertAll( + { assertEquals(1000, clock.millis()) }, + { assertEquals(listOf(0.0, 0.5, 0.0), usage) } + ) + } finally { + aggregator.output.close() + } + } + + @Test + fun testDoubleCapacity() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(2.0, 1.0) + val usage = mutableListOf<Double>() + val adapter = SimSpeedConsumerAdapter(consumer, usage::add) + + try { + aggregator.output.consume(adapter) + yield() + assertAll( + { assertEquals(1000, clock.millis()) }, + { assertEquals(listOf(0.0, 2.0, 0.0), usage) } + ) + } finally { + aggregator.output.close() + } + } + + @Test + fun testOvercommit() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) + .andThen(SimResourceCommand.Exit) + + try { + aggregator.output.consume(consumer) + yield() + assertEquals(1000, clock.millis()) + + verify(exactly = 2) { consumer.onNext(any()) } + } finally { + aggregator.output.close() + } + } + + @Test + fun testException() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) + + try { + assertThrows<IllegalStateException> { aggregator.output.consume(consumer) } + yield() + assertEquals(SimResourceState.Pending, sources[0].state) + } finally { + aggregator.output.close() + } + } + + @Test + fun testAdjustCapacity() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(4.0, 1.0) + try { + coroutineScope { + launch { aggregator.output.consume(consumer) } + delay(1000) + sources[0].capacity = 0.5 + } + yield() + assertEquals(2334, clock.millis()) + } finally { + aggregator.output.close() + } + } + + @Test + fun testFailOverCapacity() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(1.0, 0.5) + try { + coroutineScope { + launch { aggregator.output.consume(consumer) } + delay(500) + sources[0].capacity = 0.5 + } + yield() + assertEquals(1000, clock.millis()) + } finally { + aggregator.output.close() + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt new file mode 100644 index 00000000..02d456ff --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows + +/** + * Test suite for [SimResourceCommand]. + */ +class SimResourceCommandTest { + @Test + fun testZeroWork() { + assertThrows<IllegalArgumentException> { + SimResourceCommand.Consume(0.0, 1.0) + } + } + + @Test + fun testNegativeWork() { + assertThrows<IllegalArgumentException> { + SimResourceCommand.Consume(-1.0, 1.0) + } + } + + @Test + fun testZeroLimit() { + assertThrows<IllegalArgumentException> { + SimResourceCommand.Consume(1.0, 0.0) + } + } + + @Test + fun testNegativeLimit() { + assertThrows<IllegalArgumentException> { + SimResourceCommand.Consume(1.0, -1.0, 1) + } + } + + @Test + fun testConsumeCorrect() { + assertDoesNotThrow { + SimResourceCommand.Consume(1.0, 1.0) + } + } + + @Test + fun testIdleCorrect() { + assertDoesNotThrow { + SimResourceCommand.Idle(1) + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt new file mode 100644 index 00000000..be909556 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import io.mockk.* +import kotlinx.coroutines.* +import org.junit.jupiter.api.* +import org.opendc.simulator.core.runBlockingSimulation + +/** + * A test suite for the [SimAbstractResourceContext] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +class SimResourceContextTest { + @Test + fun testFlushWithoutCommand() = runBlockingSimulation { + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit + + val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + override fun onFinish(cause: Throwable?) {} + } + + context.flush() + } + + @Test + fun testIntermediateFlush() = runBlockingSimulation { + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit + + val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onFinish(cause: Throwable?) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + }) + + context.start() + delay(1) // Delay 1 ms to prevent hitting the fast path + context.flush(isIntermediate = true) + + verify(exactly = 2) { context.onConsume(any(), any(), any()) } + } + + @Test + fun testIntermediateFlushIdle() = runBlockingSimulation { + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit + + val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onFinish(cause: Throwable?) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + }) + + context.start() + delay(5) + context.flush(isIntermediate = true) + delay(5) + context.flush(isIntermediate = true) + + assertAll( + { verify(exactly = 2) { context.onIdle(any()) } }, + { verify(exactly = 1) { context.onFinish(null) } } + ) + } + + @Test + fun testDoubleStart() = runBlockingSimulation { + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit + + val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onFinish(cause: Throwable?) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + } + + context.start() + assertThrows<IllegalStateException> { context.start() } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt new file mode 100644 index 00000000..39f74481 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -0,0 +1,351 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify +import kotlinx.coroutines.* +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter +import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.utils.TimerScheduler + +/** + * A test suite for the [SimResourceSource] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +class SimResourceSourceTest { + @Test + fun testSpeed() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1000 * capacity, capacity)) + .andThen(SimResourceCommand.Exit) + + try { + val res = mutableListOf<Double>() + val adapter = SimSpeedConsumerAdapter(consumer, res::add) + + provider.consume(adapter) + + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } + } finally { + scheduler.close() + provider.close() + } + } + + @Test + fun testAdjustCapacity() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(1.0, clock, scheduler) + + val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + + try { + coroutineScope { + launch { provider.consume(consumer) } + delay(1000) + provider.capacity = 0.5 + } + assertEquals(3000, clock.millis()) + verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + } finally { + scheduler.close() + provider.close() + } + } + + @Test + fun testSpeedLimit() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity)) + .andThen(SimResourceCommand.Exit) + + try { + val res = mutableListOf<Double>() + val adapter = SimSpeedConsumerAdapter(consumer, res::add) + + provider.consume(adapter) + + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } + } finally { + scheduler.close() + provider.close() + } + } + + /** + * Test to see whether no infinite recursion occurs when interrupting during [SimResourceConsumer.onStart] or + * [SimResourceConsumer.onNext]. + */ + @Test + fun testIntermediateInterrupt() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext) { + ctx.interrupt() + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return SimResourceCommand.Exit + } + } + + try { + provider.consume(consumer) + } finally { + scheduler.close() + provider.close() + } + } + + @Test + fun testInterrupt() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + lateinit var resCtx: SimResourceContext + + val consumer = object : SimResourceConsumer { + var isFirst = true + override fun onStart(ctx: SimResourceContext) { + resCtx = ctx + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + assertEquals(0.0, ctx.remainingWork) + return if (isFirst) { + isFirst = false + SimResourceCommand.Consume(4.0, 1.0) + } else { + SimResourceCommand.Exit + } + } + } + + try { + launch { + yield() + resCtx.interrupt() + } + provider.consume(consumer) + + assertEquals(0, clock.millis()) + } finally { + scheduler.close() + provider.close() + } + } + + @Test + fun testFailure() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onStart(any()) } + .throws(IllegalStateException()) + + try { + assertThrows<IllegalStateException> { + provider.consume(consumer) + } + } finally { + scheduler.close() + provider.close() + } + } + + @Test + fun testExceptionPropagationOnNext() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) + + try { + assertThrows<IllegalStateException> { + provider.consume(consumer) + } + } finally { + scheduler.close() + provider.close() + } + } + + @Test + fun testConcurrentConsumption() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) + + try { + assertThrows<IllegalStateException> { + coroutineScope { + launch { provider.consume(consumer) } + provider.consume(consumer) + } + } + } finally { + scheduler.close() + provider.close() + } + } + + @Test + fun testClosedConsumption() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) + + try { + assertThrows<IllegalStateException> { + provider.close() + provider.consume(consumer) + } + } finally { + scheduler.close() + provider.close() + } + } + + @Test + fun testCloseDuringConsumption() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) + + try { + launch { provider.consume(consumer) } + delay(500) + provider.close() + + assertEquals(500, clock.millis()) + } finally { + scheduler.close() + provider.close() + } + } + + @Test + fun testIdle() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Idle(clock.millis() + 500)) + .andThen(SimResourceCommand.Exit) + + try { + provider.consume(consumer) + + assertEquals(500, clock.millis()) + } finally { + scheduler.close() + provider.close() + } + } + + @Test + fun testInfiniteSleep() { + assertThrows<IllegalStateException> { + runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Idle()) + .andThenThrows(IllegalStateException()) + + try { + provider.consume(consumer) + } finally { + scheduler.close() + provider.close() + } + } + } + } + + @Test + fun testIncorrectDeadline() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Idle(2)) + .andThen(SimResourceCommand.Exit) + + try { + delay(10) + + assertThrows<IllegalArgumentException> { provider.consume(consumer) } + } finally { + scheduler.close() + provider.close() + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt new file mode 100644 index 00000000..f7d17867 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter +import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.utils.TimerScheduler + +/** + * Test suite for the [SimResourceSwitchExclusive] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimResourceSwitchExclusiveTest { + /** + * Test a trace workload. + */ + @Test + fun testTrace() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val speed = mutableListOf<Double>() + + val duration = 5 * 60L + val workload = + SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(duration * 1000, 28.0), + SimTraceConsumer.Fragment(duration * 1000, 3500.0), + SimTraceConsumer.Fragment(duration * 1000, 0.0), + SimTraceConsumer.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) + val forwarder = SimResourceForwarder() + val adapter = SimSpeedConsumerAdapter(forwarder, speed::add) + source.startConsumer(adapter) + switch.addInput(forwarder) + + val provider = switch.addOutput(3200.0) + + try { + provider.consume(workload) + yield() + } finally { + provider.close() + } + + assertAll( + { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, + { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } + ) + } + + /** + * Test runtime workload on hypervisor. + */ + @Test + fun testRuntimeWorkload() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit + + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) + + switch.addInput(source) + + val provider = switch.addOutput(3200.0) + + try { + provider.consume(workload) + yield() + } finally { + provider.close() + } + assertEquals(duration, clock.millis()) { "Took enough time" } + } + + /** + * Test two workloads running sequentially. + */ + @Test + fun testTwoWorkloads() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = object : SimResourceConsumer { + var isFirst = true + + override fun onStart(ctx: SimResourceContext) { + isFirst = true + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return if (isFirst) { + isFirst = false + SimResourceCommand.Consume(duration / 1000.0, 1.0) + } else { + SimResourceCommand.Exit + } + } + } + + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) + + switch.addInput(source) + + val provider = switch.addOutput(3200.0) + + try { + provider.consume(workload) + yield() + provider.consume(workload) + } finally { + provider.close() + } + assertEquals(duration * 2, clock.millis()) { "Took enough time" } + } + + /** + * Test concurrent workloads on the machine. + */ + @Test + fun testConcurrentWorkloadFails() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit + + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) + + switch.addInput(source) + + switch.addOutput(3200.0) + assertThrows<IllegalStateException> { switch.addOutput(3200.0) } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt new file mode 100644 index 00000000..7416f277 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.utils.TimerScheduler + +/** + * Test suite for the [SimResourceSwitch] implementations + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimResourceSwitchMaxMinTest { + @Test + fun testSmoke() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val switch = SimResourceSwitchMaxMin(clock) + + val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) } + sources.forEach { switch.addInput(it) } + + val provider = switch.addOutput(1000.0) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit + + try { + provider.consume(consumer) + yield() + } finally { + switch.close() + scheduler.close() + } + } + + /** + * Test overcommitting of resources via the hypervisor with a single VM. + */ + @Test + fun testOvercommittedSingle() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val listener = object : SimResourceSwitchMaxMin.Listener { + var totalRequestedWork = 0L + var totalGrantedWork = 0L + var totalOvercommittedWork = 0L + + override fun onSliceFinish( + switch: SimResourceSwitchMaxMin, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + totalRequestedWork += requestedWork + totalGrantedWork += grantedWork + totalOvercommittedWork += overcommittedWork + } + } + + val duration = 5 * 60L + val workload = + SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(duration * 1000, 28.0), + SimTraceConsumer.Fragment(duration * 1000, 3500.0), + SimTraceConsumer.Fragment(duration * 1000, 0.0), + SimTraceConsumer.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = SimResourceSwitchMaxMin(clock, listener) + val provider = switch.addOutput(3200.0) + + try { + switch.addInput(SimResourceSource(3200.0, clock, scheduler)) + provider.consume(workload) + yield() + } finally { + switch.close() + scheduler.close() + } + + assertAll( + { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, + { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, + { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(1200000, clock.millis()) } + ) + } + + /** + * Test overcommitting of resources via the hypervisor with two VMs. + */ + @Test + fun testOvercommittedDual() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val listener = object : SimResourceSwitchMaxMin.Listener { + var totalRequestedWork = 0L + var totalGrantedWork = 0L + var totalOvercommittedWork = 0L + + override fun onSliceFinish( + switch: SimResourceSwitchMaxMin, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + totalRequestedWork += requestedWork + totalGrantedWork += grantedWork + totalOvercommittedWork += overcommittedWork + } + } + + val duration = 5 * 60L + val workloadA = + SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(duration * 1000, 28.0), + SimTraceConsumer.Fragment(duration * 1000, 3500.0), + SimTraceConsumer.Fragment(duration * 1000, 0.0), + SimTraceConsumer.Fragment(duration * 1000, 183.0) + ), + ) + val workloadB = + SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(duration * 1000, 28.0), + SimTraceConsumer.Fragment(duration * 1000, 3100.0), + SimTraceConsumer.Fragment(duration * 1000, 0.0), + SimTraceConsumer.Fragment(duration * 1000, 73.0) + ) + ) + + val switch = SimResourceSwitchMaxMin(clock, listener) + val providerA = switch.addOutput(3200.0) + val providerB = switch.addOutput(3200.0) + + try { + switch.addInput(SimResourceSource(3200.0, clock, scheduler)) + + coroutineScope { + launch { providerA.consume(workloadA) } + providerB.consume(workloadB) + } + + yield() + } finally { + switch.close() + scheduler.close() + } + assertAll( + { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, + { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") }, + { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(1200000, clock.millis()) } + ) + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt new file mode 100644 index 00000000..d2ad73bc --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify +import kotlinx.coroutines.* +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.utils.TimerScheduler + +/** + * A test suite for the [SimResourceTransformer] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimResourceTransformerTest { + @Test + fun testExitImmediately() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(2000.0, clock, scheduler) + + launch { + source.consume(forwarder) + source.close() + } + + forwarder.consume(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return SimResourceCommand.Exit + } + }) + + forwarder.close() + scheduler.close() + } + + @Test + fun testExit() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(2000.0, clock, scheduler) + + launch { + source.consume(forwarder) + source.close() + } + + forwarder.consume(object : SimResourceConsumer { + var isFirst = true + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return if (isFirst) { + isFirst = false + SimResourceCommand.Consume(10.0, 1.0) + } else { + SimResourceCommand.Exit + } + } + }) + + forwarder.close() + } + + @Test + fun testState() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit + } + + assertEquals(SimResourceState.Pending, forwarder.state) + + forwarder.startConsumer(consumer) + assertEquals(SimResourceState.Active, forwarder.state) + + assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) } + + forwarder.cancel() + assertEquals(SimResourceState.Pending, forwarder.state) + + forwarder.close() + assertEquals(SimResourceState.Stopped, forwarder.state) + } + + @Test + fun testCancelPendingDelegate() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Exit + + forwarder.startConsumer(consumer) + forwarder.cancel() + + verify(exactly = 0) { consumer.onFinish(any(), null) } + } + + @Test + fun testCancelStartedDelegate() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(2000.0, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + forwarder.cancel() + + verify(exactly = 1) { consumer.onStart(any()) } + verify(exactly = 1) { consumer.onFinish(any(), null) } + } + + @Test + fun testCancelPropagation() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(2000.0, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + source.cancel() + + verify(exactly = 1) { consumer.onStart(any()) } + verify(exactly = 1) { consumer.onFinish(any(), null) } + } + + @Test + fun testExitPropagation() = runBlockingSimulation { + val forwarder = SimResourceForwarder(isCoupled = true) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(2000.0, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Exit + + source.startConsumer(forwarder) + forwarder.consume(consumer) + yield() + + assertEquals(SimResourceState.Pending, source.state) + } + + @Test + fun testAdjustCapacity() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(1.0, clock, scheduler) + + val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + source.startConsumer(forwarder) + + coroutineScope { + launch { forwarder.consume(consumer) } + delay(1000) + source.capacity = 0.5 + } + + assertEquals(3000, clock.millis()) + verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + } + + @Test + fun testTransformExit() = runBlockingSimulation { + val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit } + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(1.0, clock, scheduler) + + val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + source.startConsumer(forwarder) + forwarder.consume(consumer) + + assertEquals(0, clock.millis()) + verify(exactly = 1) { consumer.onNext(any()) } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt new file mode 100644 index 00000000..bf58b1b6 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.utils.TimerScheduler + +/** + * A test suite for the [SimWorkConsumer] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimWorkConsumerTest { + @Test + fun testSmoke() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(1.0, clock, scheduler) + + val consumer = SimWorkConsumer(1.0, 1.0) + + try { + provider.consume(consumer) + assertEquals(1000, clock.millis()) + } finally { + provider.close() + } + } + + @Test + fun testUtilization() = runBlockingSimulation { + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(1.0, clock, scheduler) + + val consumer = SimWorkConsumer(1.0, 0.5) + + try { + provider.consume(consumer) + assertEquals(2000, clock.millis()) + } finally { + provider.close() + } + } +} |
