diff options
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src')
31 files changed, 0 insertions, 3835 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt deleted file mode 100644 index 8d2587b1..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.opendc.simulator.resources.consumer.SimTraceConsumer - -/** - * Helper function to create simple consumer workload. - */ -fun createSimpleConsumer(): SimResourceConsumer { - return SimTraceConsumer( - sequenceOf( - SimTraceConsumer.Fragment(1000, 28.0), - SimTraceConsumer.Fragment(1000, 3500.0), - SimTraceConsumer.Fragment(1000, 0.0), - SimTraceConsumer.Fragment(1000, 183.0), - SimTraceConsumer.Fragment(1000, 400.0), - SimTraceConsumer.Fragment(1000, 100.0), - SimTraceConsumer.Fragment(1000, 3000.0), - SimTraceConsumer.Fragment(1000, 4500.0), - ), - ) -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt deleted file mode 100644 index beda3eaa..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.launch -import org.opendc.simulator.core.SimulationCoroutineScope -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.utils.TimerScheduler -import org.openjdk.jmh.annotations.* -import java.util.concurrent.TimeUnit - -@State(Scope.Thread) -@Fork(1) -@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) -@OptIn(ExperimentalCoroutinesApi::class) -class SimResourceBenchmarks { - private lateinit var scope: SimulationCoroutineScope - private lateinit var scheduler: TimerScheduler<Any> - - @Setup - fun setUp() { - scope = SimulationCoroutineScope() - scheduler = TimerScheduler(scope.coroutineContext, scope.clock) - } - - @State(Scope.Thread) - class Workload { - lateinit var consumers: Array<SimResourceConsumer> - - @Setup - fun setUp() { - consumers = Array(3) { createSimpleConsumer() } - } - } - - @Benchmark - fun benchmarkSource(state: Workload) { - return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, clock, scheduler) - return@runBlockingSimulation provider.consume(state.consumers[0]) - } - } - - @Benchmark - fun benchmarkForwardOverhead(state: Workload) { - return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, clock, scheduler) - val forwarder = SimResourceForwarder() - provider.startConsumer(forwarder) - return@runBlockingSimulation forwarder.consume(state.consumers[0]) - } - } - - @Benchmark - fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) { - return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(clock) - - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - - val provider = switch.addOutput(3500.0) - return@runBlockingSimulation provider.consume(state.consumers[0]) - } - } - - @Benchmark - fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) { - return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(clock) - - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - - repeat(3) { i -> - launch { - val provider = switch.addOutput(3500.0) - provider.consume(state.consumers[i]) - } - } - } - } - - @Benchmark - fun benchmarkSwitchExclusiveSingleConsumer(state: Workload) { - return scope.runBlockingSimulation { - val switch = SimResourceSwitchExclusive() - - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - - val provider = switch.addOutput(3500.0) - return@runBlockingSimulation provider.consume(state.consumers[0]) - } - } - - @Benchmark - fun benchmarkSwitchExclusiveTripleConsumer(state: Workload) { - return scope.runBlockingSimulation { - val switch = SimResourceSwitchExclusive() - - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - - repeat(2) { i -> - launch { - val provider = switch.addOutput(3500.0) - provider.consume(state.consumers[i]) - } - } - } - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt deleted file mode 100644 index c7fa6a17..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import java.time.Clock - -/** - * Abstract implementation of [SimResourceAggregator]. - */ -public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator { - /** - * The available resource provider contexts. - */ - protected val inputContexts: Set<SimResourceContext> - get() = _inputContexts - private val _inputContexts = mutableSetOf<SimResourceContext>() - - /** - * The output context. - */ - protected val outputContext: SimResourceContext - get() = context - - /** - * The commands to submit to the underlying input resources. - */ - protected val commands: MutableMap<SimResourceContext, SimResourceCommand> = mutableMapOf() - - /** - * This method is invoked when the resource consumer consumes resources. - */ - protected abstract fun doConsume(work: Double, limit: Double, deadline: Long) - - /** - * This method is invoked when the resource consumer enters an idle state. - */ - protected open fun doIdle(deadline: Long) { - for (input in inputContexts) { - commands[input] = SimResourceCommand.Idle(deadline) - } - } - - /** - * This method is invoked when the resource consumer finishes processing. - */ - protected open fun doFinish(cause: Throwable?) { - for (input in inputContexts) { - commands[input] = SimResourceCommand.Exit - } - } - - /** - * This method is invoked when an input context is started. - */ - protected open fun onContextStarted(ctx: SimResourceContext) { - _inputContexts.add(ctx) - } - - protected open fun onContextFinished(ctx: SimResourceContext) { - assert(_inputContexts.remove(ctx)) { "Lost context" } - } - - override fun addInput(input: SimResourceProvider) { - check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" } - - val consumer = Consumer() - _inputs.add(input) - input.startConsumer(consumer) - } - - override fun close() { - output.close() - } - - override val output: SimResourceProvider - get() = _output - private val _output = SimResourceForwarder() - - override val inputs: Set<SimResourceProvider> - get() = _inputs - private val _inputs = mutableSetOf<SimResourceProvider>() - - private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { - override val remainingWork: Double - get() { - val now = clock.millis() - - return if (_remainingWorkFlush < now) { - _remainingWorkFlush = now - _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it } - } else { - _remainingWork - } - } - private var _remainingWork: Double = 0.0 - private var _remainingWorkFlush: Long = Long.MIN_VALUE - - override fun interrupt() { - super.interrupt() - - interruptAll() - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) - - override fun onIdle(deadline: Long) = doIdle(deadline) - - override fun onFinish(cause: Throwable?) { - doFinish(cause) - - super.onFinish(cause) - - interruptAll() - } - } - - /** - * A flag to indicate that an interrupt is active. - */ - private var isInterrupting: Boolean = false - - /** - * Schedule the work over the input resources. - */ - private fun doSchedule() { - context.flush(isIntermediate = true) - interruptAll() - } - - /** - * Interrupt all inputs. - */ - private fun interruptAll() { - // Prevent users from interrupting the resource while they are constructing their next command, as this will - // only lead to infinite recursion. - if (isInterrupting) { - return - } - - try { - isInterrupting = true - - val iterator = _inputs.iterator() - while (iterator.hasNext()) { - val input = iterator.next() - input.interrupt() - - if (input.state != SimResourceState.Active) { - iterator.remove() - } - } - } finally { - isInterrupting = false - } - } - - /** - * An internal [SimResourceConsumer] implementation for aggregator inputs. - */ - private inner class Consumer : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext) { - onContextStarted(ctx) - onCapacityChanged(ctx, false) - - // Make sure we initialize the output if we have not done so yet - if (context.state == SimResourceState.Pending) { - context.start() - } - } - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - doSchedule() - - return commands[ctx] ?: SimResourceCommand.Idle() - } - - override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { - // Adjust capacity of output resource - context.capacity = inputContexts.sumByDouble { it.capacity } - } - - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - onContextFinished(ctx) - - super.onFinish(ctx, cause) - } - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt deleted file mode 100644 index 05ed0714..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import java.time.Clock -import kotlin.math.max -import kotlin.math.min - -/** - * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. - */ -public abstract class SimAbstractResourceContext( - initialCapacity: Double, - override val clock: Clock, - private val consumer: SimResourceConsumer -) : SimResourceContext { - /** - * The capacity of the resource. - */ - public final override var capacity: Double = initialCapacity - set(value) { - val oldValue = field - - // Only changes will be propagated - if (value != oldValue) { - field = value - onCapacityChange() - } - } - - /** - * The amount of work still remaining at this instant. - */ - override val remainingWork: Double - get() { - val activeCommand = activeCommand ?: return 0.0 - val now = clock.millis() - - return if (_remainingWorkFlush < now) { - _remainingWorkFlush = now - computeRemainingWork(activeCommand, now).also { _remainingWork = it } - } else { - _remainingWork - } - } - private var _remainingWork: Double = 0.0 - private var _remainingWorkFlush: Long = Long.MIN_VALUE - - /** - * A flag to indicate the state of the context. - */ - public var state: SimResourceState = SimResourceState.Pending - private set - - /** - * The current processing speed of the resource. - */ - public var speed: Double = 0.0 - private set - - /** - * This method is invoked when the resource will idle until the specified [deadline]. - */ - public abstract fun onIdle(deadline: Long) - - /** - * This method is invoked when the resource will be consumed until the specified [work] was processed or the - * [deadline] was reached. - */ - public abstract fun onConsume(work: Double, limit: Double, deadline: Long) - - /** - * This method is invoked when the resource consumer has finished. - */ - public open fun onFinish(cause: Throwable?) { - consumer.onFinish(this, cause) - } - - /** - * Get the remaining work to process after a resource consumption. - * - * @param work The size of the resource consumption. - * @param speed The speed of consumption. - * @param duration The duration from the start of the consumption until now. - * @return The amount of work remaining. - */ - protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { - return if (duration > 0L) { - val processed = duration / 1000.0 * speed - max(0.0, work - processed) - } else { - 0.0 - } - } - - /** - * Start the consumer. - */ - public fun start() { - check(state == SimResourceState.Pending) { "Consumer is already started" } - - val now = clock.millis() - - state = SimResourceState.Active - isProcessing = true - latestFlush = now - - try { - consumer.onStart(this) - activeCommand = interpret(consumer.onNext(this), now) - } catch (cause: Throwable) { - doStop(cause) - } finally { - isProcessing = false - } - } - - /** - * Immediately stop the consumer. - */ - public fun stop() { - try { - isProcessing = true - latestFlush = clock.millis() - - flush(isIntermediate = true) - doStop(null) - } catch (cause: Throwable) { - doStop(cause) - } finally { - isProcessing = false - } - } - - /** - * Flush the current active resource consumption. - * - * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be - * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer - * will be asked to deliver a new command and is essentially interrupted. - */ - public fun flush(isIntermediate: Boolean = false) { - // Flush is no-op when the consumer is finished or not yet started - if (state != SimResourceState.Active) { - return - } - - val now = clock.millis() - - // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it. - if (isIntermediate && latestFlush >= now) { - return - } - - try { - val activeCommand = activeCommand ?: return - val (timestamp, command) = activeCommand - - // Note: accessor is reliant on activeCommand being set - val remainingWork = remainingWork - - isProcessing = true - - val duration = now - timestamp - assert(duration >= 0) { "Flush in the past" } - - this.activeCommand = when (command) { - is SimResourceCommand.Idle -> { - // We should only continue processing the next command if: - // 1. The resource consumer reached its deadline. - // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) - if (command.deadline <= now || !isIntermediate) { - next(now) - } else { - interpret(SimResourceCommand.Idle(command.deadline), now) - } - } - is SimResourceCommand.Consume -> { - // We should only continue processing the next command if: - // 1. The resource consumption was finished. - // 2. The resource capacity cannot satisfy the demand. - // 4. The resource consumer should be interrupted (e.g., someone called .interrupt()) - if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { - next(now) - } else { - interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now) - } - } - SimResourceCommand.Exit -> - // Flush may not be called when the resource consumer has finished - throw IllegalStateException() - } - - // Flush remaining work cache - _remainingWorkFlush = Long.MIN_VALUE - } catch (cause: Throwable) { - doStop(cause) - } finally { - latestFlush = now - isProcessing = false - } - } - - override fun interrupt() { - // Prevent users from interrupting the resource while they are constructing their next command, as this will - // only lead to infinite recursion. - if (isProcessing) { - return - } - - flush() - } - - override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]" - - /** - * A flag to indicate that the resource is currently processing a command. - */ - protected var isProcessing: Boolean = false - - /** - * The current command that is being processed. - */ - private var activeCommand: CommandWrapper? = null - - /** - * The latest timestamp at which the resource was flushed. - */ - private var latestFlush: Long = Long.MIN_VALUE - - /** - * Finish the consumer and resource provider. - */ - private fun doStop(cause: Throwable?) { - val state = state - this.state = SimResourceState.Stopped - - if (state == SimResourceState.Active) { - activeCommand = null - onFinish(cause) - } - } - - /** - * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. - */ - private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? { - when (command) { - is SimResourceCommand.Idle -> { - val deadline = command.deadline - - require(deadline >= now) { "Deadline already passed" } - - speed = 0.0 - consumer.onConfirm(this, 0.0) - - onIdle(deadline) - } - is SimResourceCommand.Consume -> { - val work = command.work - val limit = command.limit - val deadline = command.deadline - - require(deadline >= now) { "Deadline already passed" } - - speed = min(capacity, limit) - consumer.onConfirm(this, speed) - - onConsume(work, limit, deadline) - } - is SimResourceCommand.Exit -> { - speed = 0.0 - - doStop(null) - - // No need to set the next active command - return null - } - } - - return CommandWrapper(now, command) - } - - /** - * Request the workload for more work. - */ - private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now) - - /** - * Compute the remaining work based on the specified [wrapper] and [timestamp][now]. - */ - private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double { - val (timestamp, command) = wrapper - val duration = now - timestamp - return when (command) { - is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration) - is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0 - } - } - - /** - * Indicate that the capacity of the resource has changed. - */ - private fun onCapacityChange() { - // Do not inform the consumer if it has not been started yet - if (state != SimResourceState.Active) { - return - } - - val isThrottled = speed > capacity - consumer.onCapacityChanged(this, isThrottled) - - // Optimization: only flush changes if the new capacity cannot satisfy the active resource command. - // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush(). - if (isThrottled) { - flush(isIntermediate = true) - } - } - - /** - * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. - */ - private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt deleted file mode 100644 index bb4e6a2c..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource. - */ -public interface SimResourceAggregator : AutoCloseable { - /** - * The output resource provider to which resource consumers can be attached. - */ - public val output: SimResourceProvider - - /** - * The input resources that will be switched between the output providers. - */ - public val inputs: Set<SimResourceProvider> - - /** - * Add the specified [input] to the switch. - */ - public fun addInput(input: SimResourceProvider) - - /** - * End the lifecycle of the aggregator. - */ - public override fun close() -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt deleted file mode 100644 index 08bc064e..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import java.time.Clock - -/** - * A [SimResourceAggregator] that distributes the load equally across the input resources. - */ -public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) { - private val consumers = mutableListOf<SimResourceContext>() - - override fun doConsume(work: Double, limit: Double, deadline: Long) { - // Sort all consumers by their capacity - consumers.sortWith(compareBy { it.capacity }) - - // Divide the requests over the available capacity of the input resources fairly - for (input in consumers) { - val inputCapacity = input.capacity - val fraction = inputCapacity / outputContext.capacity - val grantedSpeed = limit * fraction - val grantedWork = fraction * work - - commands[input] = - if (grantedWork > 0.0 && grantedSpeed > 0.0) - SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) - else - SimResourceCommand.Idle(deadline) - } - } - - override fun onContextStarted(ctx: SimResourceContext) { - super.onContextStarted(ctx) - - consumers.add(ctx) - } - - override fun onContextFinished(ctx: SimResourceContext) { - super.onContextFinished(ctx) - - consumers.remove(ctx) - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt deleted file mode 100644 index f7f3fa4d..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer]. - */ -public sealed class SimResourceCommand { - /** - * A request to the resource to perform the specified amount of work before the given [deadline]. - * - * @param work The amount of work to process. - * @param limit The maximum amount of work to be processed per second. - * @param deadline The instant at which the work needs to be fulfilled. - */ - public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() { - init { - require(work > 0) { "Amount of work must be positive" } - require(limit > 0) { "Limit must be positive" } - } - } - - /** - * An indication to the resource that the consumer will idle until the specified [deadline] or if it is interrupted. - */ - public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() - - /** - * An indication to the resource that the consumer has finished. - */ - public object Exit : SimResourceCommand() -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt deleted file mode 100644 index 38672b13..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A [SimResourceConsumer] characterizes how a resource is consumed. - * - * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently) - * for multiple resource providers, unless explicitly said otherwise. - */ -public interface SimResourceConsumer { - /** - * This method is invoked when the consumer is started for some resource. - * - * @param ctx The execution context in which the consumer runs. - */ - public fun onStart(ctx: SimResourceContext) {} - - /** - * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because - * the resource finished processing, reached its deadline or was interrupted. - * - * @param ctx The execution context in which the consumer runs. - * @return The next command that the resource should execute. - */ - public fun onNext(ctx: SimResourceContext): SimResourceCommand - - /** - * This method is invoked when the resource provider confirms that the consumer is running at the given speed. - * - * @param ctx The execution context in which the consumer runs. - * @param speed The speed at which the consumer runs. - */ - public fun onConfirm(ctx: SimResourceContext, speed: Double) {} - - /** - * This is method is invoked when the capacity of the resource changes. - * - * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the - * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly - * causing the active resource command to finish at a later moment than initially planned. - * - * @param ctx The execution context in which the consumer runs. - * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the - * capacity change. - */ - public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {} - - /** - * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], - * the resource finished itself, or a failure occurred at the resource. - * - * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider. - * - * @param ctx The execution context in which the consumer ran. - * @param cause The cause of the finish in case the resource finished exceptionally. - */ - public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {} -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt deleted file mode 100644 index 11dbb09f..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import java.time.Clock - -/** - * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a - * resource and a resource consumer. - */ -public interface SimResourceContext { - /** - * The virtual clock tracking simulation time. - */ - public val clock: Clock - - /** - * The resource capacity available at this instant. - */ - public val capacity: Double - - /** - * The amount of work still remaining at this instant. - */ - public val remainingWork: Double - - /** - * Ask the resource provider to interrupt its resource. - */ - public fun interrupt() -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt deleted file mode 100644 index b2759b7f..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers. - */ -public interface SimResourceDistributor : AutoCloseable { - /** - * The output resource providers to which resource consumers can be attached. - */ - public val outputs: Set<SimResourceProvider> - - /** - * The input resource that will be distributed over the consumers. - */ - public val input: SimResourceProvider - - /** - * Add an output to the switch with the specified [capacity]. - */ - public fun addOutput(capacity: Double): SimResourceProvider -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt deleted file mode 100644 index dfdd2c2e..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ /dev/null @@ -1,420 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import java.time.Clock -import kotlin.math.max -import kotlin.math.min - -/** - * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing. - */ -public class SimResourceDistributorMaxMin( - override val input: SimResourceProvider, - private val clock: Clock, - private val listener: Listener? = null -) : SimResourceDistributor { - override val outputs: Set<SimResourceProvider> - get() = _outputs - private val _outputs = mutableSetOf<OutputProvider>() - - /** - * The active output contexts. - */ - private val outputContexts: MutableList<OutputContext> = mutableListOf() - - /** - * The total speed requested by the output resources. - */ - private var totalRequestedSpeed = 0.0 - - /** - * The total amount of work requested by the output resources. - */ - private var totalRequestedWork = 0.0 - - /** - * The total allocated speed for the output resources. - */ - private var totalAllocatedSpeed = 0.0 - - /** - * The total allocated work requested for the output resources. - */ - private var totalAllocatedWork = 0.0 - - /** - * The amount of work that could not be performed due to over-committing resources. - */ - private var totalOvercommittedWork = 0.0 - - /** - * The amount of work that was lost due to interference. - */ - private var totalInterferedWork = 0.0 - - /** - * A flag to indicate that the switch is closed. - */ - private var isClosed: Boolean = false - - /** - * An internal [SimResourceConsumer] implementation for switch inputs. - */ - private val consumer = object : SimResourceConsumer { - /** - * The resource context of the consumer. - */ - private lateinit var ctx: SimResourceContext - - val remainingWork: Double - get() = ctx.remainingWork - - override fun onStart(ctx: SimResourceContext) { - this.ctx = ctx - } - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return doNext(ctx.capacity) - } - - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - super.onFinish(ctx, cause) - - val iterator = _outputs.iterator() - while (iterator.hasNext()) { - val output = iterator.next() - - // Remove the output from the outputs to prevent ConcurrentModificationException when removing it - // during the call to output.close() - iterator.remove() - - output.close() - } - } - } - - /** - * The total amount of remaining work. - */ - private val totalRemainingWork: Double - get() = consumer.remainingWork - - override fun addOutput(capacity: Double): SimResourceProvider { - check(!isClosed) { "Distributor has been closed" } - - val provider = OutputProvider(capacity) - _outputs.add(provider) - return provider - } - - override fun close() { - if (!isClosed) { - isClosed = true - input.cancel() - } - } - - init { - input.startConsumer(consumer) - } - - /** - * Indicate that the workloads should be re-scheduled. - */ - private fun schedule() { - input.interrupt() - } - - /** - * Schedule the work over the physical CPUs. - */ - private fun doSchedule(capacity: Double): SimResourceCommand { - // If there is no work yet, mark all inputs as idle. - if (outputContexts.isEmpty()) { - return SimResourceCommand.Idle() - } - - val maxUsage = capacity - var duration: Double = Double.MAX_VALUE - var deadline: Long = Long.MAX_VALUE - var availableSpeed = maxUsage - var totalRequestedSpeed = 0.0 - var totalRequestedWork = 0.0 - - // Flush the work of the outputs - var outputIterator = outputContexts.listIterator() - while (outputIterator.hasNext()) { - val output = outputIterator.next() - - output.flush(isIntermediate = true) - - if (output.activeCommand == SimResourceCommand.Exit) { - // Apparently the output consumer has exited, so remove it from the scheduling queue. - outputIterator.remove() - } - } - - // Sort the outputs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - outputContexts.sort() - - // Divide the available input capacity fairly across the outputs using max-min fair sharing - outputIterator = outputContexts.listIterator() - var remaining = outputContexts.size - while (outputIterator.hasNext()) { - val output = outputIterator.next() - val availableShare = availableSpeed / remaining-- - - when (val command = output.activeCommand) { - is SimResourceCommand.Idle -> { - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - output.actualSpeed = 0.0 - } - is SimResourceCommand.Consume -> { - val grantedSpeed = min(output.allowedSpeed, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - // Ignore idle computation - if (grantedSpeed <= 0.0 || command.work <= 0.0) { - output.actualSpeed = 0.0 - continue - } - - totalRequestedSpeed += command.limit - totalRequestedWork += command.work - - output.actualSpeed = grantedSpeed - availableSpeed -= grantedSpeed - - // The duration that we want to run is that of the shortest request from an output - duration = min(duration, command.work / grantedSpeed) - } - SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" } - } - } - - assert(deadline >= clock.millis()) { "Deadline already passed" } - - this.totalRequestedSpeed = totalRequestedSpeed - this.totalRequestedWork = totalRequestedWork - this.totalAllocatedSpeed = maxUsage - availableSpeed - this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration) - - return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) - SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) - else - SimResourceCommand.Idle(deadline) - } - - /** - * Obtain the next command to perform. - */ - private fun doNext(capacity: Double): SimResourceCommand { - val totalRequestedWork = totalRequestedWork.toLong() - val totalRemainingWork = totalRemainingWork.toLong() - val totalAllocatedWork = totalAllocatedWork.toLong() - val totalRequestedSpeed = totalRequestedSpeed - val totalAllocatedSpeed = totalAllocatedSpeed - - // Force all inputs to re-schedule their work. - val command = doSchedule(capacity) - - // Report metrics - listener?.onSliceFinish( - this, - totalRequestedWork, - totalAllocatedWork - totalRemainingWork, - totalOvercommittedWork.toLong(), - totalInterferedWork.toLong(), - totalAllocatedSpeed, - totalRequestedSpeed - ) - - totalInterferedWork = 0.0 - totalOvercommittedWork = 0.0 - - return command - } - - /** - * Event listener for hypervisor events. - */ - public interface Listener { - /** - * This method is invoked when a slice is finished. - */ - public fun onSliceFinish( - switch: SimResourceDistributor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) - } - - /** - * An internal [SimResourceProvider] implementation for switch outputs. - */ - private inner class OutputProvider(val capacity: Double) : SimResourceProvider { - /** - * The [OutputContext] that is currently running. - */ - private var ctx: OutputContext? = null - - override var state: SimResourceState = SimResourceState.Pending - internal set - - override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource cannot be consumed" } - - val ctx = OutputContext(this, consumer) - this.ctx = ctx - this.state = SimResourceState.Active - outputContexts += ctx - - ctx.start() - schedule() - } - - override fun close() { - cancel() - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Stopped - _outputs.remove(this) - } - } - - override fun interrupt() { - ctx?.interrupt() - } - - override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.stop() - } - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending - } - } - } - - /** - * A [SimAbstractResourceContext] for the output resources. - */ - private inner class OutputContext( - private val provider: OutputProvider, - consumer: SimResourceConsumer - ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> { - /** - * The current command that is processed by the vCPU. - */ - var activeCommand: SimResourceCommand = SimResourceCommand.Idle() - - /** - * The processing speed that is allowed by the model constraints. - */ - var allowedSpeed: Double = 0.0 - - /** - * The actual processing speed. - */ - var actualSpeed: Double = 0.0 - - private fun reportOvercommit() { - val remainingWork = remainingWork - totalOvercommittedWork += remainingWork - } - - override fun onIdle(deadline: Long) { - reportOvercommit() - - allowedSpeed = 0.0 - activeCommand = SimResourceCommand.Idle(deadline) - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - reportOvercommit() - - allowedSpeed = speed - activeCommand = SimResourceCommand.Consume(work, limit, deadline) - } - - override fun onFinish(cause: Throwable?) { - reportOvercommit() - - activeCommand = SimResourceCommand.Exit - provider.cancel() - - super.onFinish(cause) - } - - override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { - // Apply performance interference model - val performanceScore = 1.0 - - // Compute the remaining amount of work - return if (work > 0.0) { - // Compute the fraction of compute time allocated to the VM - val fraction = actualSpeed / totalAllocatedSpeed - - // Compute the work that was actually granted to the VM. - val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction - val processed = processingAvailable * performanceScore - - val interferedWork = processingAvailable - processed - - totalInterferedWork += interferedWork - - max(0.0, work - processed) - } else { - 0.0 - } - } - - override fun interrupt() { - // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead - // to infinite recursion. - if (isProcessing) { - return - } - - super.interrupt() - - // Force the scheduler to re-schedule - schedule() - } - - override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt deleted file mode 100644 index bbf6ad44..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlow.kt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A [SimResourceFlow] acts as both a resource consumer and resource provider at the same time, simplifying bridging - * between different components. - */ -public interface SimResourceFlow : SimResourceConsumer, SimResourceProvider diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt deleted file mode 100644 index 52b13c5c..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import kotlinx.coroutines.suspendCancellableCoroutine - -/** - * A [SimResourceProvider] provides some resource of type [R]. - */ -public interface SimResourceProvider : AutoCloseable { - /** - * The state of the resource. - */ - public val state: SimResourceState - - /** - * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously. - * - * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended. - */ - public fun startConsumer(consumer: SimResourceConsumer) - - /** - * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op. - */ - public fun interrupt() - - /** - * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op. - */ - public fun cancel() - - /** - * End the lifetime of the resource. - * - * This operation terminates the existing resource consumer. - */ - public override fun close() -} - -/** - * Consume the resource provided by this provider using the specified [consumer] and suspend execution until - * the consumer has finished. - */ -public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { - return suspendCancellableCoroutine { cont -> - startConsumer(object : SimResourceConsumer by consumer { - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - assert(!cont.isCompleted) { "Coroutine already completed" } - - consumer.onFinish(ctx, cause) - - cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit)) - } - - override fun toString(): String = "SimSuspendingResourceConsumer" - }) - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt deleted file mode 100644 index 025b0406..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.opendc.utils.TimerScheduler -import java.time.Clock -import kotlin.math.ceil -import kotlin.math.min - -/** - * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. - * - * @param initialCapacity The initial capacity of the resource. - * @param clock The virtual clock to track simulation time. - * @param scheduler The scheduler to schedule the interrupts. - */ -public class SimResourceSource( - initialCapacity: Double, - private val clock: Clock, - private val scheduler: TimerScheduler<Any> -) : SimResourceProvider { - /** - * The current processing speed of the resource. - */ - public val speed: Double - get() = ctx?.speed ?: 0.0 - - /** - * The capacity of the resource. - */ - public var capacity: Double = initialCapacity - set(value) { - field = value - ctx?.capacity = value - } - - /** - * The [Context] that is currently running. - */ - private var ctx: Context? = null - - override var state: SimResourceState = SimResourceState.Pending - private set - - override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource is in invalid state" } - val ctx = Context(consumer) - - this.ctx = ctx - this.state = SimResourceState.Active - - ctx.start() - } - - override fun close() { - cancel() - state = SimResourceState.Stopped - } - - override fun interrupt() { - ctx?.interrupt() - } - - override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.stop() - } - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending - } - } - - /** - * Internal implementation of [SimResourceContext] for this class. - */ - private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { - override fun onIdle(deadline: Long) { - // Do not resume if deadline is "infinite" - if (deadline != Long.MAX_VALUE) { - scheduler.startSingleTimerTo(this, deadline) { flush() } - } - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - val until = min(deadline, clock.millis() + getDuration(work, speed)) - - scheduler.startSingleTimerTo(this, until, ::flush) - } - - override fun onFinish(cause: Throwable?) { - scheduler.cancel(this) - cancel() - - super.onFinish(cause) - } - - override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" - } - - /** - * Compute the duration that a resource consumption will take with the specified [speed]. - */ - private fun getDuration(work: Double, speed: Double): Long { - return ceil(work / speed * 1000).toLong() - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt deleted file mode 100644 index c72951d0..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * The state of a resource provider. - */ -public enum class SimResourceState { - /** - * The resource provider is pending and the resource is waiting to be consumed. - */ - Pending, - - /** - * The resource provider is active and the resource is currently being consumed. - */ - Active, - - /** - * The resource provider is stopped and the resource cannot be consumed anymore. - */ - Stopped -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt deleted file mode 100644 index 53fec16a..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers. - */ -public interface SimResourceSwitch : AutoCloseable { - /** - * The output resource providers to which resource consumers can be attached. - */ - public val outputs: Set<SimResourceProvider> - - /** - * The input resources that will be switched between the output providers. - */ - public val inputs: Set<SimResourceProvider> - - /** - * Add an output to the switch with the specified [capacity]. - */ - public fun addOutput(capacity: Double): SimResourceProvider - - /** - * Add the specified [input] to the switch. - */ - public fun addInput(input: SimResourceProvider) -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt deleted file mode 100644 index 45e4c220..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import java.util.ArrayDeque - -/** - * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that - * a single output is directly connected to an input and that the switch can only support as much outputs as inputs. - */ -public class SimResourceSwitchExclusive : SimResourceSwitch { - /** - * A flag to indicate that the switch is closed. - */ - private var isClosed: Boolean = false - - private val _outputs = mutableSetOf<Provider>() - override val outputs: Set<SimResourceProvider> - get() = _outputs - - private val availableResources = ArrayDeque<SimResourceTransformer>() - - private val _inputs = mutableSetOf<SimResourceProvider>() - override val inputs: Set<SimResourceProvider> - get() = _inputs - - override fun addOutput(capacity: Double): SimResourceProvider { - check(!isClosed) { "Switch has been closed" } - check(availableResources.isNotEmpty()) { "No capacity to serve request" } - val forwarder = availableResources.poll() - val output = Provider(capacity, forwarder) - _outputs += output - return output - } - - override fun addInput(input: SimResourceProvider) { - check(!isClosed) { "Switch has been closed" } - - if (input in inputs) { - return - } - - val forwarder = SimResourceForwarder() - - _inputs += input - availableResources += forwarder - - input.startConsumer(object : SimResourceConsumer by forwarder { - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - // De-register the input after it has finished - _inputs -= input - forwarder.onFinish(ctx, cause) - } - }) - } - - override fun close() { - isClosed = true - - // Cancel all upstream subscriptions - _inputs.forEach(SimResourceProvider::cancel) - } - - private inner class Provider( - private val capacity: Double, - private val forwarder: SimResourceTransformer - ) : SimResourceProvider by forwarder { - override fun close() { - // We explicitly do not close the forwarder here in order to re-use it across output resources. - - _outputs -= this - availableResources += forwarder - } - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt deleted file mode 100644 index c796c251..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import kotlinx.coroutines.* -import java.time.Clock - -/** - * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min - * fair sharing. - */ -public class SimResourceSwitchMaxMin( - clock: Clock, - private val listener: Listener? = null -) : SimResourceSwitch { - private val _outputs = mutableSetOf<SimResourceProvider>() - override val outputs: Set<SimResourceProvider> - get() = _outputs - - private val _inputs = mutableSetOf<SimResourceProvider>() - override val inputs: Set<SimResourceProvider> - get() = _inputs - - /** - * A flag to indicate that the switch was closed. - */ - private var isClosed = false - - /** - * The aggregator to aggregate the resources. - */ - private val aggregator = SimResourceAggregatorMaxMin(clock) - - /** - * The distributor to distribute the aggregated resources. - */ - private val distributor = SimResourceDistributorMaxMin( - aggregator.output, clock, - object : SimResourceDistributorMaxMin.Listener { - override fun onSliceFinish( - switch: SimResourceDistributor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) - } - } - ) - - /** - * Add an output to the switch represented by [resource]. - */ - override fun addOutput(capacity: Double): SimResourceProvider { - check(!isClosed) { "Switch has been closed" } - - val provider = distributor.addOutput(capacity) - _outputs.add(provider) - return provider - } - - /** - * Add the specified [input] to the switch. - */ - override fun addInput(input: SimResourceProvider) { - check(!isClosed) { "Switch has been closed" } - - aggregator.addInput(input) - } - - override fun close() { - if (!isClosed) { - isClosed = true - distributor.close() - aggregator.close() - } - } - - /** - * Event listener for hypervisor events. - */ - public interface Listener { - /** - * This method is invoked when a slice is finished. - */ - public fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt deleted file mode 100644 index de455021..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider. - * - * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. - * @param transform The function to transform the received resource command. - */ -public class SimResourceTransformer( - private val isCoupled: Boolean = false, - private val transform: (SimResourceContext, SimResourceCommand) -> SimResourceCommand -) : SimResourceFlow { - /** - * The [SimResourceContext] in which the forwarder runs. - */ - private var ctx: SimResourceContext? = null - - /** - * The delegate [SimResourceConsumer]. - */ - private var delegate: SimResourceConsumer? = null - - /** - * A flag to indicate that the delegate was started. - */ - private var hasDelegateStarted: Boolean = false - - /** - * The state of the forwarder. - */ - override var state: SimResourceState = SimResourceState.Pending - private set - - override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource is in invalid state" } - - state = SimResourceState.Active - delegate = consumer - - // Interrupt the provider to replace the consumer - interrupt() - } - - override fun interrupt() { - ctx?.interrupt() - } - - override fun cancel() { - val delegate = delegate - val ctx = ctx - - state = SimResourceState.Pending - - if (delegate != null && ctx != null) { - this.delegate = null - delegate.onFinish(ctx) - } - } - - override fun close() { - val ctx = ctx - - state = SimResourceState.Stopped - - if (ctx != null) { - this.ctx = null - ctx.interrupt() - } - } - - override fun onStart(ctx: SimResourceContext) { - this.ctx = ctx - } - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val delegate = delegate - - if (!hasDelegateStarted) { - start() - } - - return if (state == SimResourceState.Stopped) { - SimResourceCommand.Exit - } else if (delegate != null) { - val command = transform(ctx, delegate.onNext(ctx)) - if (command == SimResourceCommand.Exit) { - // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we - // reset beforehand the existing state and check whether it has been updated afterwards - reset() - - delegate.onFinish(ctx) - - if (isCoupled || state == SimResourceState.Stopped) - SimResourceCommand.Exit - else - onNext(ctx) - } else { - command - } - } else { - SimResourceCommand.Idle() - } - } - - override fun onConfirm(ctx: SimResourceContext, speed: Double) { - delegate?.onConfirm(ctx, speed) - } - - override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { - delegate?.onCapacityChanged(ctx, isThrottled) - } - - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - this.ctx = null - - val delegate = delegate - if (delegate != null) { - reset() - delegate.onFinish(ctx, cause) - } - } - - /** - * Start the delegate. - */ - private fun start() { - val delegate = delegate ?: return - delegate.onStart(checkNotNull(ctx)) - - hasDelegateStarted = true - } - - /** - * Reset the delegate. - */ - private fun reset() { - delegate = null - hasDelegateStarted = false - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending - } - } -} - -/** - * Constructs a [SimResourceTransformer] that forwards the received resource command with an identity transform. - * - * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. - */ -public fun SimResourceForwarder(isCoupled: Boolean = false): SimResourceTransformer { - return SimResourceTransformer(isCoupled) { _, command -> command } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt deleted file mode 100644 index 52a42241..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources.consumer - -/** - * The [SimConsumerBarrier] is a barrier that allows consumers to wait for a select number of other consumers to - * complete, before proceeding its operation. - */ -public class SimConsumerBarrier(public val parties: Int) { - private var counter = 0 - - /** - * Enter the barrier and determine whether the caller is the last to reach the barrier. - * - * @return `true` if the caller is the last to reach the barrier, `false` otherwise. - */ - public fun enter(): Boolean { - val last = ++counter == parties - if (last) { - counter = 0 - return true - } - return false - } - - /** - * Reset the barrier. - */ - public fun reset() { - counter = 0 - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt deleted file mode 100644 index 114c7312..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources.consumer - -import org.opendc.simulator.resources.SimResourceCommand -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext -import kotlin.math.min - -/** - * Helper class to expose an observable [speed] field describing the speed of the consumer. - */ -public class SimSpeedConsumerAdapter( - private val delegate: SimResourceConsumer, - private val callback: (Double) -> Unit = {} -) : SimResourceConsumer by delegate { - /** - * The resource processing speed at this instant. - */ - public var speed: Double = 0.0 - private set(value) { - if (field != value) { - callback(value) - field = value - } - } - - init { - callback(0.0) - } - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return delegate.onNext(ctx) - } - - override fun onConfirm(ctx: SimResourceContext, speed: Double) { - delegate.onConfirm(ctx, speed) - - this.speed = speed - } - - override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { - val oldSpeed = speed - - delegate.onCapacityChanged(ctx, isThrottled) - - // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might - // need to update the current speed. - if (oldSpeed == speed) { - speed = min(ctx.capacity, speed) - } - } - - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - super.onFinish(ctx, cause) - - speed = 0.0 - } - - override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]" -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt deleted file mode 100644 index a52d1d5d..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources.consumer - -import org.opendc.simulator.resources.SimResourceCommand -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext - -/** - * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource - * consumption for some period of time. - */ -public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer { - private var iterator: Iterator<Fragment>? = null - - override fun onStart(ctx: SimResourceContext) { - check(iterator == null) { "Consumer already running" } - iterator = trace.iterator() - } - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val iterator = checkNotNull(iterator) - return if (iterator.hasNext()) { - val now = ctx.clock.millis() - val fragment = iterator.next() - val work = (fragment.duration / 1000) * fragment.usage - val deadline = now + fragment.duration - - assert(deadline >= now) { "Deadline already passed" } - - if (work > 0.0) - SimResourceCommand.Consume(work, fragment.usage, deadline) - else - SimResourceCommand.Idle(deadline) - } else { - SimResourceCommand.Exit - } - } - - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - iterator = null - } - - /** - * A fragment of the workload. - */ - public data class Fragment(val duration: Long, val usage: Double) -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt deleted file mode 100644 index faa693c4..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources.consumer - -import org.opendc.simulator.resources.SimResourceCommand -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext - -/** - * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization. - */ -public class SimWorkConsumer( - private val work: Double, - private val utilization: Double -) : SimResourceConsumer { - - init { - require(work >= 0.0) { "Work must be positive" } - require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } - } - - private var isFirst = true - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val limit = ctx.capacity * utilization - val work = if (isFirst) { - isFirst = false - work - } else { - ctx.remainingWork - } - return if (work > 0.0) { - SimResourceCommand.Consume(work, limit) - } else { - SimResourceCommand.Exit - } - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt deleted file mode 100644 index e272abb8..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import io.mockk.every -import io.mockk.mockk -import io.mockk.verify -import kotlinx.coroutines.* -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.utils.TimerScheduler - -/** - * Test suite for the [SimResourceAggregatorMaxMin] class. - */ -@OptIn(ExperimentalCoroutinesApi::class) -internal class SimResourceAggregatorMaxMinTest { - @Test - fun testSingleCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(clock) - val forwarder = SimResourceForwarder() - val sources = listOf( - forwarder, - SimResourceSource(1.0, clock, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = SimWorkConsumer(1.0, 0.5) - val usage = mutableListOf<Double>() - val source = SimResourceSource(1.0, clock, scheduler) - val adapter = SimSpeedConsumerAdapter(forwarder, usage::add) - source.startConsumer(adapter) - - try { - aggregator.output.consume(consumer) - yield() - - assertAll( - { assertEquals(1000, clock.millis()) }, - { assertEquals(listOf(0.0, 0.5, 0.0), usage) } - ) - } finally { - aggregator.output.close() - } - } - - @Test - fun testDoubleCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(clock) - val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = SimWorkConsumer(2.0, 1.0) - val usage = mutableListOf<Double>() - val adapter = SimSpeedConsumerAdapter(consumer, usage::add) - - try { - aggregator.output.consume(adapter) - yield() - assertAll( - { assertEquals(1000, clock.millis()) }, - { assertEquals(listOf(0.0, 2.0, 0.0), usage) } - ) - } finally { - aggregator.output.close() - } - } - - @Test - fun testOvercommit() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(clock) - val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) - .andThen(SimResourceCommand.Exit) - - try { - aggregator.output.consume(consumer) - yield() - assertEquals(1000, clock.millis()) - - verify(exactly = 2) { consumer.onNext(any()) } - } finally { - aggregator.output.close() - } - } - - @Test - fun testException() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(clock) - val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1.0, 1.0)) - .andThenThrows(IllegalStateException()) - - try { - assertThrows<IllegalStateException> { aggregator.output.consume(consumer) } - yield() - assertEquals(SimResourceState.Pending, sources[0].state) - } finally { - aggregator.output.close() - } - } - - @Test - fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(clock) - val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = SimWorkConsumer(4.0, 1.0) - try { - coroutineScope { - launch { aggregator.output.consume(consumer) } - delay(1000) - sources[0].capacity = 0.5 - } - yield() - assertEquals(2334, clock.millis()) - } finally { - aggregator.output.close() - } - } - - @Test - fun testFailOverCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(clock) - val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = SimWorkConsumer(1.0, 0.5) - try { - coroutineScope { - launch { aggregator.output.consume(consumer) } - delay(500) - sources[0].capacity = 0.5 - } - yield() - assertEquals(1000, clock.millis()) - } finally { - aggregator.output.close() - } - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt deleted file mode 100644 index 02d456ff..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows - -/** - * Test suite for [SimResourceCommand]. - */ -class SimResourceCommandTest { - @Test - fun testZeroWork() { - assertThrows<IllegalArgumentException> { - SimResourceCommand.Consume(0.0, 1.0) - } - } - - @Test - fun testNegativeWork() { - assertThrows<IllegalArgumentException> { - SimResourceCommand.Consume(-1.0, 1.0) - } - } - - @Test - fun testZeroLimit() { - assertThrows<IllegalArgumentException> { - SimResourceCommand.Consume(1.0, 0.0) - } - } - - @Test - fun testNegativeLimit() { - assertThrows<IllegalArgumentException> { - SimResourceCommand.Consume(1.0, -1.0, 1) - } - } - - @Test - fun testConsumeCorrect() { - assertDoesNotThrow { - SimResourceCommand.Consume(1.0, 1.0) - } - } - - @Test - fun testIdleCorrect() { - assertDoesNotThrow { - SimResourceCommand.Idle(1) - } - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt deleted file mode 100644 index be909556..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import io.mockk.* -import kotlinx.coroutines.* -import org.junit.jupiter.api.* -import org.opendc.simulator.core.runBlockingSimulation - -/** - * A test suite for the [SimAbstractResourceContext] class. - */ -@OptIn(ExperimentalCoroutinesApi::class) -class SimResourceContextTest { - @Test - fun testFlushWithoutCommand() = runBlockingSimulation { - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - - val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { - override fun onIdle(deadline: Long) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish(cause: Throwable?) {} - } - - context.flush() - } - - @Test - fun testIntermediateFlush() = runBlockingSimulation { - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - - val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { - override fun onIdle(deadline: Long) {} - override fun onFinish(cause: Throwable?) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} - }) - - context.start() - delay(1) // Delay 1 ms to prevent hitting the fast path - context.flush(isIntermediate = true) - - verify(exactly = 2) { context.onConsume(any(), any(), any()) } - } - - @Test - fun testIntermediateFlushIdle() = runBlockingSimulation { - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - - val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { - override fun onIdle(deadline: Long) {} - override fun onFinish(cause: Throwable?) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} - }) - - context.start() - delay(5) - context.flush(isIntermediate = true) - delay(5) - context.flush(isIntermediate = true) - - assertAll( - { verify(exactly = 2) { context.onIdle(any()) } }, - { verify(exactly = 1) { context.onFinish(null) } } - ) - } - - @Test - fun testDoubleStart() = runBlockingSimulation { - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - - val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { - override fun onIdle(deadline: Long) {} - override fun onFinish(cause: Throwable?) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} - } - - context.start() - assertThrows<IllegalStateException> { context.start() } - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt deleted file mode 100644 index 39f74481..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ /dev/null @@ -1,351 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import io.mockk.every -import io.mockk.mockk -import io.mockk.spyk -import io.mockk.verify -import kotlinx.coroutines.* -import org.junit.jupiter.api.* -import org.junit.jupiter.api.Assertions.assertEquals -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.utils.TimerScheduler - -/** - * A test suite for the [SimResourceSource] class. - */ -@OptIn(ExperimentalCoroutinesApi::class) -class SimResourceSourceTest { - @Test - fun testSpeed() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1000 * capacity, capacity)) - .andThen(SimResourceCommand.Exit) - - try { - val res = mutableListOf<Double>() - val adapter = SimSpeedConsumerAdapter(consumer, res::add) - - provider.consume(adapter) - - assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } - } finally { - scheduler.close() - provider.close() - } - } - - @Test - fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(1.0, clock, scheduler) - - val consumer = spyk(SimWorkConsumer(2.0, 1.0)) - - try { - coroutineScope { - launch { provider.consume(consumer) } - delay(1000) - provider.capacity = 0.5 - } - assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } - } finally { - scheduler.close() - provider.close() - } - } - - @Test - fun testSpeedLimit() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity)) - .andThen(SimResourceCommand.Exit) - - try { - val res = mutableListOf<Double>() - val adapter = SimSpeedConsumerAdapter(consumer, res::add) - - provider.consume(adapter) - - assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } - } finally { - scheduler.close() - provider.close() - } - } - - /** - * Test to see whether no infinite recursion occurs when interrupting during [SimResourceConsumer.onStart] or - * [SimResourceConsumer.onNext]. - */ - @Test - fun testIntermediateInterrupt() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) - - val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext) { - ctx.interrupt() - } - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Exit - } - } - - try { - provider.consume(consumer) - } finally { - scheduler.close() - provider.close() - } - } - - @Test - fun testInterrupt() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) - lateinit var resCtx: SimResourceContext - - val consumer = object : SimResourceConsumer { - var isFirst = true - override fun onStart(ctx: SimResourceContext) { - resCtx = ctx - } - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - assertEquals(0.0, ctx.remainingWork) - return if (isFirst) { - isFirst = false - SimResourceCommand.Consume(4.0, 1.0) - } else { - SimResourceCommand.Exit - } - } - } - - try { - launch { - yield() - resCtx.interrupt() - } - provider.consume(consumer) - - assertEquals(0, clock.millis()) - } finally { - scheduler.close() - provider.close() - } - } - - @Test - fun testFailure() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onStart(any()) } - .throws(IllegalStateException()) - - try { - assertThrows<IllegalStateException> { - provider.consume(consumer) - } - } finally { - scheduler.close() - provider.close() - } - } - - @Test - fun testExceptionPropagationOnNext() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1.0, 1.0)) - .andThenThrows(IllegalStateException()) - - try { - assertThrows<IllegalStateException> { - provider.consume(consumer) - } - } finally { - scheduler.close() - provider.close() - } - } - - @Test - fun testConcurrentConsumption() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1.0, 1.0)) - .andThenThrows(IllegalStateException()) - - try { - assertThrows<IllegalStateException> { - coroutineScope { - launch { provider.consume(consumer) } - provider.consume(consumer) - } - } - } finally { - scheduler.close() - provider.close() - } - } - - @Test - fun testClosedConsumption() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1.0, 1.0)) - .andThenThrows(IllegalStateException()) - - try { - assertThrows<IllegalStateException> { - provider.close() - provider.consume(consumer) - } - } finally { - scheduler.close() - provider.close() - } - } - - @Test - fun testCloseDuringConsumption() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1.0, 1.0)) - .andThenThrows(IllegalStateException()) - - try { - launch { provider.consume(consumer) } - delay(500) - provider.close() - - assertEquals(500, clock.millis()) - } finally { - scheduler.close() - provider.close() - } - } - - @Test - fun testIdle() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Idle(clock.millis() + 500)) - .andThen(SimResourceCommand.Exit) - - try { - provider.consume(consumer) - - assertEquals(500, clock.millis()) - } finally { - scheduler.close() - provider.close() - } - } - - @Test - fun testInfiniteSleep() { - assertThrows<IllegalStateException> { - runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Idle()) - .andThenThrows(IllegalStateException()) - - try { - provider.consume(consumer) - } finally { - scheduler.close() - provider.close() - } - } - } - } - - @Test - fun testIncorrectDeadline() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Idle(2)) - .andThen(SimResourceCommand.Exit) - - try { - delay(10) - - assertThrows<IllegalArgumentException> { provider.consume(consumer) } - } finally { - scheduler.close() - provider.close() - } - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt deleted file mode 100644 index f7d17867..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import io.mockk.every -import io.mockk.mockk -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter -import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.utils.TimerScheduler - -/** - * Test suite for the [SimResourceSwitchExclusive] class. - */ -@OptIn(ExperimentalCoroutinesApi::class) -internal class SimResourceSwitchExclusiveTest { - /** - * Test a trace workload. - */ - @Test - fun testTrace() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - - val speed = mutableListOf<Double>() - - val duration = 5 * 60L - val workload = - SimTraceConsumer( - sequenceOf( - SimTraceConsumer.Fragment(duration * 1000, 28.0), - SimTraceConsumer.Fragment(duration * 1000, 3500.0), - SimTraceConsumer.Fragment(duration * 1000, 0.0), - SimTraceConsumer.Fragment(duration * 1000, 183.0) - ), - ) - - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, clock, scheduler) - val forwarder = SimResourceForwarder() - val adapter = SimSpeedConsumerAdapter(forwarder, speed::add) - source.startConsumer(adapter) - switch.addInput(forwarder) - - val provider = switch.addOutput(3200.0) - - try { - provider.consume(workload) - yield() - } finally { - provider.close() - } - - assertAll( - { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, - { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } - ) - } - - /** - * Test runtime workload on hypervisor. - */ - @Test - fun testRuntimeWorkload() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit - - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, clock, scheduler) - - switch.addInput(source) - - val provider = switch.addOutput(3200.0) - - try { - provider.consume(workload) - yield() - } finally { - provider.close() - } - assertEquals(duration, clock.millis()) { "Took enough time" } - } - - /** - * Test two workloads running sequentially. - */ - @Test - fun testTwoWorkloads() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = object : SimResourceConsumer { - var isFirst = true - - override fun onStart(ctx: SimResourceContext) { - isFirst = true - } - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return if (isFirst) { - isFirst = false - SimResourceCommand.Consume(duration / 1000.0, 1.0) - } else { - SimResourceCommand.Exit - } - } - } - - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, clock, scheduler) - - switch.addInput(source) - - val provider = switch.addOutput(3200.0) - - try { - provider.consume(workload) - yield() - provider.consume(workload) - } finally { - provider.close() - } - assertEquals(duration * 2, clock.millis()) { "Took enough time" } - } - - /** - * Test concurrent workloads on the machine. - */ - @Test - fun testConcurrentWorkloadFails() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit - - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, clock, scheduler) - - switch.addInput(source) - - switch.addOutput(3200.0) - assertThrows<IllegalStateException> { switch.addOutput(3200.0) } - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt deleted file mode 100644 index 7416f277..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import io.mockk.every -import io.mockk.mockk -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import org.junit.jupiter.api.* -import org.junit.jupiter.api.Assertions.assertEquals -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.utils.TimerScheduler - -/** - * Test suite for the [SimResourceSwitch] implementations - */ -@OptIn(ExperimentalCoroutinesApi::class) -internal class SimResourceSwitchMaxMinTest { - @Test - fun testSmoke() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val switch = SimResourceSwitchMaxMin(clock) - - val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) } - sources.forEach { switch.addInput(it) } - - val provider = switch.addOutput(1000.0) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit - - try { - provider.consume(consumer) - yield() - } finally { - switch.close() - scheduler.close() - } - } - - /** - * Test overcommitting of resources via the hypervisor with a single VM. - */ - @Test - fun testOvercommittedSingle() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - - val listener = object : SimResourceSwitchMaxMin.Listener { - var totalRequestedWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - - override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - totalRequestedWork += requestedWork - totalGrantedWork += grantedWork - totalOvercommittedWork += overcommittedWork - } - } - - val duration = 5 * 60L - val workload = - SimTraceConsumer( - sequenceOf( - SimTraceConsumer.Fragment(duration * 1000, 28.0), - SimTraceConsumer.Fragment(duration * 1000, 3500.0), - SimTraceConsumer.Fragment(duration * 1000, 0.0), - SimTraceConsumer.Fragment(duration * 1000, 183.0) - ), - ) - - val switch = SimResourceSwitchMaxMin(clock, listener) - val provider = switch.addOutput(3200.0) - - try { - switch.addInput(SimResourceSource(3200.0, clock, scheduler)) - provider.consume(workload) - yield() - } finally { - switch.close() - scheduler.close() - } - - assertAll( - { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, - { assertEquals(1200000, clock.millis()) } - ) - } - - /** - * Test overcommitting of resources via the hypervisor with two VMs. - */ - @Test - fun testOvercommittedDual() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - - val listener = object : SimResourceSwitchMaxMin.Listener { - var totalRequestedWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - - override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - totalRequestedWork += requestedWork - totalGrantedWork += grantedWork - totalOvercommittedWork += overcommittedWork - } - } - - val duration = 5 * 60L - val workloadA = - SimTraceConsumer( - sequenceOf( - SimTraceConsumer.Fragment(duration * 1000, 28.0), - SimTraceConsumer.Fragment(duration * 1000, 3500.0), - SimTraceConsumer.Fragment(duration * 1000, 0.0), - SimTraceConsumer.Fragment(duration * 1000, 183.0) - ), - ) - val workloadB = - SimTraceConsumer( - sequenceOf( - SimTraceConsumer.Fragment(duration * 1000, 28.0), - SimTraceConsumer.Fragment(duration * 1000, 3100.0), - SimTraceConsumer.Fragment(duration * 1000, 0.0), - SimTraceConsumer.Fragment(duration * 1000, 73.0) - ) - ) - - val switch = SimResourceSwitchMaxMin(clock, listener) - val providerA = switch.addOutput(3200.0) - val providerB = switch.addOutput(3200.0) - - try { - switch.addInput(SimResourceSource(3200.0, clock, scheduler)) - - coroutineScope { - launch { providerA.consume(workloadA) } - providerB.consume(workloadB) - } - - yield() - } finally { - switch.close() - scheduler.close() - } - assertAll( - { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, - { assertEquals(1200000, clock.millis()) } - ) - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt deleted file mode 100644 index d2ad73bc..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import io.mockk.every -import io.mockk.mockk -import io.mockk.spyk -import io.mockk.verify -import kotlinx.coroutines.* -import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.utils.TimerScheduler - -/** - * A test suite for the [SimResourceTransformer] class. - */ -@OptIn(ExperimentalCoroutinesApi::class) -internal class SimResourceTransformerTest { - @Test - fun testExitImmediately() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) - - launch { - source.consume(forwarder) - source.close() - } - - forwarder.consume(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return SimResourceCommand.Exit - } - }) - - forwarder.close() - scheduler.close() - } - - @Test - fun testExit() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) - - launch { - source.consume(forwarder) - source.close() - } - - forwarder.consume(object : SimResourceConsumer { - var isFirst = true - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return if (isFirst) { - isFirst = false - SimResourceCommand.Consume(10.0, 1.0) - } else { - SimResourceCommand.Exit - } - } - }) - - forwarder.close() - } - - @Test - fun testState() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit - } - - assertEquals(SimResourceState.Pending, forwarder.state) - - forwarder.startConsumer(consumer) - assertEquals(SimResourceState.Active, forwarder.state) - - assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) } - - forwarder.cancel() - assertEquals(SimResourceState.Pending, forwarder.state) - - forwarder.close() - assertEquals(SimResourceState.Stopped, forwarder.state) - } - - @Test - fun testCancelPendingDelegate() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Exit - - forwarder.startConsumer(consumer) - forwarder.cancel() - - verify(exactly = 0) { consumer.onFinish(any(), null) } - } - - @Test - fun testCancelStartedDelegate() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) - - source.startConsumer(forwarder) - yield() - forwarder.startConsumer(consumer) - yield() - forwarder.cancel() - - verify(exactly = 1) { consumer.onStart(any()) } - verify(exactly = 1) { consumer.onFinish(any(), null) } - } - - @Test - fun testCancelPropagation() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) - - source.startConsumer(forwarder) - yield() - forwarder.startConsumer(consumer) - yield() - source.cancel() - - verify(exactly = 1) { consumer.onStart(any()) } - verify(exactly = 1) { consumer.onFinish(any(), null) } - } - - @Test - fun testExitPropagation() = runBlockingSimulation { - val forwarder = SimResourceForwarder(isCoupled = true) - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } returns SimResourceCommand.Exit - - source.startConsumer(forwarder) - forwarder.consume(consumer) - yield() - - assertEquals(SimResourceState.Pending, source.state) - } - - @Test - fun testAdjustCapacity() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(1.0, clock, scheduler) - - val consumer = spyk(SimWorkConsumer(2.0, 1.0)) - source.startConsumer(forwarder) - - coroutineScope { - launch { forwarder.consume(consumer) } - delay(1000) - source.capacity = 0.5 - } - - assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } - } - - @Test - fun testTransformExit() = runBlockingSimulation { - val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit } - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(1.0, clock, scheduler) - - val consumer = spyk(SimWorkConsumer(2.0, 1.0)) - source.startConsumer(forwarder) - forwarder.consume(consumer) - - assertEquals(0, clock.millis()) - verify(exactly = 1) { consumer.onNext(any()) } - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt deleted file mode 100644 index bf58b1b6..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.utils.TimerScheduler - -/** - * A test suite for the [SimWorkConsumer] class. - */ -@OptIn(ExperimentalCoroutinesApi::class) -internal class SimWorkConsumerTest { - @Test - fun testSmoke() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(1.0, clock, scheduler) - - val consumer = SimWorkConsumer(1.0, 1.0) - - try { - provider.consume(consumer) - assertEquals(1000, clock.millis()) - } finally { - provider.close() - } - } - - @Test - fun testUtilization() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(1.0, clock, scheduler) - - val consumer = SimWorkConsumer(1.0, 0.5) - - try { - provider.consume(consumer) - assertEquals(2000, clock.millis()) - } finally { - provider.close() - } - } -} |
