summaryrefslogtreecommitdiff
path: root/simulator/opendc-simulator/opendc-simulator-resources/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-22 16:45:13 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-22 18:16:40 +0100
commit3718c385f84b463ac799080bb5603e0011adcd7d (patch)
tree414e4c9fa82ade602cfdae4384f39b0bdb6cb139 /simulator/opendc-simulator/opendc-simulator-resources/src/main
parentf616b720406250b1415593ff04c9d910b1fda54c (diff)
simulator: Remove generic resource constraint from resource model
This change removes the generic resource constraint (e.g., SimResource) and replaces it by a simple capacity property. In the future, users should handle the resource properties on a higher level. This change simplifies compositions of consumers and providers by not requiring a translation from resource to capacity.
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src/main')
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt198
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt28
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt48
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt63
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt8
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt7
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt (renamed from simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt)20
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt423
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt17
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt19
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt16
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt14
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt30
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt453
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt7
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt9
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt11
17 files changed, 865 insertions, 506 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
new file mode 100644
index 00000000..18ac0cd8
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -0,0 +1,198 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * Abstract implementation of [SimResourceAggregator].
+ */
+public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator {
+ /**
+ * The available resource provider contexts.
+ */
+ protected val inputContexts: Set<SimResourceContext>
+ get() = _inputContexts
+ private val _inputContexts = mutableSetOf<SimResourceContext>()
+
+ /**
+ * The output context.
+ */
+ protected val outputContext: SimResourceContext
+ get() = context
+
+ /**
+ * The commands to submit to the underlying input resources.
+ */
+ protected val commands: MutableMap<SimResourceContext, SimResourceCommand> = mutableMapOf()
+
+ /**
+ * This method is invoked when the resource consumer consumes resources.
+ */
+ protected abstract fun doConsume(work: Double, limit: Double, deadline: Long)
+
+ /**
+ * This method is invoked when the resource consumer enters an idle state.
+ */
+ protected open fun doIdle(deadline: Long) {
+ for (input in inputContexts) {
+ commands[input] = SimResourceCommand.Idle(deadline)
+ }
+ }
+
+ /**
+ * This method is invoked when the resource consumer finishes processing.
+ */
+ protected open fun doFinish(cause: Throwable?) {
+ for (input in inputContexts) {
+ commands[input] = SimResourceCommand.Exit
+ }
+ }
+
+ /**
+ * This method is invoked when an input context is started.
+ */
+ protected open fun onContextStarted(ctx: SimResourceContext) {
+ _inputContexts.add(ctx)
+ }
+
+ protected open fun onContextFinished(ctx: SimResourceContext) {
+ assert(_inputContexts.remove(ctx)) { "Lost context" }
+ }
+
+ override fun addInput(input: SimResourceProvider) {
+ check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" }
+
+ val consumer = Consumer()
+ _inputs.add(input)
+ input.startConsumer(consumer)
+ }
+
+ override fun close() {
+ output.close()
+ }
+
+ override val output: SimResourceProvider
+ get() = _output
+ private val _output = SimResourceForwarder()
+
+ override val inputs: Set<SimResourceProvider>
+ get() = _inputs
+ private val _inputs = mutableSetOf<SimResourceProvider>()
+
+ private val context = object : SimAbstractResourceContext(clock, _output) {
+ override val capacity: Double
+ get() = inputContexts.sumByDouble { it.capacity }
+
+ override val remainingWork: Double
+ get() = inputContexts.sumByDouble { it.remainingWork }
+
+ override fun interrupt() {
+ super.interrupt()
+
+ interruptAll()
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ doConsume(work, limit, deadline)
+ }
+
+ override fun onIdle(deadline: Long) {
+ doIdle(deadline)
+ }
+
+ override fun onFinish(cause: Throwable?) {
+ doFinish(cause)
+
+ super.onFinish(cause)
+
+ interruptAll()
+ }
+ }
+
+ /**
+ * A flag to indicate that an interrupt is active.
+ */
+ private var isInterrupting: Boolean = false
+
+ /**
+ * Schedule the work over the input resources.
+ */
+ private fun doSchedule() {
+ context.flush(isIntermediate = true)
+ interruptAll()
+ }
+
+ /**
+ * Interrupt all inputs.
+ */
+ private fun interruptAll() {
+ // Prevent users from interrupting the resource while they are constructing their next command, as this will
+ // only lead to infinite recursion.
+ if (isInterrupting) {
+ return
+ }
+
+ try {
+ isInterrupting = true
+
+ val iterator = _inputs.iterator()
+ while (iterator.hasNext()) {
+ val input = iterator.next()
+ input.interrupt()
+
+ if (input.state != SimResourceState.Active) {
+ iterator.remove()
+ }
+ }
+ } finally {
+ isInterrupting = false
+ }
+ }
+
+ /**
+ * An internal [SimResourceConsumer] implementation for aggregator inputs.
+ */
+ private inner class Consumer : SimResourceConsumer {
+ override fun onStart(ctx: SimResourceContext) {
+ onContextStarted(ctx)
+
+ // Make sure we initialize the output if we have not done so yet
+ if (context.state == SimResourceState.Pending) {
+ context.start()
+ }
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ doSchedule()
+
+ return commands[ctx] ?: SimResourceCommand.Idle()
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ onContextFinished(ctx)
+
+ super.onFinish(ctx, cause)
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index dba334a2..f65cbaf4 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -30,17 +30,10 @@ import kotlin.math.min
/**
* Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
*/
-public abstract class SimAbstractResourceContext<R : SimResource>(
- override val resource: R,
+public abstract class SimAbstractResourceContext(
override val clock: Clock,
- private val consumer: SimResourceConsumer<R>
-) : SimResourceContext<R> {
- /**
- * The capacity of the resource.
- */
- override val capacity: Double
- get() = resource.capacity
-
+ private val consumer: SimResourceConsumer
+) : SimResourceContext {
/**
* The amount of work still remaining at this instant.
*/
@@ -51,6 +44,12 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
}
/**
+ * A flag to indicate the state of the context.
+ */
+ public var state: SimResourceState = SimResourceState.Pending
+ private set
+
+ /**
* This method is invoked when the resource will idle until the specified [deadline].
*/
public abstract fun onIdle(deadline: Long)
@@ -178,7 +177,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
if (command.deadline <= now || !isIntermediate) {
next(now)
} else {
- activeCommand
+ interpret(SimResourceCommand.Idle(command.deadline), now)
}
}
is SimResourceCommand.Consume -> {
@@ -214,7 +213,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
flush()
}
- override fun toString(): String = "SimAbstractResourceContext[resource=$resource]"
+ override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]"
/**
* A flag to indicate that the resource is currently processing a command.
@@ -222,11 +221,6 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
protected var isProcessing: Boolean = false
/**
- * A flag to indicate the state of the context.
- */
- private var state: SimResourceState = SimResourceState.Pending
-
- /**
* The current command that is being processed.
*/
private var activeCommand: CommandWrapper? = null
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
new file mode 100644
index 00000000..bb4e6a2c
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource.
+ */
+public interface SimResourceAggregator : AutoCloseable {
+ /**
+ * The output resource provider to which resource consumers can be attached.
+ */
+ public val output: SimResourceProvider
+
+ /**
+ * The input resources that will be switched between the output providers.
+ */
+ public val inputs: Set<SimResourceProvider>
+
+ /**
+ * Add the specified [input] to the switch.
+ */
+ public fun addInput(input: SimResourceProvider)
+
+ /**
+ * End the lifecycle of the aggregator.
+ */
+ public override fun close()
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
new file mode 100644
index 00000000..08bc064e
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * A [SimResourceAggregator] that distributes the load equally across the input resources.
+ */
+public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) {
+ private val consumers = mutableListOf<SimResourceContext>()
+
+ override fun doConsume(work: Double, limit: Double, deadline: Long) {
+ // Sort all consumers by their capacity
+ consumers.sortWith(compareBy { it.capacity })
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (input in consumers) {
+ val inputCapacity = input.capacity
+ val fraction = inputCapacity / outputContext.capacity
+ val grantedSpeed = limit * fraction
+ val grantedWork = fraction * work
+
+ commands[input] =
+ if (grantedWork > 0.0 && grantedSpeed > 0.0)
+ SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ }
+ }
+
+ override fun onContextStarted(ctx: SimResourceContext) {
+ super.onContextStarted(ctx)
+
+ consumers.add(ctx)
+ }
+
+ override fun onContextFinished(ctx: SimResourceContext) {
+ super.onContextFinished(ctx)
+
+ consumers.remove(ctx)
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index 01b56488..04c7fcaf 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -28,13 +28,13 @@ package org.opendc.simulator.resources
* Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently)
* for multiple resource providers, unless explicitly said otherwise.
*/
-public interface SimResourceConsumer<in R : SimResource> {
+public interface SimResourceConsumer {
/**
* This method is invoked when the consumer is started for some resource.
*
* @param ctx The execution context in which the consumer runs.
*/
- public fun onStart(ctx: SimResourceContext<R>) {}
+ public fun onStart(ctx: SimResourceContext) {}
/**
* This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because
@@ -43,7 +43,7 @@ public interface SimResourceConsumer<in R : SimResource> {
* @param ctx The execution context in which the consumer runs.
* @return The next command that the resource should execute.
*/
- public fun onNext(ctx: SimResourceContext<R>): SimResourceCommand
+ public fun onNext(ctx: SimResourceContext): SimResourceCommand
/**
* This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit],
@@ -54,5 +54,5 @@ public interface SimResourceConsumer<in R : SimResource> {
* @param ctx The execution context in which the consumer ran.
* @param cause The cause of the finish in case the resource finished exceptionally.
*/
- public fun onFinish(ctx: SimResourceContext<R>, cause: Throwable? = null) {}
+ public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
index f13764fb..11dbb09f 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -28,12 +28,7 @@ import java.time.Clock
* The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a
* resource and a resource consumer.
*/
-public interface SimResourceContext<out R : SimResource> {
- /**
- * The resource that is managed by this context.
- */
- public val resource: R
-
+public interface SimResourceContext {
/**
* The virtual clock tracking simulation time.
*/
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
index 31b0a175..b2759b7f 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -23,11 +23,21 @@
package org.opendc.simulator.resources
/**
- * A generic representation of resource that may be consumed.
+ * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers.
*/
-public interface SimResource {
+public interface SimResourceDistributor : AutoCloseable {
/**
- * The capacity of the resource.
+ * The output resource providers to which resource consumers can be attached.
*/
- public val capacity: Double
+ public val outputs: Set<SimResourceProvider>
+
+ /**
+ * The input resource that will be distributed over the consumers.
+ */
+ public val input: SimResourceProvider
+
+ /**
+ * Add an output to the switch with the specified [capacity].
+ */
+ public fun addOutput(capacity: Double): SimResourceProvider
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
new file mode 100644
index 00000000..b0f27b9d
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -0,0 +1,423 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing.
+ */
+public class SimResourceDistributorMaxMin(
+ override val input: SimResourceProvider,
+ private val clock: Clock,
+ private val listener: Listener? = null
+) : SimResourceDistributor {
+ override val outputs: Set<SimResourceProvider>
+ get() = _outputs
+ private val _outputs = mutableSetOf<OutputProvider>()
+
+ /**
+ * The active output contexts.
+ */
+ private val outputContexts: MutableList<OutputContext> = mutableListOf()
+
+ /**
+ * The total speed requested by the output resources.
+ */
+ private var totalRequestedSpeed = 0.0
+
+ /**
+ * The total amount of work requested by the output resources.
+ */
+ private var totalRequestedWork = 0.0
+
+ /**
+ * The total allocated speed for the output resources.
+ */
+ private var totalAllocatedSpeed = 0.0
+
+ /**
+ * The total allocated work requested for the output resources.
+ */
+ private var totalAllocatedWork = 0.0
+
+ /**
+ * The amount of work that could not be performed due to over-committing resources.
+ */
+ private var totalOvercommittedWork = 0.0
+
+ /**
+ * The amount of work that was lost due to interference.
+ */
+ private var totalInterferedWork = 0.0
+
+ /**
+ * A flag to indicate that the switch is closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
+ * An internal [SimResourceConsumer] implementation for switch inputs.
+ */
+ private val consumer = object : SimResourceConsumer {
+ /**
+ * The resource context of the consumer.
+ */
+ private lateinit var ctx: SimResourceContext
+
+ val remainingWork: Double
+ get() = ctx.remainingWork
+
+ override fun onStart(ctx: SimResourceContext) {
+ this.ctx = ctx
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return doNext(ctx.capacity)
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ super.onFinish(ctx, cause)
+
+ val iterator = _outputs.iterator()
+ while (iterator.hasNext()) {
+ val output = iterator.next()
+
+ // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
+ // during the call tou output.close()
+ iterator.remove()
+
+ output.close()
+ }
+ }
+ }
+
+ /**
+ * The total amount of remaining work.
+ */
+ private val totalRemainingWork: Double
+ get() = consumer.remainingWork
+
+ override fun addOutput(capacity: Double): SimResourceProvider {
+ check(!isClosed) { "Distributor has been closed" }
+
+ val provider = OutputProvider(capacity)
+ _outputs.add(provider)
+ return provider
+ }
+
+ override fun close() {
+ if (!isClosed) {
+ isClosed = true
+ input.cancel()
+ }
+ }
+
+ init {
+ input.startConsumer(consumer)
+ }
+
+ /**
+ * Indicate that the workloads should be re-scheduled.
+ */
+ private fun schedule() {
+ input.interrupt()
+ }
+
+ /**
+ * Schedule the work over the physical CPUs.
+ */
+ private fun doSchedule(capacity: Double): SimResourceCommand {
+ // If there is no work yet, mark all inputs as idle.
+ if (outputContexts.isEmpty()) {
+ return SimResourceCommand.Idle()
+ }
+
+ val maxUsage = capacity
+ var duration: Double = Double.MAX_VALUE
+ var deadline: Long = Long.MAX_VALUE
+ var availableSpeed = maxUsage
+ var totalRequestedSpeed = 0.0
+ var totalRequestedWork = 0.0
+
+ // Flush the work of the outputs
+ var outputIterator = outputContexts.listIterator()
+ while (outputIterator.hasNext()) {
+ val output = outputIterator.next()
+
+ output.flush(isIntermediate = true)
+
+ if (output.activeCommand == SimResourceCommand.Exit) {
+ // Apparently the output consumer has exited, so remove it from the scheduling queue.
+ outputIterator.remove()
+ }
+ }
+
+ // Sort the outputs based on their requested usage
+ // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
+ outputContexts.sort()
+
+ // Divide the available input capacity fairly across the outputs using max-min fair sharing
+ outputIterator = outputContexts.listIterator()
+ var remaining = outputContexts.size
+ while (outputIterator.hasNext()) {
+ val output = outputIterator.next()
+ val availableShare = availableSpeed / remaining--
+
+ when (val command = output.activeCommand) {
+ is SimResourceCommand.Idle -> {
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ output.actualSpeed = 0.0
+ }
+ is SimResourceCommand.Consume -> {
+ val grantedSpeed = min(output.allowedSpeed, availableShare)
+
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ // Ignore idle computation
+ if (grantedSpeed <= 0.0 || command.work <= 0.0) {
+ output.actualSpeed = 0.0
+ continue
+ }
+
+ totalRequestedSpeed += command.limit
+ totalRequestedWork += command.work
+
+ output.actualSpeed = grantedSpeed
+ availableSpeed -= grantedSpeed
+
+ // The duration that we want to run is that of the shortest request from an output
+ duration = min(duration, command.work / grantedSpeed)
+ }
+ SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" }
+ }
+ }
+
+ assert(deadline >= clock.millis()) { "Deadline already passed" }
+
+ this.totalRequestedSpeed = totalRequestedSpeed
+ this.totalRequestedWork = totalRequestedWork
+ this.totalAllocatedSpeed = maxUsage - availableSpeed
+ this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration)
+
+ return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0)
+ SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ }
+
+ /**
+ * Obtain the next command to perform.
+ */
+ private fun doNext(capacity: Double): SimResourceCommand {
+ val totalRequestedWork = totalRequestedWork.toLong()
+ val totalRemainingWork = totalRemainingWork.toLong()
+ val totalAllocatedWork = totalAllocatedWork.toLong()
+ val totalRequestedSpeed = totalRequestedSpeed
+ val totalAllocatedSpeed = totalAllocatedSpeed
+
+ // Force all inputs to re-schedule their work.
+ val command = doSchedule(capacity)
+
+ // Report metrics
+ listener?.onSliceFinish(
+ this,
+ totalRequestedWork,
+ totalAllocatedWork - totalRemainingWork,
+ totalOvercommittedWork.toLong(),
+ totalInterferedWork.toLong(),
+ totalRequestedSpeed,
+ totalAllocatedSpeed,
+ )
+
+ totalInterferedWork = 0.0
+ totalOvercommittedWork = 0.0
+
+ return command
+ }
+
+ /**
+ * Event listener for hypervisor events.
+ */
+ public interface Listener {
+ /**
+ * This method is invoked when a slice is finished.
+ */
+ public fun onSliceFinish(
+ switch: SimResourceDistributor,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ )
+ }
+
+ /**
+ * An internal [SimResourceProvider] implementation for switch outputs.
+ */
+ private inner class OutputProvider(val capacity: Double) : SimResourceProvider {
+ /**
+ * The [OutputContext] that is currently running.
+ */
+ private var ctx: OutputContext? = null
+
+ override var state: SimResourceState = SimResourceState.Pending
+ internal set
+
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ check(state == SimResourceState.Pending) { "Resource cannot be consumed" }
+
+ val ctx = OutputContext(this, consumer)
+ this.ctx = ctx
+ this.state = SimResourceState.Active
+ outputContexts += ctx
+
+ ctx.start()
+ schedule()
+ }
+
+ override fun close() {
+ cancel()
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Stopped
+ _outputs.remove(this)
+ }
+ }
+
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
+
+ override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.stop()
+ }
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
+ }
+
+ /**
+ * A [SimAbstractResourceContext] for the output resources.
+ */
+ private inner class OutputContext(
+ private val provider: OutputProvider,
+ consumer: SimResourceConsumer
+ ) : SimAbstractResourceContext(clock, consumer), Comparable<OutputContext> {
+ override val capacity: Double
+ get() = provider.capacity
+
+ /**
+ * The current command that is processed by the vCPU.
+ */
+ var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
+
+ /**
+ * The processing speed that is allowed by the model constraints.
+ */
+ var allowedSpeed: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ var actualSpeed: Double = 0.0
+
+ private fun reportOvercommit() {
+ val remainingWork = remainingWork
+ totalOvercommittedWork += remainingWork
+ }
+
+ override fun onIdle(deadline: Long) {
+ reportOvercommit()
+
+ allowedSpeed = 0.0
+ activeCommand = SimResourceCommand.Idle(deadline)
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ reportOvercommit()
+
+ allowedSpeed = getSpeed(limit)
+ activeCommand = SimResourceCommand.Consume(work, limit, deadline)
+ }
+
+ override fun onFinish(cause: Throwable?) {
+ reportOvercommit()
+
+ activeCommand = SimResourceCommand.Exit
+ provider.cancel()
+
+ super.onFinish(cause)
+ }
+
+ override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
+ // Apply performance interference model
+ val performanceScore = 1.0
+
+ // Compute the remaining amount of work
+ return if (work > 0.0) {
+ // Compute the fraction of compute time allocated to the VM
+ val fraction = actualSpeed / totalAllocatedSpeed
+
+ // Compute the work that was actually granted to the VM.
+ val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
+ val processed = processingAvailable * performanceScore
+
+ val interferedWork = processingAvailable - processed
+
+ totalInterferedWork += interferedWork
+
+ max(0.0, work - processed)
+ } else {
+ 0.0
+ }
+ }
+
+ override fun interrupt() {
+ // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
+ // to infinite recursion.
+ if (isProcessing) {
+ return
+ }
+
+ super.interrupt()
+
+ // Force the scheduler to re-schedule
+ schedule()
+ }
+
+ override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
index 732e709a..227f4d62 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
@@ -25,17 +25,16 @@ package org.opendc.simulator.resources
/**
* A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer].
*/
-public class SimResourceForwarder<R : SimResource>(override val resource: R) :
- SimResourceProvider<R>, SimResourceConsumer<R> {
+public class SimResourceForwarder : SimResourceProvider, SimResourceConsumer {
/**
* The [SimResourceContext] in which the forwarder runs.
*/
- private var ctx: SimResourceContext<R>? = null
+ private var ctx: SimResourceContext? = null
/**
* The delegate [SimResourceConsumer].
*/
- private var delegate: SimResourceConsumer<R>? = null
+ private var delegate: SimResourceConsumer? = null
/**
* A flag to indicate that the delegate was started.
@@ -48,11 +47,13 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) :
override var state: SimResourceState = SimResourceState.Pending
private set
- override fun startConsumer(consumer: SimResourceConsumer<R>) {
+ override fun startConsumer(consumer: SimResourceConsumer) {
check(state == SimResourceState.Pending) { "Resource is in invalid state" }
state = SimResourceState.Active
delegate = consumer
+
+ // Interrupt the provider to replace the consumer
interrupt()
}
@@ -83,11 +84,11 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) :
}
}
- override fun onStart(ctx: SimResourceContext<R>) {
+ override fun onStart(ctx: SimResourceContext) {
this.ctx = ctx
}
- override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
val delegate = delegate
if (!hasDelegateStarted) {
@@ -117,7 +118,7 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) :
}
}
- override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) {
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
this.ctx = null
val delegate = delegate
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
index 1593281b..52b13c5c 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -27,12 +27,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine
/**
* A [SimResourceProvider] provides some resource of type [R].
*/
-public interface SimResourceProvider<out R : SimResource> : AutoCloseable {
- /**
- * The resource that is managed by this provider.
- */
- public val resource: R
-
+public interface SimResourceProvider : AutoCloseable {
/**
* The state of the resource.
*/
@@ -43,7 +38,7 @@ public interface SimResourceProvider<out R : SimResource> : AutoCloseable {
*
* @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended.
*/
- public fun startConsumer(consumer: SimResourceConsumer<R>)
+ public fun startConsumer(consumer: SimResourceConsumer)
/**
* Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op.
@@ -67,15 +62,15 @@ public interface SimResourceProvider<out R : SimResource> : AutoCloseable {
* Consume the resource provided by this provider using the specified [consumer] and suspend execution until
* the consumer has finished.
*/
-public suspend fun <R : SimResource> SimResourceProvider<R>.consume(consumer: SimResourceConsumer<R>) {
+public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) {
return suspendCancellableCoroutine { cont ->
- startConsumer(object : SimResourceConsumer<R> by consumer {
- override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) {
+ startConsumer(object : SimResourceConsumer by consumer {
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
assert(!cont.isCompleted) { "Coroutine already completed" }
- cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit))
-
consumer.onFinish(ctx, cause)
+
+ cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit))
}
override fun toString(): String = "SimSuspendingResourceConsumer"
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 99545c4c..3b4e7e7a 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -31,15 +31,15 @@ import kotlin.math.min
/**
* A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity.
*
- * @param resource The resource to provide.
+ * @param initialCapacity The initial capacity of the resource.
* @param clock The virtual clock to track simulation time.
* @param scheduler The scheduler to schedule the interrupts.
*/
-public class SimResourceSource<R : SimResource>(
- override val resource: R,
+public class SimResourceSource(
+ private val initialCapacity: Double,
private val clock: Clock,
private val scheduler: TimerScheduler<Any>
-) : SimResourceProvider<R> {
+) : SimResourceProvider {
/**
* The resource processing speed over time.
*/
@@ -55,7 +55,7 @@ public class SimResourceSource<R : SimResource>(
override var state: SimResourceState = SimResourceState.Pending
private set
- override fun startConsumer(consumer: SimResourceConsumer<R>) {
+ override fun startConsumer(consumer: SimResourceConsumer) {
check(state == SimResourceState.Pending) { "Resource is in invalid state" }
val ctx = Context(consumer)
@@ -89,7 +89,9 @@ public class SimResourceSource<R : SimResource>(
/**
* Internal implementation of [SimResourceContext] for this class.
*/
- private inner class Context(consumer: SimResourceConsumer<R>) : SimAbstractResourceContext<R>(resource, clock, consumer) {
+ private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(clock, consumer) {
+ override val capacity: Double = initialCapacity
+
/**
* The processing speed of the resource.
*/
@@ -123,6 +125,6 @@ public class SimResourceSource<R : SimResource>(
super.onFinish(cause)
}
- override fun toString(): String = "SimResourceSource.Context[resource=$resource]"
+ override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]"
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
index cd1af3fc..53fec16a 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
@@ -23,26 +23,26 @@
package org.opendc.simulator.resources
/**
- * A [SimResourceSwitch] enables switching of capacity of multiple resources of type [R] between multiple consumers.
+ * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers.
*/
-public interface SimResourceSwitch<R : SimResource> : AutoCloseable {
+public interface SimResourceSwitch : AutoCloseable {
/**
* The output resource providers to which resource consumers can be attached.
*/
- public val outputs: Set<SimResourceProvider<R>>
+ public val outputs: Set<SimResourceProvider>
/**
* The input resources that will be switched between the output providers.
*/
- public val inputs: Set<SimResourceProvider<R>>
+ public val inputs: Set<SimResourceProvider>
/**
- * Add an output to the switch represented by [resource].
+ * Add an output to the switch with the specified [capacity].
*/
- public fun addOutput(resource: R): SimResourceProvider<R>
+ public fun addOutput(capacity: Double): SimResourceProvider
/**
* Add the specified [input] to the switch.
*/
- public fun addInput(input: SimResourceProvider<R>)
+ public fun addInput(input: SimResourceProvider)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index 5eea78f6..6e431ea1 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -28,45 +28,45 @@ import java.util.ArrayDeque
* A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that
* a single output is directly connected to an input and that the switch can only support as much outputs as inputs.
*/
-public class SimResourceSwitchExclusive<R : SimResource> : SimResourceSwitch<R> {
+public class SimResourceSwitchExclusive : SimResourceSwitch {
/**
* A flag to indicate that the switch is closed.
*/
private var isClosed: Boolean = false
private val _outputs = mutableSetOf<Provider>()
- override val outputs: Set<SimResourceProvider<R>>
+ override val outputs: Set<SimResourceProvider>
get() = _outputs
- private val availableResources = ArrayDeque<SimResourceForwarder<R>>()
+ private val availableResources = ArrayDeque<SimResourceForwarder>()
- private val _inputs = mutableSetOf<SimResourceProvider<R>>()
- override val inputs: Set<SimResourceProvider<R>>
+ private val _inputs = mutableSetOf<SimResourceProvider>()
+ override val inputs: Set<SimResourceProvider>
get() = _inputs
- override fun addOutput(resource: R): SimResourceProvider<R> {
+ override fun addOutput(capacity: Double): SimResourceProvider {
check(!isClosed) { "Switch has been closed" }
check(availableResources.isNotEmpty()) { "No capacity to serve request" }
val forwarder = availableResources.poll()
- val output = Provider(resource, forwarder)
+ val output = Provider(capacity, forwarder)
_outputs += output
return output
}
- override fun addInput(input: SimResourceProvider<R>) {
+ override fun addInput(input: SimResourceProvider) {
check(!isClosed) { "Switch has been closed" }
if (input in inputs) {
return
}
- val forwarder = SimResourceForwarder(input.resource)
+ val forwarder = SimResourceForwarder()
_inputs += input
availableResources += forwarder
- input.startConsumer(object : SimResourceConsumer<R> by forwarder {
- override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) {
+ input.startConsumer(object : SimResourceConsumer by forwarder {
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
// De-register the input after it has finished
_inputs -= input
forwarder.onFinish(ctx, cause)
@@ -78,13 +78,13 @@ public class SimResourceSwitchExclusive<R : SimResource> : SimResourceSwitch<R>
isClosed = true
// Cancel all upstream subscriptions
- _inputs.forEach(SimResourceProvider<R>::cancel)
+ _inputs.forEach(SimResourceProvider::cancel)
}
private inner class Provider(
- override val resource: R,
- private val forwarder: SimResourceForwarder<R>
- ) : SimResourceProvider<R> by forwarder {
+ private val capacity: Double,
+ private val forwarder: SimResourceForwarder
+ ) : SimResourceProvider by forwarder {
override fun close() {
_outputs -= this
availableResources += forwarder
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index ee8edfcd..c796c251 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -23,97 +23,61 @@
package org.opendc.simulator.resources
import kotlinx.coroutines.*
-import org.opendc.simulator.resources.consumer.SimConsumerBarrier
import java.time.Clock
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
/**
* A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
* fair sharing.
*/
-public class SimResourceSwitchMaxMin<R : SimResource>(
- private val clock: Clock,
- private val listener: Listener<R>? = null
-) : SimResourceSwitch<R> {
- private val inputConsumers = mutableSetOf<InputConsumer>()
- private val _outputs = mutableSetOf<OutputProvider>()
- override val outputs: Set<SimResourceProvider<R>>
+public class SimResourceSwitchMaxMin(
+ clock: Clock,
+ private val listener: Listener? = null
+) : SimResourceSwitch {
+ private val _outputs = mutableSetOf<SimResourceProvider>()
+ override val outputs: Set<SimResourceProvider>
get() = _outputs
- private val _inputs = mutableSetOf<SimResourceProvider<R>>()
- override val inputs: Set<SimResourceProvider<R>>
+ private val _inputs = mutableSetOf<SimResourceProvider>()
+ override val inputs: Set<SimResourceProvider>
get() = _inputs
/**
- * The commands to submit to the underlying host.
+ * A flag to indicate that the switch was closed.
*/
- private val commands = mutableMapOf<R, SimResourceCommand>()
+ private var isClosed = false
/**
- * The active output contexts.
+ * The aggregator to aggregate the resources.
*/
- private val outputContexts: MutableList<OutputContext> = mutableListOf()
+ private val aggregator = SimResourceAggregatorMaxMin(clock)
/**
- * The remaining work of all inputs.
+ * The distributor to distribute the aggregated resources.
*/
- private val totalRemainingWork: Double
- get() = inputConsumers.sumByDouble { it.remainingWork }
-
- /**
- * The total speed requested by the vCPUs.
- */
- private var totalRequestedSpeed = 0.0
-
- /**
- * The total amount of work requested by the vCPUs.
- */
- private var totalRequestedWork = 0.0
-
- /**
- * The total allocated speed for the vCPUs.
- */
- private var totalAllocatedSpeed = 0.0
-
- /**
- * The total allocated work requested for the vCPUs.
- */
- private var totalAllocatedWork = 0.0
-
- /**
- * The amount of work that could not be performed due to over-committing resources.
- */
- private var totalOvercommittedWork = 0.0
-
- /**
- * The amount of work that was lost due to interference.
- */
- private var totalInterferedWork = 0.0
-
- /**
- * A flag to indicate that the scheduler has submitted work that has not yet been completed.
- */
- private var isDirty: Boolean = false
-
- /**
- * The scheduler barrier.
- */
- private var barrier: SimConsumerBarrier = SimConsumerBarrier(0)
-
- /**
- * A flag to indicate that the switch is closed.
- */
- private var isClosed: Boolean = false
+ private val distributor = SimResourceDistributorMaxMin(
+ aggregator.output, clock,
+ object : SimResourceDistributorMaxMin.Listener {
+ override fun onSliceFinish(
+ switch: SimResourceDistributor,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand)
+ }
+ }
+ )
/**
* Add an output to the switch represented by [resource].
*/
- override fun addOutput(resource: R): SimResourceProvider<R> {
+ override fun addOutput(capacity: Double): SimResourceProvider {
check(!isClosed) { "Switch has been closed" }
- val provider = OutputProvider(resource)
+ val provider = distributor.addOutput(capacity)
_outputs.add(provider)
return provider
}
@@ -121,169 +85,29 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
/**
* Add the specified [input] to the switch.
*/
- override fun addInput(input: SimResourceProvider<R>) {
+ override fun addInput(input: SimResourceProvider) {
check(!isClosed) { "Switch has been closed" }
- val consumer = InputConsumer(input)
- _inputs.add(input)
- inputConsumers += consumer
+ aggregator.addInput(input)
}
override fun close() {
- isClosed = true
- }
-
- /**
- * Indicate that the workloads should be re-scheduled.
- */
- private fun schedule() {
- isDirty = true
- interruptAll()
- }
-
- /**
- * Schedule the work over the physical CPUs.
- */
- private fun doSchedule() {
- // If there is no work yet, mark all inputs as idle.
- if (outputContexts.isEmpty()) {
- commands.replaceAll { _, _ -> SimResourceCommand.Idle() }
- interruptAll()
- }
-
- val maxUsage = inputs.sumByDouble { it.resource.capacity }
- var duration: Double = Double.MAX_VALUE
- var deadline: Long = Long.MAX_VALUE
- var availableSpeed = maxUsage
- var totalRequestedSpeed = 0.0
- var totalRequestedWork = 0.0
-
- // Sort the outputs based on their requested usage
- // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
- outputContexts.sort()
-
- // Divide the available input capacity fairly across the outputs using max-min fair sharing
- val outputIterator = outputContexts.listIterator()
- var remaining = outputContexts.size
- while (outputIterator.hasNext()) {
- val output = outputIterator.next()
- val availableShare = availableSpeed / remaining--
-
- when (val command = output.activeCommand) {
- is SimResourceCommand.Idle -> {
- // Take into account the minimum deadline of this slice before we possible continue
- deadline = min(deadline, command.deadline)
-
- output.actualSpeed = 0.0
- }
- is SimResourceCommand.Consume -> {
- val grantedSpeed = min(output.allowedSpeed, availableShare)
-
- // Take into account the minimum deadline of this slice before we possible continue
- deadline = min(deadline, command.deadline)
-
- // Ignore idle computation
- if (grantedSpeed <= 0.0 || command.work <= 0.0) {
- output.actualSpeed = 0.0
- continue
- }
-
- totalRequestedSpeed += command.limit
- totalRequestedWork += command.work
-
- output.actualSpeed = grantedSpeed
- availableSpeed -= grantedSpeed
-
- // The duration that we want to run is that of the shortest request from an output
- duration = min(duration, command.work / grantedSpeed)
- }
- SimResourceCommand.Exit -> {
- // Apparently the output consumer has exited, so remove it from the scheduling queue.
- outputIterator.remove()
- }
- }
- }
-
- // Round the duration to milliseconds
- duration = ceil(duration * 1000) / 1000
-
- assert(deadline >= clock.millis()) { "Deadline already passed" }
-
- val totalAllocatedSpeed = maxUsage - availableSpeed
- var totalAllocatedWork = 0.0
- availableSpeed = totalAllocatedSpeed
-
- // Divide the requests over the available capacity of the input resources fairly
- for (input in inputs.sortedByDescending { it.resource.capacity }) {
- val maxResourceUsage = input.resource.capacity
- val fraction = maxResourceUsage / maxUsage
- val grantedSpeed = min(maxResourceUsage, totalAllocatedSpeed * fraction)
- val grantedWork = duration * grantedSpeed
-
- commands[input.resource] =
- if (grantedWork > 0.0 && grantedSpeed > 0.0)
- SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
- else
- SimResourceCommand.Idle(deadline)
-
- totalAllocatedWork += grantedWork
- availableSpeed -= grantedSpeed
- }
-
- this.totalRequestedSpeed = totalRequestedSpeed
- this.totalRequestedWork = totalRequestedWork
- this.totalAllocatedSpeed = totalAllocatedSpeed
- this.totalAllocatedWork = totalAllocatedWork
-
- interruptAll()
- }
-
- /**
- * Flush the progress of the vCPUs.
- */
- private fun flushGuests() {
- val totalRemainingWork = totalRemainingWork
-
- // Flush all the outputs work
- for (output in outputContexts) {
- output.flush(isIntermediate = true)
- }
-
- // Report metrics
- listener?.onSliceFinish(
- this,
- totalRequestedWork.toLong(),
- (totalAllocatedWork - totalRemainingWork).toLong(),
- totalOvercommittedWork.toLong(),
- totalInterferedWork.toLong(),
- totalRequestedSpeed,
- totalAllocatedSpeed
- )
- totalInterferedWork = 0.0
- totalOvercommittedWork = 0.0
-
- // Force all inputs to re-schedule their work.
- doSchedule()
- }
-
- /**
- * Interrupt all inputs.
- */
- private fun interruptAll() {
- for (input in inputConsumers) {
- input.interrupt()
+ if (!isClosed) {
+ isClosed = true
+ distributor.close()
+ aggregator.close()
}
}
/**
* Event listener for hypervisor events.
*/
- public interface Listener<R : SimResource> {
+ public interface Listener {
/**
* This method is invoked when a slice is finished.
*/
public fun onSliceFinish(
- switch: SimResourceSwitchMaxMin<R>,
+ switch: SimResourceSwitchMaxMin,
requestedWork: Long,
grantedWork: Long,
overcommittedWork: Long,
@@ -292,203 +116,4 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
cpuDemand: Double
)
}
-
- /**
- * An internal [SimResourceProvider] implementation for switch outputs.
- */
- private inner class OutputProvider(override val resource: R) : SimResourceProvider<R> {
- /**
- * The [OutputContext] that is currently running.
- */
- private var ctx: OutputContext? = null
-
- override var state: SimResourceState = SimResourceState.Pending
- internal set
-
- override fun startConsumer(consumer: SimResourceConsumer<R>) {
- check(state == SimResourceState.Pending) { "Resource cannot be consumed" }
-
- val ctx = OutputContext(this, resource, consumer)
- this.ctx = ctx
- this.state = SimResourceState.Active
- outputContexts += ctx
-
- ctx.start()
- schedule()
- }
-
- override fun close() {
- cancel()
-
- state = SimResourceState.Stopped
- _outputs.remove(this)
- }
-
- override fun interrupt() {
- ctx?.interrupt()
- }
-
- override fun cancel() {
- val ctx = ctx
- if (ctx != null) {
- this.ctx = null
- ctx.stop()
- }
-
- if (state != SimResourceState.Stopped) {
- state = SimResourceState.Pending
- }
- }
- }
-
- /**
- * A [SimAbstractResourceContext] for the output resources.
- */
- private inner class OutputContext(
- private val provider: OutputProvider,
- resource: R,
- consumer: SimResourceConsumer<R>
- ) : SimAbstractResourceContext<R>(resource, clock, consumer), Comparable<OutputContext> {
- /**
- * The current command that is processed by the vCPU.
- */
- var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
-
- /**
- * The processing speed that is allowed by the model constraints.
- */
- var allowedSpeed: Double = 0.0
-
- /**
- * The actual processing speed.
- */
- var actualSpeed: Double = 0.0
-
- private fun reportOvercommit() {
- totalOvercommittedWork += remainingWork
- }
-
- override fun onIdle(deadline: Long) {
- reportOvercommit()
-
- allowedSpeed = 0.0
- activeCommand = SimResourceCommand.Idle(deadline)
- }
-
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- reportOvercommit()
-
- allowedSpeed = getSpeed(limit)
- activeCommand = SimResourceCommand.Consume(work, limit, deadline)
- }
-
- override fun onFinish(cause: Throwable?) {
- reportOvercommit()
-
- activeCommand = SimResourceCommand.Exit
- provider.cancel()
-
- super.onFinish(cause)
- }
-
- override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
- // Apply performance interference model
- val performanceScore = 1.0
-
- // Compute the remaining amount of work
- return if (work > 0.0) {
- // Compute the fraction of compute time allocated to the VM
- val fraction = actualSpeed / totalAllocatedSpeed
-
- // Compute the work that was actually granted to the VM.
- val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
- val processed = processingAvailable * performanceScore
-
- val interferedWork = processingAvailable - processed
-
- totalInterferedWork += interferedWork
-
- max(0.0, work - processed)
- } else {
- 0.0
- }
- }
-
- override fun interrupt() {
- // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
- // to infinite recursion.
- if (isProcessing) {
- return
- }
-
- super.interrupt()
-
- // Force the scheduler to re-schedule
- schedule()
- }
-
- override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
- }
-
- /**
- * An internal [SimResourceConsumer] implementation for switch inputs.
- */
- private inner class InputConsumer(val input: SimResourceProvider<R>) : SimResourceConsumer<R> {
- /**
- * The resource context of the consumer.
- */
- private lateinit var ctx: SimResourceContext<R>
-
- /**
- * The remaining work of this consumer.
- */
- val remainingWork: Double
- get() = ctx.remainingWork
-
- init {
- barrier = SimConsumerBarrier(barrier.parties + 1)
- input.startConsumer(this@InputConsumer)
- }
-
- /**
- * Interrupt the consumer
- */
- fun interrupt() {
- ctx.interrupt()
- }
-
- override fun onStart(ctx: SimResourceContext<R>) {
- this.ctx = ctx
- }
-
- override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand {
- val isLast = barrier.enter()
-
- // Flush the progress of the guest after the barrier has been reached.
- if (isLast && isDirty) {
- isDirty = false
- flushGuests()
- }
-
- return if (isDirty) {
- // Wait for the scheduler determine the work after the barrier has been reached by all CPUs.
- SimResourceCommand.Idle()
- } else {
- // Indicate that the scheduler needs to run next call.
- if (isLast) {
- isDirty = true
- }
-
- commands[ctx.resource] ?: SimResourceCommand.Idle()
- }
- }
-
- override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) {
- barrier = SimConsumerBarrier(barrier.parties - 1)
- inputConsumers -= this@InputConsumer
- _inputs -= input
-
- super.onFinish(ctx, cause)
- }
- }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
index 7aa5a5aa..52a42241 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
@@ -42,4 +42,11 @@ public class SimConsumerBarrier(public val parties: Int) {
}
return false
}
+
+ /**
+ * Reset the barrier.
+ */
+ public fun reset() {
+ counter = 0
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
index 0189fe4c..a52d1d5d 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources.consumer
-import org.opendc.simulator.resources.SimResource
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
@@ -31,15 +30,15 @@ import org.opendc.simulator.resources.SimResourceContext
* A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource
* consumption for some period of time.
*/
-public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> {
+public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer {
private var iterator: Iterator<Fragment>? = null
- override fun onStart(ctx: SimResourceContext<SimResource>) {
+ override fun onStart(ctx: SimResourceContext) {
check(iterator == null) { "Consumer already running" }
iterator = trace.iterator()
}
- override fun onNext(ctx: SimResourceContext<SimResource>): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
val iterator = checkNotNull(iterator)
return if (iterator.hasNext()) {
val now = ctx.clock.millis()
@@ -58,7 +57,7 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour
}
}
- override fun onFinish(ctx: SimResourceContext<SimResource>, cause: Throwable?) {
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
iterator = null
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
index 62425583..8f24a020 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources.consumer
-import org.opendc.simulator.resources.SimResource
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
@@ -30,10 +29,10 @@ import org.opendc.simulator.resources.SimResourceContext
/**
* A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization.
*/
-public class SimWorkConsumer<R : SimResource>(
+public class SimWorkConsumer(
private val work: Double,
private val utilization: Double
-) : SimResourceConsumer<R> {
+) : SimResourceConsumer {
init {
require(work >= 0.0) { "Work must be positive" }
@@ -43,12 +42,12 @@ public class SimWorkConsumer<R : SimResource>(
private var limit = 0.0
private var remainingWork: Double = 0.0
- override fun onStart(ctx: SimResourceContext<R>) {
- limit = ctx.resource.capacity * utilization
+ override fun onStart(ctx: SimResourceContext) {
+ limit = ctx.capacity * utilization
remainingWork = work
}
- override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
val work = this.remainingWork + ctx.remainingWork
this.remainingWork -= work
return if (work > 0.0) {