diff options
20 files changed, 424 insertions, 941 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index fdb3f1dc..a8b8afe9 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -47,7 +47,6 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.compute.power.SimplePowerDriver -import org.opendc.simulator.resources.SimResourceDistributorMaxMin import org.opendc.simulator.resources.SimResourceInterpreter import java.util.* import kotlin.coroutines.CoroutineContext @@ -418,7 +417,7 @@ public class SimHost( val counters = hypervisor.counters val grantedWork = counters.actual val overcommittedWork = counters.overcommit - val interferedWork = (counters as? SimResourceDistributorMaxMin.Counters)?.interference ?: 0.0 + val interferedWork = counters.interference _totalTime += (duration / 1000.0) * coreCount val activeTime = (grantedWork * d).roundToLong() diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 16085d82..1bec2de5 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -212,7 +212,7 @@ class CapelinIntegrationTest { { assertEquals(6013899, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(14724501, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(476163, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(477279, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index 98271fb0..cf9e3230 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -131,7 +131,7 @@ public abstract class SimAbstractHypervisor( /** * The vCPUs of the machine. */ - override val cpus = model.cpus.map { VCpu(switch.newOutput(interferenceKey), it) } + override val cpus = model.cpus.map { VCpu(switch, switch.newOutput(interferenceKey), it) } override fun close() { super.close() @@ -153,9 +153,10 @@ public abstract class SimAbstractHypervisor( * A [SimProcessingUnit] of a virtual machine. */ private class VCpu( - private val source: SimResourceCloseableProvider, + private val switch: SimResourceSwitch, + private val source: SimResourceProvider, override val model: ProcessingUnit - ) : SimProcessingUnit, SimResourceCloseableProvider by source { + ) : SimProcessingUnit, SimResourceProvider by source { override var capacity: Double get() = source.capacity set(_) { @@ -163,6 +164,13 @@ public abstract class SimAbstractHypervisor( } override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" + + /** + * Close the CPU + */ + fun close() { + switch.removeOutput(source) + } } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt index 1f010338..8cd535ad 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt @@ -103,8 +103,10 @@ internal class SimHypervisorTest { println("Hypervisor finished") } yield() + val vm = hypervisor.createMachine(model) vm.run(workloadA) + yield() machine.close() diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt index 05daaa5c..2267715e 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt @@ -70,7 +70,7 @@ public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimN override fun close() { isClosed = true - _provider.close() + switch.removeOutput(_provider) _ports.remove(this) } } diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt index e5fcd938..947f6cb2 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt @@ -49,7 +49,7 @@ public class SimPdu( /** * Create a new PDU outlet. */ - public fun newOutlet(): Outlet = Outlet(switch.newOutput()) + public fun newOutlet(): Outlet = Outlet(switch, switch.newOutput()) init { switch.addInput(forwarder) @@ -81,7 +81,7 @@ public class SimPdu( /** * A PDU outlet. */ - public class Outlet(private val provider: SimResourceCloseableProvider) : SimPowerOutlet(), AutoCloseable { + public class Outlet(private val switch: SimResourceSwitch, private val provider: SimResourceProvider) : SimPowerOutlet(), AutoCloseable { override fun onConnect(inlet: SimPowerInlet) { provider.startConsumer(inlet.createConsumer()) } @@ -94,7 +94,7 @@ public class SimPdu( * Remove the outlet from the PDU. */ override fun close() { - provider.close() + switch.removeOutput(provider) } override fun toString(): String = "SimPdu.Outlet" diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt deleted file mode 100644 index 621ea6e7..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * Abstract implementation of [SimResourceAggregator]. - */ -public abstract class SimAbstractResourceAggregator( - interpreter: SimResourceInterpreter, - parent: SimResourceSystem? -) : SimResourceAggregator { - /** - * This method is invoked when the resource consumer consumes resources. - */ - protected abstract fun doConsume(limit: Double) - - /** - * This method is invoked when the resource consumer finishes processing. - */ - protected abstract fun doFinish() - - /** - * This method is invoked when an input context is started. - */ - protected abstract fun onInputStarted(input: SimResourceContext) - - /** - * This method is invoked when an input is stopped. - */ - protected abstract fun onInputFinished(input: SimResourceContext) - - /* SimResourceAggregator */ - override fun addInput(input: SimResourceProvider) { - val consumer = Consumer() - _inputs.add(input) - _inputConsumers.add(consumer) - input.startConsumer(consumer) - } - - override val inputs: Set<SimResourceProvider> - get() = _inputs - private val _inputs = mutableSetOf<SimResourceProvider>() - private val _inputConsumers = mutableListOf<Consumer>() - - /* SimResourceProvider */ - override val isActive: Boolean - get() = _output.isActive - - override val capacity: Double - get() = _output.capacity - - override val speed: Double - get() = _output.speed - - override val demand: Double - get() = _output.demand - - override val counters: SimResourceCounters - get() = _output.counters - - override fun startConsumer(consumer: SimResourceConsumer) { - _output.startConsumer(consumer) - } - - override fun cancel() { - _output.cancel() - } - - override fun interrupt() { - _output.interrupt() - } - - private val _output = object : SimAbstractResourceProvider(interpreter, initialCapacity = 0.0) { - override fun createLogic(): SimResourceProviderLogic { - return object : SimResourceProviderLogic { - - override fun onConsume(ctx: SimResourceControllableContext, now: Long, delta: Long, limit: Double, duration: Long) { - updateCounters(ctx, delta) - doConsume(limit) - } - - override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) { - updateCounters(ctx, delta) - doFinish() - } - - override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) { - parent?.onConverge(now) - } - } - } - } - - /** - * An internal [SimResourceConsumer] implementation for aggregator inputs. - */ - private inner class Consumer : SimResourceConsumer { - /** - * The resource context associated with the input. - */ - private var _ctx: SimResourceContext? = null - - private fun updateCapacity() { - // Adjust capacity of output resource - _output.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 } - } - - /* SimResourceConsumer */ - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return Long.MAX_VALUE - } - - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - when (event) { - SimResourceEvent.Start -> { - _ctx = ctx - updateCapacity() - - onInputStarted(ctx) - } - SimResourceEvent.Capacity -> updateCapacity() - SimResourceEvent.Exit -> onInputFinished(ctx) - else -> {} - } - } - } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt deleted file mode 100644 index 00972f43..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource. - */ -public interface SimResourceAggregator : SimResourceProvider { - /** - * The input resources that will be switched between the output providers. - */ - public val inputs: Set<SimResourceProvider> - - /** - * Add the specified [input] to the aggregator. - */ - public fun addInput(input: SimResourceProvider) -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt deleted file mode 100644 index f131ac6c..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A [SimResourceAggregator] that distributes the load equally across the input resources. - */ -public class SimResourceAggregatorMaxMin( - interpreter: SimResourceInterpreter, - parent: SimResourceSystem? = null -) : SimAbstractResourceAggregator(interpreter, parent) { - private val consumers = mutableListOf<SimResourceContext>() - - override fun doConsume(limit: Double) { - // Sort all consumers by their capacity - consumers.sortWith(compareBy { it.capacity }) - - // Divide the requests over the available capacity of the input resources fairly - for (input in consumers) { - val inputCapacity = input.capacity - val fraction = inputCapacity / capacity - val grantedSpeed = limit * fraction - - input.push(grantedSpeed) - input.interrupt() - } - } - - override fun doFinish() { - val iterator = consumers.iterator() - for (input in iterator) { - iterator.remove() - input.close() - } - } - - override fun onInputStarted(input: SimResourceContext) { - consumers.add(input) - } - - override fun onInputFinished(input: SimResourceContext) { - consumers.remove(input) - } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt deleted file mode 100644 index bce8274b..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A [SimResourceProvider] that has a controllable and limited lifetime. - * - * This interface is used to signal that the resource provider may be closed and not reused after that point. - */ -public interface SimResourceCloseableProvider : SimResourceProvider, AutoCloseable { - /** - * End the lifetime of the resource provider. - * - * This operation cancels the existing resource consumer and prevents the resource provider from being reused. - */ - public override fun close() -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt index 725aa5bc..11924db2 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt @@ -42,6 +42,11 @@ public interface SimResourceCounters { public val overcommit: Double /** + * The amount of work lost due to interference. + */ + public val interference: Double + + /** * Reset the resource counters. */ public fun reset() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt deleted file mode 100644 index f384582f..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.opendc.simulator.resources.interference.InterferenceKey - -/** - * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers. - */ -public interface SimResourceDistributor : SimResourceConsumer { - /** - * The output resource providers to which resource consumers can be attached. - */ - public val outputs: Set<SimResourceCloseableProvider> - - /** - * Create a new output for the distributor. - * - * @param key The key of the interference member to which the output belongs. - */ - public fun newOutput(key: InterferenceKey? = null): SimResourceCloseableProvider -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt deleted file mode 100644 index 7df940ad..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.opendc.simulator.resources.interference.InterferenceDomain -import org.opendc.simulator.resources.interference.InterferenceKey -import kotlin.math.max -import kotlin.math.min - -/** - * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing. - * - * @param interpreter The interpreter for managing the resource contexts. - * @param parent The parent resource system of the distributor. - * @param interferenceDomain The interference domain of the distributor. - */ -public class SimResourceDistributorMaxMin( - private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem? = null, - private val interferenceDomain: InterferenceDomain? = null -) : SimResourceDistributor { - override val outputs: Set<SimResourceCloseableProvider> - get() = _outputs - private val _outputs = mutableSetOf<Output>() - - /** - * The resource context of the consumer. - */ - private var ctx: SimResourceContext? = null - - /** - * The active outputs. - */ - private val activeOutputs: MutableList<Output> = mutableListOf() - - /** - * The total allocated speed for the output resources. - */ - private var totalAllocatedSpeed = 0.0 - - /** - * The total requested speed for the output resources. - */ - private var totalRequestedSpeed = 0.0 - - /** - * The resource counters of this distributor. - */ - public val counters: Counters - get() = _counters - private val _counters = object : Counters { - override var demand: Double = 0.0 - override var actual: Double = 0.0 - override var overcommit: Double = 0.0 - override var interference: Double = 0.0 - - override fun reset() { - demand = 0.0 - actual = 0.0 - overcommit = 0.0 - interference = 0.0 - } - - override fun toString(): String = "SimResourceDistributorMaxMin.Counters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]" - } - - /* SimResourceDistributor */ - override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider { - val provider = Output(ctx?.capacity ?: 0.0, key) - _outputs.add(provider) - return provider - } - - /* SimResourceConsumer */ - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - // If there is no work yet, mark the input as idle. - if (activeOutputs.isEmpty()) { - return Long.MAX_VALUE - } - - val capacity = ctx.capacity - var duration: Long = Long.MAX_VALUE - var availableSpeed = capacity - var totalRequestedSpeed = 0.0 - - // Pull in the work of the outputs - val outputIterator = activeOutputs.listIterator() - for (output in outputIterator) { - output.pull(now) - - // Remove outputs that have finished - if (!output.isActive) { - outputIterator.remove() - } - } - - // Sort in-place the outputs based on their requested usage. - // Profiling shows that it is faster than maintaining some kind of sorted set. - activeOutputs.sort() - - // Divide the available input capacity fairly across the outputs using max-min fair sharing - var remaining = activeOutputs.size - for (output in activeOutputs) { - val availableShare = availableSpeed / remaining-- - val grantedSpeed = min(output.allowedSpeed, availableShare) - - duration = min(duration, output.duration) - - // Ignore idle computation - if (grantedSpeed <= 0.0) { - output.actualSpeed = 0.0 - continue - } - - totalRequestedSpeed += output.limit - - output.actualSpeed = grantedSpeed - availableSpeed -= grantedSpeed - } - - val durationS = duration / 1000.0 - var totalRequestedWork = 0.0 - var totalAllocatedWork = 0.0 - for (output in activeOutputs) { - val limit = output.limit - val speed = output.actualSpeed - if (speed > 0.0) { - totalRequestedWork += limit * durationS - totalAllocatedWork += speed * durationS - } - } - - this.totalRequestedSpeed = totalRequestedSpeed - val totalAllocatedSpeed = capacity - availableSpeed - this.totalAllocatedSpeed = totalAllocatedSpeed - - ctx.push(totalAllocatedSpeed) - return duration - } - - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - when (event) { - SimResourceEvent.Start -> { - this.ctx = ctx - updateCapacity(ctx) - } - SimResourceEvent.Exit -> { - val iterator = _outputs.iterator() - while (iterator.hasNext()) { - val output = iterator.next() - - // Remove the output from the outputs to prevent ConcurrentModificationException when removing it - // during the call to output.close() - iterator.remove() - - output.close() - } - } - SimResourceEvent.Capacity -> updateCapacity(ctx) - else -> {} - } - } - - /** - * Extended [SimResourceCounters] interface for the distributor. - */ - public interface Counters : SimResourceCounters { - /** - * The amount of work lost due to interference. - */ - public val interference: Double - } - - private fun updateCapacity(ctx: SimResourceContext) { - for (output in _outputs) { - output.capacity = ctx.capacity - } - } - - /** - * An internal [SimResourceProvider] implementation for switch outputs. - */ - private inner class Output(capacity: Double, val key: InterferenceKey?) : - SimAbstractResourceProvider(interpreter, capacity), - SimResourceCloseableProvider, - SimResourceProviderLogic, - Comparable<Output> { - /** - * A flag to indicate that the output is closed. - */ - private var isClosed: Boolean = false - - /** - * The requested limit. - */ - @JvmField var limit: Double = 0.0 - - /** - * The current deadline. - */ - @JvmField var duration: Long = Long.MAX_VALUE - - /** - * The processing speed that is allowed by the model constraints. - */ - @JvmField var allowedSpeed: Double = 0.0 - - /** - * The actual processing speed. - */ - @JvmField var actualSpeed: Double = 0.0 - - /** - * The timestamp at which we received the last command. - */ - private var lastCommandTimestamp: Long = Long.MIN_VALUE - - /* SimAbstractResourceProvider */ - override fun createLogic(): SimResourceProviderLogic = this - - override fun start(ctx: SimResourceControllableContext) { - check(!isClosed) { "Cannot re-use closed output" } - - activeOutputs += this - interpreter.batch { - ctx.start() - // Interrupt the input to re-schedule the resources - this@SimResourceDistributorMaxMin.ctx?.interrupt() - } - } - - override fun close() { - isClosed = true - cancel() - _outputs.remove(this) - } - - /* SimResourceProviderLogic */ - override fun onConsume( - ctx: SimResourceControllableContext, - now: Long, - delta: Long, - limit: Double, - duration: Long - ) { - doUpdateCounters(delta) - - allowedSpeed = min(ctx.capacity, limit) - actualSpeed = 0.0 - this.limit = limit - this.duration = duration - lastCommandTimestamp = now - } - - override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) { - parent?.onConverge(now) - } - - override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) { - doUpdateCounters(delta) - - limit = 0.0 - duration = Long.MAX_VALUE - actualSpeed = 0.0 - allowedSpeed = 0.0 - lastCommandTimestamp = now - } - - /* Comparable */ - override fun compareTo(other: Output): Int = allowedSpeed.compareTo(other.allowedSpeed) - - /** - * Pull the next command if necessary. - */ - fun pull(now: Long) { - val ctx = ctx - if (ctx != null && lastCommandTimestamp < now) { - ctx.flush() - } - } - - /** - * Helper method to update the resource counters of the distributor. - */ - private fun doUpdateCounters(delta: Long) { - if (delta <= 0L) { - return - } - - // Compute the performance penalty due to resource interference - val perfScore = if (interferenceDomain != null) { - val load = totalAllocatedSpeed / requireNotNull(this@SimResourceDistributorMaxMin.ctx).capacity - interferenceDomain.apply(key, load) - } else { - 1.0 - } - - val deltaS = delta / 1000.0 - val work = limit * deltaS - val actualWork = actualSpeed * deltaS - val remainingWork = work - actualWork - - updateCounters(work, actualWork, remainingWork) - - val distCounters = _counters - distCounters.demand += work - distCounters.actual += actualWork - distCounters.overcommit += remainingWork - distCounters.interference += actualWork * max(0.0, 1 - perfScore) - } - } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt index d2aab634..3c25b76d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -27,11 +27,11 @@ import org.opendc.simulator.resources.interference.InterferenceKey /** * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers. */ -public interface SimResourceSwitch : AutoCloseable { +public interface SimResourceSwitch { /** * The output resource providers to which resource consumers can be attached. */ - public val outputs: Set<SimResourceCloseableProvider> + public val outputs: Set<SimResourceProvider> /** * The input resources that will be switched between the output providers. @@ -48,10 +48,20 @@ public interface SimResourceSwitch : AutoCloseable { * * @param key The key of the interference member to which the output belongs. */ - public fun newOutput(key: InterferenceKey? = null): SimResourceCloseableProvider + public fun newOutput(key: InterferenceKey? = null): SimResourceProvider + + /** + * Remove [output] from this switch. + */ + public fun removeOutput(output: SimResourceProvider) /** * Add the specified [input] to the switch. */ public fun addInput(input: SimResourceProvider) + + /** + * Clear all inputs and outputs from the switch. + */ + public fun clear() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index fbb541e5..2be8ccb0 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -27,23 +27,17 @@ import java.util.ArrayDeque /** * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that - * a single output is directly connected to an input and that the switch can only support as much outputs as inputs. + * a single output is directly connected to an input and that the switch can only support as many outputs as inputs. */ public class SimResourceSwitchExclusive : SimResourceSwitch { - /** - * A flag to indicate that the switch is closed. - */ - private var isClosed: Boolean = false - - private val _outputs = mutableSetOf<Provider>() - override val outputs: Set<SimResourceCloseableProvider> + override val outputs: Set<SimResourceProvider> get() = _outputs - - private val availableResources = ArrayDeque<SimResourceTransformer>() + private val _outputs = mutableSetOf<Output>() private val _inputs = mutableSetOf<SimResourceProvider>() override val inputs: Set<SimResourceProvider> get() = _inputs + private val _availableInputs = ArrayDeque<SimResourceTransformer>() override val counters: SimResourceCounters = object : SimResourceCounters { override val demand: Double @@ -52,6 +46,8 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { get() = _inputs.sumOf { it.counters.actual } override val overcommit: Double get() = _inputs.sumOf { it.counters.overcommit } + override val interference: Double + get() = _inputs.sumOf { it.counters.interference } override fun reset() { for (input in _inputs) { @@ -65,18 +61,25 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { /** * Add an output to the switch. */ - override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider { - check(!isClosed) { "Switch has been closed" } - check(availableResources.isNotEmpty()) { "No capacity to serve request" } - val forwarder = availableResources.poll() - val output = Provider(forwarder) + override fun newOutput(key: InterferenceKey?): SimResourceProvider { + val forwarder = checkNotNull(_availableInputs.poll()) { "No capacity to serve request" } + val output = Output(forwarder) _outputs += output return output } - override fun addInput(input: SimResourceProvider) { - check(!isClosed) { "Switch has been closed" } + override fun removeOutput(output: SimResourceProvider) { + if (!_outputs.remove(output)) { + return + } + + (output as Output).close() + } + /** + * Add an input to the switch. + */ + override fun addInput(input: SimResourceProvider) { if (input in inputs) { return } @@ -84,7 +87,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { val forwarder = SimResourceForwarder() _inputs += input - availableResources += forwarder + _availableInputs += forwarder input.startConsumer(object : SimResourceConsumer by forwarder { override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { @@ -98,18 +101,29 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { }) } - override fun close() { - isClosed = true + override fun clear() { + for (input in _inputs) { + input.cancel() + } + _inputs.clear() - // Cancel all upstream subscriptions - _inputs.forEach(SimResourceProvider::cancel) + // Outputs are implicitly cancelled by the inputs forwarders + _outputs.clear() } - private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceCloseableProvider, SimResourceProvider by forwarder { - override fun close() { + /** + * An output of the resource switch. + */ + private inner class Output(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder { + /** + * Close the output. + */ + fun close() { // We explicitly do not close the forwarder here in order to re-use it across output resources. _outputs -= this - availableResources += forwarder + _availableInputs += forwarder } + + override fun toString(): String = "SimResourceSwitchExclusive.Output" } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index e368609f..574fb443 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -22,8 +22,11 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.impl.SimResourceCountersImpl import org.opendc.simulator.resources.interference.InterferenceDomain import org.opendc.simulator.resources.interference.InterferenceKey +import kotlin.math.max +import kotlin.math.min /** * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min @@ -34,69 +37,371 @@ import org.opendc.simulator.resources.interference.InterferenceKey * @param interferenceDomain The interference domain of the switch. */ public class SimResourceSwitchMaxMin( - interpreter: SimResourceInterpreter, - parent: SimResourceSystem? = null, - interferenceDomain: InterferenceDomain? = null + private val interpreter: SimResourceInterpreter, + private val parent: SimResourceSystem? = null, + private val interferenceDomain: InterferenceDomain? = null ) : SimResourceSwitch { /** * The output resource providers to which resource consumers can be attached. */ - override val outputs: Set<SimResourceCloseableProvider> - get() = distributor.outputs + override val outputs: Set<SimResourceProvider> + get() = _outputs + private val _outputs = mutableSetOf<Output>() + private val _activeOutputs: MutableList<Output> = mutableListOf() /** * The input resources that will be switched between the output providers. */ override val inputs: Set<SimResourceProvider> - get() = aggregator.inputs + get() = _inputs + private val _inputs = mutableSetOf<SimResourceProvider>() + private val _activeInputs = mutableListOf<Input>() /** - * The resource counters to track the execution metrics of all switch resources. + * The resource counters of this switch. */ - override val counters: SimResourceDistributorMaxMin.Counters - get() = distributor.counters + public override val counters: SimResourceCounters + get() = _counters + private val _counters = SimResourceCountersImpl() /** - * A flag to indicate that the switch was closed. + * The actual processing rate of the switch. */ - private var isClosed = false + private var _rate = 0.0 /** - * The aggregator to aggregate the resources. + * The demanded processing rate of the outputs. */ - private val aggregator = SimResourceAggregatorMaxMin(interpreter, parent) + private var _demand = 0.0 /** - * The distributor to distribute the aggregated resources. + * The capacity of the switch. */ - private val distributor = SimResourceDistributorMaxMin(interpreter, parent, interferenceDomain) + private var _capacity = 0.0 - init { - aggregator.startConsumer(distributor) - } + /** + * Flag to indicate that the scheduler is active. + */ + private var _schedulerActive = false /** * Add an output to the switch. */ - override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider { - check(!isClosed) { "Switch has been closed" } - - return distributor.newOutput(key) + override fun newOutput(key: InterferenceKey?): SimResourceProvider { + val provider = Output(_capacity, key) + _outputs.add(provider) + return provider } /** * Add the specified [input] to the switch. */ override fun addInput(input: SimResourceProvider) { - check(!isClosed) { "Switch has been closed" } + val consumer = Input(input) + if (_inputs.add(input)) { + _activeInputs.add(consumer) + input.startConsumer(consumer) + } + } + + /** + * Remove [output] from this switch. + */ + override fun removeOutput(output: SimResourceProvider) { + if (!_outputs.remove(output)) { + return + } + // This cast should always succeed since only `Output` instances should be added to _outputs + (output as Output).close() + } + + override fun clear() { + for (input in _activeInputs) { + input.cancel() + } + _activeInputs.clear() + + for (output in _activeOutputs) { + output.cancel() + } + _activeOutputs.clear() + } + + /** + * Run the scheduler of the switch. + */ + private fun runScheduler(now: Long) { + if (_schedulerActive) { + return + } + + _schedulerActive = true + try { + doSchedule(now) + } finally { + _schedulerActive = false + } + } + + /** + * Schedule the outputs over the input. + */ + private fun doSchedule(now: Long) { + // If there is no work yet, mark the input as idle. + if (_activeOutputs.isEmpty()) { + return + } + + val capacity = _capacity + var availableCapacity = capacity + + // Pull in the work of the outputs + val outputIterator = _activeOutputs.listIterator() + for (output in outputIterator) { + output.pull(now) + + // Remove outputs that have finished + if (!output.isActive) { + outputIterator.remove() + } + } + + var demand = 0.0 + + // Sort in-place the outputs based on their requested usage. + // Profiling shows that it is faster than maintaining some kind of sorted set. + _activeOutputs.sort() + + // Divide the available input capacity fairly across the outputs using max-min fair sharing + var remaining = _activeOutputs.size + for (output in _activeOutputs) { + val availableShare = availableCapacity / remaining-- + val grantedSpeed = min(output.allowedRate, availableShare) + + // Ignore idle computation + if (grantedSpeed <= 0.0) { + output.actualRate = 0.0 + continue + } + + demand += output.limit - aggregator.addInput(input) + output.actualRate = grantedSpeed + availableCapacity -= grantedSpeed + } + + val rate = capacity - availableCapacity + + _demand = demand + _rate = rate + + // Sort all consumers by their capacity + _activeInputs.sort() + + // Divide the requests over the available capacity of the input resources fairly + for (input in _activeInputs) { + val inputCapacity = input.capacity + val fraction = inputCapacity / capacity + val grantedSpeed = rate * fraction + + input.push(grantedSpeed) + } } - override fun close() { - if (!isClosed) { - isClosed = true - aggregator.cancel() + /** + * Recompute the capacity of the switch. + */ + private fun updateCapacity() { + val newCapacity = _activeInputs.sumOf(Input::capacity) + + // No-op if the capacity is unchanged + if (_capacity == newCapacity) { + return } + + _capacity = newCapacity + + for (output in _outputs) { + output.capacity = newCapacity + } + } + + /** + * An internal [SimResourceProvider] implementation for switch outputs. + */ + private inner class Output(capacity: Double, val key: InterferenceKey?) : + SimAbstractResourceProvider(interpreter, capacity), + SimResourceProviderLogic, + Comparable<Output> { + /** + * The requested limit. + */ + @JvmField var limit: Double = 0.0 + + /** + * The actual processing speed. + */ + @JvmField var actualRate: Double = 0.0 + + /** + * The processing speed that is allowed by the model constraints. + */ + val allowedRate: Double + get() = min(capacity, limit) + + /** + * A flag to indicate that the output is closed. + */ + private var _isClosed: Boolean = false + + /** + * The timestamp at which we received the last command. + */ + private var _lastPull: Long = Long.MIN_VALUE + + /** + * Close the output. + * + * This method is invoked when the user removes an output from the switch. + */ + fun close() { + _isClosed = true + cancel() + } + + /* SimAbstractResourceProvider */ + override fun createLogic(): SimResourceProviderLogic = this + + override fun start(ctx: SimResourceControllableContext) { + check(!_isClosed) { "Cannot re-use closed output" } + + _activeOutputs += this + super.start(ctx) + } + + /* SimResourceProviderLogic */ + override fun onConsume( + ctx: SimResourceControllableContext, + now: Long, + delta: Long, + limit: Double, + duration: Long + ) { + doUpdateCounters(delta) + + actualRate = 0.0 + this.limit = limit + _lastPull = now + + runScheduler(now) + } + + override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) { + parent?.onConverge(now) + } + + override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) { + doUpdateCounters(delta) + + limit = 0.0 + actualRate = 0.0 + _lastPull = now + } + + /* Comparable */ + override fun compareTo(other: Output): Int = allowedRate.compareTo(other.allowedRate) + + /** + * Pull the next command if necessary. + */ + fun pull(now: Long) { + val ctx = ctx + if (ctx != null && _lastPull < now) { + ctx.flush() + } + } + + /** + * Helper method to update the resource counters of the distributor. + */ + private fun doUpdateCounters(delta: Long) { + if (delta <= 0L) { + return + } + + // Compute the performance penalty due to resource interference + val perfScore = if (interferenceDomain != null) { + val load = _rate / capacity + interferenceDomain.apply(key, load) + } else { + 1.0 + } + + val deltaS = delta / 1000.0 + val work = limit * deltaS + val actualWork = actualRate * deltaS + val remainingWork = work - actualWork + + updateCounters(work, actualWork, remainingWork) + + val distCounters = _counters + distCounters.demand += work + distCounters.actual += actualWork + distCounters.overcommit += remainingWork + distCounters.interference += actualWork * max(0.0, 1 - perfScore) + } + } + + /** + * An internal [SimResourceConsumer] implementation for switch inputs. + */ + private inner class Input(private val provider: SimResourceProvider) : SimResourceConsumer, Comparable<Input> { + /** + * The active [SimResourceContext] of this consumer. + */ + private var _ctx: SimResourceContext? = null + + /** + * The capacity of this input. + */ + val capacity: Double + get() = _ctx?.capacity ?: 0.0 + + /** + * Push the specified rate to the provider. + */ + fun push(rate: Double) { + _ctx?.push(rate) + } + + /** + * Cancel this input. + */ + fun cancel() { + provider.cancel() + } + + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + runScheduler(now) + return Long.MAX_VALUE + } + + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + assert(_ctx == null) { "Consumer running concurrently" } + _ctx = ctx + updateCapacity() + } + SimResourceEvent.Exit -> { + _ctx = null + updateCapacity() + } + SimResourceEvent.Capacity -> updateCapacity() + else -> {} + } + } + + override fun compareTo(other: Input): Int = capacity.compareTo(other.capacity) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt index 827019c5..01062179 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt @@ -31,12 +31,16 @@ internal class SimResourceCountersImpl : SimResourceCounters { override var demand: Double = 0.0 override var actual: Double = 0.0 override var overcommit: Double = 0.0 + override var interference: Double = 0.0 override fun reset() { demand = 0.0 actual = 0.0 overcommit = 0.0 + interference = 0.0 } - override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + override fun toString(): String { + return "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]" + } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt deleted file mode 100644 index f4ea5fe8..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import io.mockk.spyk -import io.mockk.verify -import kotlinx.coroutines.* -import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl - -/** - * Test suite for the [SimResourceAggregatorMaxMin] class. - */ -internal class SimResourceAggregatorMaxMinTest { - @Test - fun testSingleCapacity() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(scheduler) - val forwarder = SimResourceForwarder() - val sources = listOf( - forwarder, - SimResourceSource(1.0, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = SimWorkConsumer(1.0, 0.5) - val usage = mutableListOf<Double>() - val source = SimResourceSource(1.0, scheduler) - val adapter = SimSpeedConsumerAdapter(forwarder, usage::add) - source.startConsumer(adapter) - - aggregator.consume(consumer) - yield() - - assertAll( - { assertEquals(1000, clock.millis()) }, - { assertEquals(listOf(0.0, 0.5, 0.0), usage) } - ) - } - - @Test - fun testDoubleCapacity() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(scheduler) - val sources = listOf( - SimResourceSource(1.0, scheduler), - SimResourceSource(1.0, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = SimWorkConsumer(2.0, 1.0) - val usage = mutableListOf<Double>() - val adapter = SimSpeedConsumerAdapter(consumer, usage::add) - - aggregator.consume(adapter) - yield() - assertAll( - { assertEquals(1000, clock.millis()) }, - { assertEquals(listOf(0.0, 2.0, 0.0), usage) } - ) - } - - @Test - fun testOvercommit() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(scheduler) - val sources = listOf( - SimResourceSource(1.0, scheduler), - SimResourceSource(1.0, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = spyk(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return if (now == 0L) { - ctx.push(4.0) - 1000 - } else { - ctx.close() - Long.MAX_VALUE - } - } - }) - - aggregator.consume(consumer) - yield() - assertEquals(1000, clock.millis()) - - verify(exactly = 2) { consumer.onNext(any(), any(), any()) } - } - - @Test - fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(scheduler) - val sources = listOf( - SimResourceSource(1.0, scheduler), - SimResourceSource(1.0, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = SimWorkConsumer(4.0, 1.0) - coroutineScope { - launch { aggregator.consume(consumer) } - delay(1000) - sources[0].capacity = 0.5 - } - yield() - assertEquals(2333, clock.millis()) - } - - @Test - fun testFailOverCapacity() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(scheduler) - val sources = listOf( - SimResourceSource(1.0, scheduler), - SimResourceSource(1.0, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = SimWorkConsumer(1.0, 0.5) - coroutineScope { - launch { aggregator.consume(consumer) } - delay(500) - sources[0].capacity = 0.5 - } - yield() - assertEquals(1167, clock.millis()) - } - - @Test - fun testCounters() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(scheduler) - val sources = listOf( - SimResourceSource(1.0, scheduler), - SimResourceSource(1.0, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return if (now == 0L) { - ctx.push(4.0) - 1000 - } else { - ctx.close() - Long.MAX_VALUE - } - } - } - - aggregator.consume(consumer) - yield() - assertEquals(1000, clock.millis()) - assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" } - } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index 9f86dc0d..49f2da5f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -65,13 +65,8 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(forwarder) val provider = switch.newOutput() - - try { - provider.consume(workload) - yield() - } finally { - provider.close() - } + provider.consume(workload) + yield() assertAll( { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, @@ -95,13 +90,9 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) val provider = switch.newOutput() + provider.consume(workload) + yield() - try { - provider.consume(workload) - yield() - } finally { - provider.close() - } assertEquals(duration, clock.millis()) { "Took enough time" } } @@ -141,14 +132,9 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) val provider = switch.newOutput() - - try { - provider.consume(workload) - yield() - provider.consume(workload) - } finally { - provider.close() - } + provider.consume(workload) + yield() + provider.consume(workload) assertEquals(duration * 2, clock.millis()) { "Took enough time" } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index ba0d66ff..03f90e21 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -51,7 +51,7 @@ internal class SimResourceSwitchMaxMinTest { provider.consume(consumer) yield() } finally { - switch.close() + switch.clear() } } @@ -81,7 +81,7 @@ internal class SimResourceSwitchMaxMinTest { provider.consume(workload) yield() } finally { - switch.close() + switch.clear() } assertAll( @@ -133,7 +133,7 @@ internal class SimResourceSwitchMaxMinTest { yield() } finally { - switch.close() + switch.clear() } assertAll( { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, |
