summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt2
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt6
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt147
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt38
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt64
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt37
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt5
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt42
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt332
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt16
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt64
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt361
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt6
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt190
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt28
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt6
20 files changed, 424 insertions, 941 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index fdb3f1dc..a8b8afe9 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -47,7 +47,6 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.resources.SimResourceDistributorMaxMin
import org.opendc.simulator.resources.SimResourceInterpreter
import java.util.*
import kotlin.coroutines.CoroutineContext
@@ -418,7 +417,7 @@ public class SimHost(
val counters = hypervisor.counters
val grantedWork = counters.actual
val overcommittedWork = counters.overcommit
- val interferedWork = (counters as? SimResourceDistributorMaxMin.Counters)?.interference ?: 0.0
+ val interferedWork = counters.interference
_totalTime += (duration / 1000.0) * coreCount
val activeTime = (grantedWork * d).roundToLong()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 16085d82..1bec2de5 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -212,7 +212,7 @@ class CapelinIntegrationTest {
{ assertEquals(6013899, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
{ assertEquals(14724501, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(476163, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(477279, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index 98271fb0..cf9e3230 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -131,7 +131,7 @@ public abstract class SimAbstractHypervisor(
/**
* The vCPUs of the machine.
*/
- override val cpus = model.cpus.map { VCpu(switch.newOutput(interferenceKey), it) }
+ override val cpus = model.cpus.map { VCpu(switch, switch.newOutput(interferenceKey), it) }
override fun close() {
super.close()
@@ -153,9 +153,10 @@ public abstract class SimAbstractHypervisor(
* A [SimProcessingUnit] of a virtual machine.
*/
private class VCpu(
- private val source: SimResourceCloseableProvider,
+ private val switch: SimResourceSwitch,
+ private val source: SimResourceProvider,
override val model: ProcessingUnit
- ) : SimProcessingUnit, SimResourceCloseableProvider by source {
+ ) : SimProcessingUnit, SimResourceProvider by source {
override var capacity: Double
get() = source.capacity
set(_) {
@@ -163,6 +164,13 @@ public abstract class SimAbstractHypervisor(
}
override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]"
+
+ /**
+ * Close the CPU
+ */
+ fun close() {
+ switch.removeOutput(source)
+ }
}
/**
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
index 1f010338..8cd535ad 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
@@ -103,8 +103,10 @@ internal class SimHypervisorTest {
println("Hypervisor finished")
}
yield()
+
val vm = hypervisor.createMachine(model)
vm.run(workloadA)
+
yield()
machine.close()
diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
index 05daaa5c..2267715e 100644
--- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
+++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
@@ -70,7 +70,7 @@ public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimN
override fun close() {
isClosed = true
- _provider.close()
+ switch.removeOutput(_provider)
_ports.remove(this)
}
}
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
index e5fcd938..947f6cb2 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
@@ -49,7 +49,7 @@ public class SimPdu(
/**
* Create a new PDU outlet.
*/
- public fun newOutlet(): Outlet = Outlet(switch.newOutput())
+ public fun newOutlet(): Outlet = Outlet(switch, switch.newOutput())
init {
switch.addInput(forwarder)
@@ -81,7 +81,7 @@ public class SimPdu(
/**
* A PDU outlet.
*/
- public class Outlet(private val provider: SimResourceCloseableProvider) : SimPowerOutlet(), AutoCloseable {
+ public class Outlet(private val switch: SimResourceSwitch, private val provider: SimResourceProvider) : SimPowerOutlet(), AutoCloseable {
override fun onConnect(inlet: SimPowerInlet) {
provider.startConsumer(inlet.createConsumer())
}
@@ -94,7 +94,7 @@ public class SimPdu(
* Remove the outlet from the PDU.
*/
override fun close() {
- provider.close()
+ switch.removeOutput(provider)
}
override fun toString(): String = "SimPdu.Outlet"
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
deleted file mode 100644
index 621ea6e7..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-/**
- * Abstract implementation of [SimResourceAggregator].
- */
-public abstract class SimAbstractResourceAggregator(
- interpreter: SimResourceInterpreter,
- parent: SimResourceSystem?
-) : SimResourceAggregator {
- /**
- * This method is invoked when the resource consumer consumes resources.
- */
- protected abstract fun doConsume(limit: Double)
-
- /**
- * This method is invoked when the resource consumer finishes processing.
- */
- protected abstract fun doFinish()
-
- /**
- * This method is invoked when an input context is started.
- */
- protected abstract fun onInputStarted(input: SimResourceContext)
-
- /**
- * This method is invoked when an input is stopped.
- */
- protected abstract fun onInputFinished(input: SimResourceContext)
-
- /* SimResourceAggregator */
- override fun addInput(input: SimResourceProvider) {
- val consumer = Consumer()
- _inputs.add(input)
- _inputConsumers.add(consumer)
- input.startConsumer(consumer)
- }
-
- override val inputs: Set<SimResourceProvider>
- get() = _inputs
- private val _inputs = mutableSetOf<SimResourceProvider>()
- private val _inputConsumers = mutableListOf<Consumer>()
-
- /* SimResourceProvider */
- override val isActive: Boolean
- get() = _output.isActive
-
- override val capacity: Double
- get() = _output.capacity
-
- override val speed: Double
- get() = _output.speed
-
- override val demand: Double
- get() = _output.demand
-
- override val counters: SimResourceCounters
- get() = _output.counters
-
- override fun startConsumer(consumer: SimResourceConsumer) {
- _output.startConsumer(consumer)
- }
-
- override fun cancel() {
- _output.cancel()
- }
-
- override fun interrupt() {
- _output.interrupt()
- }
-
- private val _output = object : SimAbstractResourceProvider(interpreter, initialCapacity = 0.0) {
- override fun createLogic(): SimResourceProviderLogic {
- return object : SimResourceProviderLogic {
-
- override fun onConsume(ctx: SimResourceControllableContext, now: Long, delta: Long, limit: Double, duration: Long) {
- updateCounters(ctx, delta)
- doConsume(limit)
- }
-
- override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {
- updateCounters(ctx, delta)
- doFinish()
- }
-
- override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {
- parent?.onConverge(now)
- }
- }
- }
- }
-
- /**
- * An internal [SimResourceConsumer] implementation for aggregator inputs.
- */
- private inner class Consumer : SimResourceConsumer {
- /**
- * The resource context associated with the input.
- */
- private var _ctx: SimResourceContext? = null
-
- private fun updateCapacity() {
- // Adjust capacity of output resource
- _output.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 }
- }
-
- /* SimResourceConsumer */
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return Long.MAX_VALUE
- }
-
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- when (event) {
- SimResourceEvent.Start -> {
- _ctx = ctx
- updateCapacity()
-
- onInputStarted(ctx)
- }
- SimResourceEvent.Capacity -> updateCapacity()
- SimResourceEvent.Exit -> onInputFinished(ctx)
- else -> {}
- }
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
deleted file mode 100644
index 00972f43..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-/**
- * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource.
- */
-public interface SimResourceAggregator : SimResourceProvider {
- /**
- * The input resources that will be switched between the output providers.
- */
- public val inputs: Set<SimResourceProvider>
-
- /**
- * Add the specified [input] to the aggregator.
- */
- public fun addInput(input: SimResourceProvider)
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
deleted file mode 100644
index f131ac6c..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-/**
- * A [SimResourceAggregator] that distributes the load equally across the input resources.
- */
-public class SimResourceAggregatorMaxMin(
- interpreter: SimResourceInterpreter,
- parent: SimResourceSystem? = null
-) : SimAbstractResourceAggregator(interpreter, parent) {
- private val consumers = mutableListOf<SimResourceContext>()
-
- override fun doConsume(limit: Double) {
- // Sort all consumers by their capacity
- consumers.sortWith(compareBy { it.capacity })
-
- // Divide the requests over the available capacity of the input resources fairly
- for (input in consumers) {
- val inputCapacity = input.capacity
- val fraction = inputCapacity / capacity
- val grantedSpeed = limit * fraction
-
- input.push(grantedSpeed)
- input.interrupt()
- }
- }
-
- override fun doFinish() {
- val iterator = consumers.iterator()
- for (input in iterator) {
- iterator.remove()
- input.close()
- }
- }
-
- override fun onInputStarted(input: SimResourceContext) {
- consumers.add(input)
- }
-
- override fun onInputFinished(input: SimResourceContext) {
- consumers.remove(input)
- }
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt
deleted file mode 100644
index bce8274b..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-/**
- * A [SimResourceProvider] that has a controllable and limited lifetime.
- *
- * This interface is used to signal that the resource provider may be closed and not reused after that point.
- */
-public interface SimResourceCloseableProvider : SimResourceProvider, AutoCloseable {
- /**
- * End the lifetime of the resource provider.
- *
- * This operation cancels the existing resource consumer and prevents the resource provider from being reused.
- */
- public override fun close()
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt
index 725aa5bc..11924db2 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt
@@ -42,6 +42,11 @@ public interface SimResourceCounters {
public val overcommit: Double
/**
+ * The amount of work lost due to interference.
+ */
+ public val interference: Double
+
+ /**
* Reset the resource counters.
*/
public fun reset()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
deleted file mode 100644
index f384582f..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import org.opendc.simulator.resources.interference.InterferenceKey
-
-/**
- * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers.
- */
-public interface SimResourceDistributor : SimResourceConsumer {
- /**
- * The output resource providers to which resource consumers can be attached.
- */
- public val outputs: Set<SimResourceCloseableProvider>
-
- /**
- * Create a new output for the distributor.
- *
- * @param key The key of the interference member to which the output belongs.
- */
- public fun newOutput(key: InterferenceKey? = null): SimResourceCloseableProvider
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
deleted file mode 100644
index 7df940ad..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import org.opendc.simulator.resources.interference.InterferenceDomain
-import org.opendc.simulator.resources.interference.InterferenceKey
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing.
- *
- * @param interpreter The interpreter for managing the resource contexts.
- * @param parent The parent resource system of the distributor.
- * @param interferenceDomain The interference domain of the distributor.
- */
-public class SimResourceDistributorMaxMin(
- private val interpreter: SimResourceInterpreter,
- private val parent: SimResourceSystem? = null,
- private val interferenceDomain: InterferenceDomain? = null
-) : SimResourceDistributor {
- override val outputs: Set<SimResourceCloseableProvider>
- get() = _outputs
- private val _outputs = mutableSetOf<Output>()
-
- /**
- * The resource context of the consumer.
- */
- private var ctx: SimResourceContext? = null
-
- /**
- * The active outputs.
- */
- private val activeOutputs: MutableList<Output> = mutableListOf()
-
- /**
- * The total allocated speed for the output resources.
- */
- private var totalAllocatedSpeed = 0.0
-
- /**
- * The total requested speed for the output resources.
- */
- private var totalRequestedSpeed = 0.0
-
- /**
- * The resource counters of this distributor.
- */
- public val counters: Counters
- get() = _counters
- private val _counters = object : Counters {
- override var demand: Double = 0.0
- override var actual: Double = 0.0
- override var overcommit: Double = 0.0
- override var interference: Double = 0.0
-
- override fun reset() {
- demand = 0.0
- actual = 0.0
- overcommit = 0.0
- interference = 0.0
- }
-
- override fun toString(): String = "SimResourceDistributorMaxMin.Counters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]"
- }
-
- /* SimResourceDistributor */
- override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider {
- val provider = Output(ctx?.capacity ?: 0.0, key)
- _outputs.add(provider)
- return provider
- }
-
- /* SimResourceConsumer */
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- // If there is no work yet, mark the input as idle.
- if (activeOutputs.isEmpty()) {
- return Long.MAX_VALUE
- }
-
- val capacity = ctx.capacity
- var duration: Long = Long.MAX_VALUE
- var availableSpeed = capacity
- var totalRequestedSpeed = 0.0
-
- // Pull in the work of the outputs
- val outputIterator = activeOutputs.listIterator()
- for (output in outputIterator) {
- output.pull(now)
-
- // Remove outputs that have finished
- if (!output.isActive) {
- outputIterator.remove()
- }
- }
-
- // Sort in-place the outputs based on their requested usage.
- // Profiling shows that it is faster than maintaining some kind of sorted set.
- activeOutputs.sort()
-
- // Divide the available input capacity fairly across the outputs using max-min fair sharing
- var remaining = activeOutputs.size
- for (output in activeOutputs) {
- val availableShare = availableSpeed / remaining--
- val grantedSpeed = min(output.allowedSpeed, availableShare)
-
- duration = min(duration, output.duration)
-
- // Ignore idle computation
- if (grantedSpeed <= 0.0) {
- output.actualSpeed = 0.0
- continue
- }
-
- totalRequestedSpeed += output.limit
-
- output.actualSpeed = grantedSpeed
- availableSpeed -= grantedSpeed
- }
-
- val durationS = duration / 1000.0
- var totalRequestedWork = 0.0
- var totalAllocatedWork = 0.0
- for (output in activeOutputs) {
- val limit = output.limit
- val speed = output.actualSpeed
- if (speed > 0.0) {
- totalRequestedWork += limit * durationS
- totalAllocatedWork += speed * durationS
- }
- }
-
- this.totalRequestedSpeed = totalRequestedSpeed
- val totalAllocatedSpeed = capacity - availableSpeed
- this.totalAllocatedSpeed = totalAllocatedSpeed
-
- ctx.push(totalAllocatedSpeed)
- return duration
- }
-
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- when (event) {
- SimResourceEvent.Start -> {
- this.ctx = ctx
- updateCapacity(ctx)
- }
- SimResourceEvent.Exit -> {
- val iterator = _outputs.iterator()
- while (iterator.hasNext()) {
- val output = iterator.next()
-
- // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
- // during the call to output.close()
- iterator.remove()
-
- output.close()
- }
- }
- SimResourceEvent.Capacity -> updateCapacity(ctx)
- else -> {}
- }
- }
-
- /**
- * Extended [SimResourceCounters] interface for the distributor.
- */
- public interface Counters : SimResourceCounters {
- /**
- * The amount of work lost due to interference.
- */
- public val interference: Double
- }
-
- private fun updateCapacity(ctx: SimResourceContext) {
- for (output in _outputs) {
- output.capacity = ctx.capacity
- }
- }
-
- /**
- * An internal [SimResourceProvider] implementation for switch outputs.
- */
- private inner class Output(capacity: Double, val key: InterferenceKey?) :
- SimAbstractResourceProvider(interpreter, capacity),
- SimResourceCloseableProvider,
- SimResourceProviderLogic,
- Comparable<Output> {
- /**
- * A flag to indicate that the output is closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * The requested limit.
- */
- @JvmField var limit: Double = 0.0
-
- /**
- * The current deadline.
- */
- @JvmField var duration: Long = Long.MAX_VALUE
-
- /**
- * The processing speed that is allowed by the model constraints.
- */
- @JvmField var allowedSpeed: Double = 0.0
-
- /**
- * The actual processing speed.
- */
- @JvmField var actualSpeed: Double = 0.0
-
- /**
- * The timestamp at which we received the last command.
- */
- private var lastCommandTimestamp: Long = Long.MIN_VALUE
-
- /* SimAbstractResourceProvider */
- override fun createLogic(): SimResourceProviderLogic = this
-
- override fun start(ctx: SimResourceControllableContext) {
- check(!isClosed) { "Cannot re-use closed output" }
-
- activeOutputs += this
- interpreter.batch {
- ctx.start()
- // Interrupt the input to re-schedule the resources
- this@SimResourceDistributorMaxMin.ctx?.interrupt()
- }
- }
-
- override fun close() {
- isClosed = true
- cancel()
- _outputs.remove(this)
- }
-
- /* SimResourceProviderLogic */
- override fun onConsume(
- ctx: SimResourceControllableContext,
- now: Long,
- delta: Long,
- limit: Double,
- duration: Long
- ) {
- doUpdateCounters(delta)
-
- allowedSpeed = min(ctx.capacity, limit)
- actualSpeed = 0.0
- this.limit = limit
- this.duration = duration
- lastCommandTimestamp = now
- }
-
- override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {
- parent?.onConverge(now)
- }
-
- override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {
- doUpdateCounters(delta)
-
- limit = 0.0
- duration = Long.MAX_VALUE
- actualSpeed = 0.0
- allowedSpeed = 0.0
- lastCommandTimestamp = now
- }
-
- /* Comparable */
- override fun compareTo(other: Output): Int = allowedSpeed.compareTo(other.allowedSpeed)
-
- /**
- * Pull the next command if necessary.
- */
- fun pull(now: Long) {
- val ctx = ctx
- if (ctx != null && lastCommandTimestamp < now) {
- ctx.flush()
- }
- }
-
- /**
- * Helper method to update the resource counters of the distributor.
- */
- private fun doUpdateCounters(delta: Long) {
- if (delta <= 0L) {
- return
- }
-
- // Compute the performance penalty due to resource interference
- val perfScore = if (interferenceDomain != null) {
- val load = totalAllocatedSpeed / requireNotNull(this@SimResourceDistributorMaxMin.ctx).capacity
- interferenceDomain.apply(key, load)
- } else {
- 1.0
- }
-
- val deltaS = delta / 1000.0
- val work = limit * deltaS
- val actualWork = actualSpeed * deltaS
- val remainingWork = work - actualWork
-
- updateCounters(work, actualWork, remainingWork)
-
- val distCounters = _counters
- distCounters.demand += work
- distCounters.actual += actualWork
- distCounters.overcommit += remainingWork
- distCounters.interference += actualWork * max(0.0, 1 - perfScore)
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
index d2aab634..3c25b76d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
@@ -27,11 +27,11 @@ import org.opendc.simulator.resources.interference.InterferenceKey
/**
* A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers.
*/
-public interface SimResourceSwitch : AutoCloseable {
+public interface SimResourceSwitch {
/**
* The output resource providers to which resource consumers can be attached.
*/
- public val outputs: Set<SimResourceCloseableProvider>
+ public val outputs: Set<SimResourceProvider>
/**
* The input resources that will be switched between the output providers.
@@ -48,10 +48,20 @@ public interface SimResourceSwitch : AutoCloseable {
*
* @param key The key of the interference member to which the output belongs.
*/
- public fun newOutput(key: InterferenceKey? = null): SimResourceCloseableProvider
+ public fun newOutput(key: InterferenceKey? = null): SimResourceProvider
+
+ /**
+ * Remove [output] from this switch.
+ */
+ public fun removeOutput(output: SimResourceProvider)
/**
* Add the specified [input] to the switch.
*/
public fun addInput(input: SimResourceProvider)
+
+ /**
+ * Clear all inputs and outputs from the switch.
+ */
+ public fun clear()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index fbb541e5..2be8ccb0 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -27,23 +27,17 @@ import java.util.ArrayDeque
/**
* A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that
- * a single output is directly connected to an input and that the switch can only support as much outputs as inputs.
+ * a single output is directly connected to an input and that the switch can only support as many outputs as inputs.
*/
public class SimResourceSwitchExclusive : SimResourceSwitch {
- /**
- * A flag to indicate that the switch is closed.
- */
- private var isClosed: Boolean = false
-
- private val _outputs = mutableSetOf<Provider>()
- override val outputs: Set<SimResourceCloseableProvider>
+ override val outputs: Set<SimResourceProvider>
get() = _outputs
-
- private val availableResources = ArrayDeque<SimResourceTransformer>()
+ private val _outputs = mutableSetOf<Output>()
private val _inputs = mutableSetOf<SimResourceProvider>()
override val inputs: Set<SimResourceProvider>
get() = _inputs
+ private val _availableInputs = ArrayDeque<SimResourceTransformer>()
override val counters: SimResourceCounters = object : SimResourceCounters {
override val demand: Double
@@ -52,6 +46,8 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
get() = _inputs.sumOf { it.counters.actual }
override val overcommit: Double
get() = _inputs.sumOf { it.counters.overcommit }
+ override val interference: Double
+ get() = _inputs.sumOf { it.counters.interference }
override fun reset() {
for (input in _inputs) {
@@ -65,18 +61,25 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
/**
* Add an output to the switch.
*/
- override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider {
- check(!isClosed) { "Switch has been closed" }
- check(availableResources.isNotEmpty()) { "No capacity to serve request" }
- val forwarder = availableResources.poll()
- val output = Provider(forwarder)
+ override fun newOutput(key: InterferenceKey?): SimResourceProvider {
+ val forwarder = checkNotNull(_availableInputs.poll()) { "No capacity to serve request" }
+ val output = Output(forwarder)
_outputs += output
return output
}
- override fun addInput(input: SimResourceProvider) {
- check(!isClosed) { "Switch has been closed" }
+ override fun removeOutput(output: SimResourceProvider) {
+ if (!_outputs.remove(output)) {
+ return
+ }
+
+ (output as Output).close()
+ }
+ /**
+ * Add an input to the switch.
+ */
+ override fun addInput(input: SimResourceProvider) {
if (input in inputs) {
return
}
@@ -84,7 +87,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
val forwarder = SimResourceForwarder()
_inputs += input
- availableResources += forwarder
+ _availableInputs += forwarder
input.startConsumer(object : SimResourceConsumer by forwarder {
override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
@@ -98,18 +101,29 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
})
}
- override fun close() {
- isClosed = true
+ override fun clear() {
+ for (input in _inputs) {
+ input.cancel()
+ }
+ _inputs.clear()
- // Cancel all upstream subscriptions
- _inputs.forEach(SimResourceProvider::cancel)
+ // Outputs are implicitly cancelled by the inputs forwarders
+ _outputs.clear()
}
- private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceCloseableProvider, SimResourceProvider by forwarder {
- override fun close() {
+ /**
+ * An output of the resource switch.
+ */
+ private inner class Output(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder {
+ /**
+ * Close the output.
+ */
+ fun close() {
// We explicitly do not close the forwarder here in order to re-use it across output resources.
_outputs -= this
- availableResources += forwarder
+ _availableInputs += forwarder
}
+
+ override fun toString(): String = "SimResourceSwitchExclusive.Output"
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index e368609f..574fb443 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -22,8 +22,11 @@
package org.opendc.simulator.resources
+import org.opendc.simulator.resources.impl.SimResourceCountersImpl
import org.opendc.simulator.resources.interference.InterferenceDomain
import org.opendc.simulator.resources.interference.InterferenceKey
+import kotlin.math.max
+import kotlin.math.min
/**
* A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
@@ -34,69 +37,371 @@ import org.opendc.simulator.resources.interference.InterferenceKey
* @param interferenceDomain The interference domain of the switch.
*/
public class SimResourceSwitchMaxMin(
- interpreter: SimResourceInterpreter,
- parent: SimResourceSystem? = null,
- interferenceDomain: InterferenceDomain? = null
+ private val interpreter: SimResourceInterpreter,
+ private val parent: SimResourceSystem? = null,
+ private val interferenceDomain: InterferenceDomain? = null
) : SimResourceSwitch {
/**
* The output resource providers to which resource consumers can be attached.
*/
- override val outputs: Set<SimResourceCloseableProvider>
- get() = distributor.outputs
+ override val outputs: Set<SimResourceProvider>
+ get() = _outputs
+ private val _outputs = mutableSetOf<Output>()
+ private val _activeOutputs: MutableList<Output> = mutableListOf()
/**
* The input resources that will be switched between the output providers.
*/
override val inputs: Set<SimResourceProvider>
- get() = aggregator.inputs
+ get() = _inputs
+ private val _inputs = mutableSetOf<SimResourceProvider>()
+ private val _activeInputs = mutableListOf<Input>()
/**
- * The resource counters to track the execution metrics of all switch resources.
+ * The resource counters of this switch.
*/
- override val counters: SimResourceDistributorMaxMin.Counters
- get() = distributor.counters
+ public override val counters: SimResourceCounters
+ get() = _counters
+ private val _counters = SimResourceCountersImpl()
/**
- * A flag to indicate that the switch was closed.
+ * The actual processing rate of the switch.
*/
- private var isClosed = false
+ private var _rate = 0.0
/**
- * The aggregator to aggregate the resources.
+ * The demanded processing rate of the outputs.
*/
- private val aggregator = SimResourceAggregatorMaxMin(interpreter, parent)
+ private var _demand = 0.0
/**
- * The distributor to distribute the aggregated resources.
+ * The capacity of the switch.
*/
- private val distributor = SimResourceDistributorMaxMin(interpreter, parent, interferenceDomain)
+ private var _capacity = 0.0
- init {
- aggregator.startConsumer(distributor)
- }
+ /**
+ * Flag to indicate that the scheduler is active.
+ */
+ private var _schedulerActive = false
/**
* Add an output to the switch.
*/
- override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider {
- check(!isClosed) { "Switch has been closed" }
-
- return distributor.newOutput(key)
+ override fun newOutput(key: InterferenceKey?): SimResourceProvider {
+ val provider = Output(_capacity, key)
+ _outputs.add(provider)
+ return provider
}
/**
* Add the specified [input] to the switch.
*/
override fun addInput(input: SimResourceProvider) {
- check(!isClosed) { "Switch has been closed" }
+ val consumer = Input(input)
+ if (_inputs.add(input)) {
+ _activeInputs.add(consumer)
+ input.startConsumer(consumer)
+ }
+ }
+
+ /**
+ * Remove [output] from this switch.
+ */
+ override fun removeOutput(output: SimResourceProvider) {
+ if (!_outputs.remove(output)) {
+ return
+ }
+ // This cast should always succeed since only `Output` instances should be added to _outputs
+ (output as Output).close()
+ }
+
+ override fun clear() {
+ for (input in _activeInputs) {
+ input.cancel()
+ }
+ _activeInputs.clear()
+
+ for (output in _activeOutputs) {
+ output.cancel()
+ }
+ _activeOutputs.clear()
+ }
+
+ /**
+ * Run the scheduler of the switch.
+ */
+ private fun runScheduler(now: Long) {
+ if (_schedulerActive) {
+ return
+ }
+
+ _schedulerActive = true
+ try {
+ doSchedule(now)
+ } finally {
+ _schedulerActive = false
+ }
+ }
+
+ /**
+ * Schedule the outputs over the input.
+ */
+ private fun doSchedule(now: Long) {
+ // If there is no work yet, mark the input as idle.
+ if (_activeOutputs.isEmpty()) {
+ return
+ }
+
+ val capacity = _capacity
+ var availableCapacity = capacity
+
+ // Pull in the work of the outputs
+ val outputIterator = _activeOutputs.listIterator()
+ for (output in outputIterator) {
+ output.pull(now)
+
+ // Remove outputs that have finished
+ if (!output.isActive) {
+ outputIterator.remove()
+ }
+ }
+
+ var demand = 0.0
+
+ // Sort in-place the outputs based on their requested usage.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
+ _activeOutputs.sort()
+
+ // Divide the available input capacity fairly across the outputs using max-min fair sharing
+ var remaining = _activeOutputs.size
+ for (output in _activeOutputs) {
+ val availableShare = availableCapacity / remaining--
+ val grantedSpeed = min(output.allowedRate, availableShare)
+
+ // Ignore idle computation
+ if (grantedSpeed <= 0.0) {
+ output.actualRate = 0.0
+ continue
+ }
+
+ demand += output.limit
- aggregator.addInput(input)
+ output.actualRate = grantedSpeed
+ availableCapacity -= grantedSpeed
+ }
+
+ val rate = capacity - availableCapacity
+
+ _demand = demand
+ _rate = rate
+
+ // Sort all consumers by their capacity
+ _activeInputs.sort()
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (input in _activeInputs) {
+ val inputCapacity = input.capacity
+ val fraction = inputCapacity / capacity
+ val grantedSpeed = rate * fraction
+
+ input.push(grantedSpeed)
+ }
}
- override fun close() {
- if (!isClosed) {
- isClosed = true
- aggregator.cancel()
+ /**
+ * Recompute the capacity of the switch.
+ */
+ private fun updateCapacity() {
+ val newCapacity = _activeInputs.sumOf(Input::capacity)
+
+ // No-op if the capacity is unchanged
+ if (_capacity == newCapacity) {
+ return
}
+
+ _capacity = newCapacity
+
+ for (output in _outputs) {
+ output.capacity = newCapacity
+ }
+ }
+
+ /**
+ * An internal [SimResourceProvider] implementation for switch outputs.
+ */
+ private inner class Output(capacity: Double, val key: InterferenceKey?) :
+ SimAbstractResourceProvider(interpreter, capacity),
+ SimResourceProviderLogic,
+ Comparable<Output> {
+ /**
+ * The requested limit.
+ */
+ @JvmField var limit: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ @JvmField var actualRate: Double = 0.0
+
+ /**
+ * The processing speed that is allowed by the model constraints.
+ */
+ val allowedRate: Double
+ get() = min(capacity, limit)
+
+ /**
+ * A flag to indicate that the output is closed.
+ */
+ private var _isClosed: Boolean = false
+
+ /**
+ * The timestamp at which we received the last command.
+ */
+ private var _lastPull: Long = Long.MIN_VALUE
+
+ /**
+ * Close the output.
+ *
+ * This method is invoked when the user removes an output from the switch.
+ */
+ fun close() {
+ _isClosed = true
+ cancel()
+ }
+
+ /* SimAbstractResourceProvider */
+ override fun createLogic(): SimResourceProviderLogic = this
+
+ override fun start(ctx: SimResourceControllableContext) {
+ check(!_isClosed) { "Cannot re-use closed output" }
+
+ _activeOutputs += this
+ super.start(ctx)
+ }
+
+ /* SimResourceProviderLogic */
+ override fun onConsume(
+ ctx: SimResourceControllableContext,
+ now: Long,
+ delta: Long,
+ limit: Double,
+ duration: Long
+ ) {
+ doUpdateCounters(delta)
+
+ actualRate = 0.0
+ this.limit = limit
+ _lastPull = now
+
+ runScheduler(now)
+ }
+
+ override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {
+ parent?.onConverge(now)
+ }
+
+ override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {
+ doUpdateCounters(delta)
+
+ limit = 0.0
+ actualRate = 0.0
+ _lastPull = now
+ }
+
+ /* Comparable */
+ override fun compareTo(other: Output): Int = allowedRate.compareTo(other.allowedRate)
+
+ /**
+ * Pull the next command if necessary.
+ */
+ fun pull(now: Long) {
+ val ctx = ctx
+ if (ctx != null && _lastPull < now) {
+ ctx.flush()
+ }
+ }
+
+ /**
+ * Helper method to update the resource counters of the distributor.
+ */
+ private fun doUpdateCounters(delta: Long) {
+ if (delta <= 0L) {
+ return
+ }
+
+ // Compute the performance penalty due to resource interference
+ val perfScore = if (interferenceDomain != null) {
+ val load = _rate / capacity
+ interferenceDomain.apply(key, load)
+ } else {
+ 1.0
+ }
+
+ val deltaS = delta / 1000.0
+ val work = limit * deltaS
+ val actualWork = actualRate * deltaS
+ val remainingWork = work - actualWork
+
+ updateCounters(work, actualWork, remainingWork)
+
+ val distCounters = _counters
+ distCounters.demand += work
+ distCounters.actual += actualWork
+ distCounters.overcommit += remainingWork
+ distCounters.interference += actualWork * max(0.0, 1 - perfScore)
+ }
+ }
+
+ /**
+ * An internal [SimResourceConsumer] implementation for switch inputs.
+ */
+ private inner class Input(private val provider: SimResourceProvider) : SimResourceConsumer, Comparable<Input> {
+ /**
+ * The active [SimResourceContext] of this consumer.
+ */
+ private var _ctx: SimResourceContext? = null
+
+ /**
+ * The capacity of this input.
+ */
+ val capacity: Double
+ get() = _ctx?.capacity ?: 0.0
+
+ /**
+ * Push the specified rate to the provider.
+ */
+ fun push(rate: Double) {
+ _ctx?.push(rate)
+ }
+
+ /**
+ * Cancel this input.
+ */
+ fun cancel() {
+ provider.cancel()
+ }
+
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ runScheduler(now)
+ return Long.MAX_VALUE
+ }
+
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ assert(_ctx == null) { "Consumer running concurrently" }
+ _ctx = ctx
+ updateCapacity()
+ }
+ SimResourceEvent.Exit -> {
+ _ctx = null
+ updateCapacity()
+ }
+ SimResourceEvent.Capacity -> updateCapacity()
+ else -> {}
+ }
+ }
+
+ override fun compareTo(other: Input): Int = capacity.compareTo(other.capacity)
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt
index 827019c5..01062179 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt
@@ -31,12 +31,16 @@ internal class SimResourceCountersImpl : SimResourceCounters {
override var demand: Double = 0.0
override var actual: Double = 0.0
override var overcommit: Double = 0.0
+ override var interference: Double = 0.0
override fun reset() {
demand = 0.0
actual = 0.0
overcommit = 0.0
+ interference = 0.0
}
- override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ override fun toString(): String {
+ return "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]"
+ }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
deleted file mode 100644
index f4ea5fe8..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import io.mockk.spyk
-import io.mockk.verify
-import kotlinx.coroutines.*
-import org.junit.jupiter.api.Assertions.*
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
-import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
-
-/**
- * Test suite for the [SimResourceAggregatorMaxMin] class.
- */
-internal class SimResourceAggregatorMaxMinTest {
- @Test
- fun testSingleCapacity() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
-
- val aggregator = SimResourceAggregatorMaxMin(scheduler)
- val forwarder = SimResourceForwarder()
- val sources = listOf(
- forwarder,
- SimResourceSource(1.0, scheduler)
- )
- sources.forEach(aggregator::addInput)
-
- val consumer = SimWorkConsumer(1.0, 0.5)
- val usage = mutableListOf<Double>()
- val source = SimResourceSource(1.0, scheduler)
- val adapter = SimSpeedConsumerAdapter(forwarder, usage::add)
- source.startConsumer(adapter)
-
- aggregator.consume(consumer)
- yield()
-
- assertAll(
- { assertEquals(1000, clock.millis()) },
- { assertEquals(listOf(0.0, 0.5, 0.0), usage) }
- )
- }
-
- @Test
- fun testDoubleCapacity() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
-
- val aggregator = SimResourceAggregatorMaxMin(scheduler)
- val sources = listOf(
- SimResourceSource(1.0, scheduler),
- SimResourceSource(1.0, scheduler)
- )
- sources.forEach(aggregator::addInput)
-
- val consumer = SimWorkConsumer(2.0, 1.0)
- val usage = mutableListOf<Double>()
- val adapter = SimSpeedConsumerAdapter(consumer, usage::add)
-
- aggregator.consume(adapter)
- yield()
- assertAll(
- { assertEquals(1000, clock.millis()) },
- { assertEquals(listOf(0.0, 2.0, 0.0), usage) }
- )
- }
-
- @Test
- fun testOvercommit() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
-
- val aggregator = SimResourceAggregatorMaxMin(scheduler)
- val sources = listOf(
- SimResourceSource(1.0, scheduler),
- SimResourceSource(1.0, scheduler)
- )
- sources.forEach(aggregator::addInput)
-
- val consumer = spyk(object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return if (now == 0L) {
- ctx.push(4.0)
- 1000
- } else {
- ctx.close()
- Long.MAX_VALUE
- }
- }
- })
-
- aggregator.consume(consumer)
- yield()
- assertEquals(1000, clock.millis())
-
- verify(exactly = 2) { consumer.onNext(any(), any(), any()) }
- }
-
- @Test
- fun testAdjustCapacity() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
-
- val aggregator = SimResourceAggregatorMaxMin(scheduler)
- val sources = listOf(
- SimResourceSource(1.0, scheduler),
- SimResourceSource(1.0, scheduler)
- )
- sources.forEach(aggregator::addInput)
-
- val consumer = SimWorkConsumer(4.0, 1.0)
- coroutineScope {
- launch { aggregator.consume(consumer) }
- delay(1000)
- sources[0].capacity = 0.5
- }
- yield()
- assertEquals(2333, clock.millis())
- }
-
- @Test
- fun testFailOverCapacity() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
-
- val aggregator = SimResourceAggregatorMaxMin(scheduler)
- val sources = listOf(
- SimResourceSource(1.0, scheduler),
- SimResourceSource(1.0, scheduler)
- )
- sources.forEach(aggregator::addInput)
-
- val consumer = SimWorkConsumer(1.0, 0.5)
- coroutineScope {
- launch { aggregator.consume(consumer) }
- delay(500)
- sources[0].capacity = 0.5
- }
- yield()
- assertEquals(1167, clock.millis())
- }
-
- @Test
- fun testCounters() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
-
- val aggregator = SimResourceAggregatorMaxMin(scheduler)
- val sources = listOf(
- SimResourceSource(1.0, scheduler),
- SimResourceSource(1.0, scheduler)
- )
- sources.forEach(aggregator::addInput)
-
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return if (now == 0L) {
- ctx.push(4.0)
- 1000
- } else {
- ctx.close()
- Long.MAX_VALUE
- }
- }
- }
-
- aggregator.consume(consumer)
- yield()
- assertEquals(1000, clock.millis())
- assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index 9f86dc0d..49f2da5f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -65,13 +65,8 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(forwarder)
val provider = switch.newOutput()
-
- try {
- provider.consume(workload)
- yield()
- } finally {
- provider.close()
- }
+ provider.consume(workload)
+ yield()
assertAll(
{ assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } },
@@ -95,13 +90,9 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
val provider = switch.newOutput()
+ provider.consume(workload)
+ yield()
- try {
- provider.consume(workload)
- yield()
- } finally {
- provider.close()
- }
assertEquals(duration, clock.millis()) { "Took enough time" }
}
@@ -141,14 +132,9 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
val provider = switch.newOutput()
-
- try {
- provider.consume(workload)
- yield()
- provider.consume(workload)
- } finally {
- provider.close()
- }
+ provider.consume(workload)
+ yield()
+ provider.consume(workload)
assertEquals(duration * 2, clock.millis()) { "Took enough time" }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
index ba0d66ff..03f90e21 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -51,7 +51,7 @@ internal class SimResourceSwitchMaxMinTest {
provider.consume(consumer)
yield()
} finally {
- switch.close()
+ switch.clear()
}
}
@@ -81,7 +81,7 @@ internal class SimResourceSwitchMaxMinTest {
provider.consume(workload)
yield()
} finally {
- switch.close()
+ switch.clear()
}
assertAll(
@@ -133,7 +133,7 @@ internal class SimResourceSwitchMaxMinTest {
yield()
} finally {
- switch.close()
+ switch.clear()
}
assertAll(
{ assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },