diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-22 16:45:13 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-22 18:16:40 +0100 |
| commit | 3718c385f84b463ac799080bb5603e0011adcd7d (patch) | |
| tree | 414e4c9fa82ade602cfdae4384f39b0bdb6cb139 /simulator/opendc-simulator/opendc-simulator-resources/src/main | |
| parent | f616b720406250b1415593ff04c9d910b1fda54c (diff) | |
simulator: Remove generic resource constraint from resource model
This change removes the generic resource constraint (e.g., SimResource)
and replaces it by a simple capacity property. In the future, users
should handle the resource properties on a higher level.
This change simplifies compositions of consumers and providers by not
requiring a translation from resource to capacity.
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src/main')
17 files changed, 865 insertions, 506 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt new file mode 100644 index 00000000..18ac0cd8 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * Abstract implementation of [SimResourceAggregator]. + */ +public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator { + /** + * The available resource provider contexts. + */ + protected val inputContexts: Set<SimResourceContext> + get() = _inputContexts + private val _inputContexts = mutableSetOf<SimResourceContext>() + + /** + * The output context. + */ + protected val outputContext: SimResourceContext + get() = context + + /** + * The commands to submit to the underlying input resources. + */ + protected val commands: MutableMap<SimResourceContext, SimResourceCommand> = mutableMapOf() + + /** + * This method is invoked when the resource consumer consumes resources. + */ + protected abstract fun doConsume(work: Double, limit: Double, deadline: Long) + + /** + * This method is invoked when the resource consumer enters an idle state. + */ + protected open fun doIdle(deadline: Long) { + for (input in inputContexts) { + commands[input] = SimResourceCommand.Idle(deadline) + } + } + + /** + * This method is invoked when the resource consumer finishes processing. + */ + protected open fun doFinish(cause: Throwable?) { + for (input in inputContexts) { + commands[input] = SimResourceCommand.Exit + } + } + + /** + * This method is invoked when an input context is started. + */ + protected open fun onContextStarted(ctx: SimResourceContext) { + _inputContexts.add(ctx) + } + + protected open fun onContextFinished(ctx: SimResourceContext) { + assert(_inputContexts.remove(ctx)) { "Lost context" } + } + + override fun addInput(input: SimResourceProvider) { + check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" } + + val consumer = Consumer() + _inputs.add(input) + input.startConsumer(consumer) + } + + override fun close() { + output.close() + } + + override val output: SimResourceProvider + get() = _output + private val _output = SimResourceForwarder() + + override val inputs: Set<SimResourceProvider> + get() = _inputs + private val _inputs = mutableSetOf<SimResourceProvider>() + + private val context = object : SimAbstractResourceContext(clock, _output) { + override val capacity: Double + get() = inputContexts.sumByDouble { it.capacity } + + override val remainingWork: Double + get() = inputContexts.sumByDouble { it.remainingWork } + + override fun interrupt() { + super.interrupt() + + interruptAll() + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + doConsume(work, limit, deadline) + } + + override fun onIdle(deadline: Long) { + doIdle(deadline) + } + + override fun onFinish(cause: Throwable?) { + doFinish(cause) + + super.onFinish(cause) + + interruptAll() + } + } + + /** + * A flag to indicate that an interrupt is active. + */ + private var isInterrupting: Boolean = false + + /** + * Schedule the work over the input resources. + */ + private fun doSchedule() { + context.flush(isIntermediate = true) + interruptAll() + } + + /** + * Interrupt all inputs. + */ + private fun interruptAll() { + // Prevent users from interrupting the resource while they are constructing their next command, as this will + // only lead to infinite recursion. + if (isInterrupting) { + return + } + + try { + isInterrupting = true + + val iterator = _inputs.iterator() + while (iterator.hasNext()) { + val input = iterator.next() + input.interrupt() + + if (input.state != SimResourceState.Active) { + iterator.remove() + } + } + } finally { + isInterrupting = false + } + } + + /** + * An internal [SimResourceConsumer] implementation for aggregator inputs. + */ + private inner class Consumer : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext) { + onContextStarted(ctx) + + // Make sure we initialize the output if we have not done so yet + if (context.state == SimResourceState.Pending) { + context.start() + } + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + doSchedule() + + return commands[ctx] ?: SimResourceCommand.Idle() + } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + onContextFinished(ctx) + + super.onFinish(ctx, cause) + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index dba334a2..f65cbaf4 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -30,17 +30,10 @@ import kotlin.math.min /** * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. */ -public abstract class SimAbstractResourceContext<R : SimResource>( - override val resource: R, +public abstract class SimAbstractResourceContext( override val clock: Clock, - private val consumer: SimResourceConsumer<R> -) : SimResourceContext<R> { - /** - * The capacity of the resource. - */ - override val capacity: Double - get() = resource.capacity - + private val consumer: SimResourceConsumer +) : SimResourceContext { /** * The amount of work still remaining at this instant. */ @@ -51,6 +44,12 @@ public abstract class SimAbstractResourceContext<R : SimResource>( } /** + * A flag to indicate the state of the context. + */ + public var state: SimResourceState = SimResourceState.Pending + private set + + /** * This method is invoked when the resource will idle until the specified [deadline]. */ public abstract fun onIdle(deadline: Long) @@ -178,7 +177,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>( if (command.deadline <= now || !isIntermediate) { next(now) } else { - activeCommand + interpret(SimResourceCommand.Idle(command.deadline), now) } } is SimResourceCommand.Consume -> { @@ -214,7 +213,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>( flush() } - override fun toString(): String = "SimAbstractResourceContext[resource=$resource]" + override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]" /** * A flag to indicate that the resource is currently processing a command. @@ -222,11 +221,6 @@ public abstract class SimAbstractResourceContext<R : SimResource>( protected var isProcessing: Boolean = false /** - * A flag to indicate the state of the context. - */ - private var state: SimResourceState = SimResourceState.Pending - - /** * The current command that is being processed. */ private var activeCommand: CommandWrapper? = null diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt new file mode 100644 index 00000000..bb4e6a2c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource. + */ +public interface SimResourceAggregator : AutoCloseable { + /** + * The output resource provider to which resource consumers can be attached. + */ + public val output: SimResourceProvider + + /** + * The input resources that will be switched between the output providers. + */ + public val inputs: Set<SimResourceProvider> + + /** + * Add the specified [input] to the switch. + */ + public fun addInput(input: SimResourceProvider) + + /** + * End the lifecycle of the aggregator. + */ + public override fun close() +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt new file mode 100644 index 00000000..08bc064e --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * A [SimResourceAggregator] that distributes the load equally across the input resources. + */ +public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) { + private val consumers = mutableListOf<SimResourceContext>() + + override fun doConsume(work: Double, limit: Double, deadline: Long) { + // Sort all consumers by their capacity + consumers.sortWith(compareBy { it.capacity }) + + // Divide the requests over the available capacity of the input resources fairly + for (input in consumers) { + val inputCapacity = input.capacity + val fraction = inputCapacity / outputContext.capacity + val grantedSpeed = limit * fraction + val grantedWork = fraction * work + + commands[input] = + if (grantedWork > 0.0 && grantedSpeed > 0.0) + SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + } + } + + override fun onContextStarted(ctx: SimResourceContext) { + super.onContextStarted(ctx) + + consumers.add(ctx) + } + + override fun onContextFinished(ctx: SimResourceContext) { + super.onContextFinished(ctx) + + consumers.remove(ctx) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 01b56488..04c7fcaf 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -28,13 +28,13 @@ package org.opendc.simulator.resources * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently) * for multiple resource providers, unless explicitly said otherwise. */ -public interface SimResourceConsumer<in R : SimResource> { +public interface SimResourceConsumer { /** * This method is invoked when the consumer is started for some resource. * * @param ctx The execution context in which the consumer runs. */ - public fun onStart(ctx: SimResourceContext<R>) {} + public fun onStart(ctx: SimResourceContext) {} /** * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because @@ -43,7 +43,7 @@ public interface SimResourceConsumer<in R : SimResource> { * @param ctx The execution context in which the consumer runs. * @return The next command that the resource should execute. */ - public fun onNext(ctx: SimResourceContext<R>): SimResourceCommand + public fun onNext(ctx: SimResourceContext): SimResourceCommand /** * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], @@ -54,5 +54,5 @@ public interface SimResourceConsumer<in R : SimResource> { * @param ctx The execution context in which the consumer ran. * @param cause The cause of the finish in case the resource finished exceptionally. */ - public fun onFinish(ctx: SimResourceContext<R>, cause: Throwable? = null) {} + public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {} } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index f13764fb..11dbb09f 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -28,12 +28,7 @@ import java.time.Clock * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a * resource and a resource consumer. */ -public interface SimResourceContext<out R : SimResource> { - /** - * The resource that is managed by this context. - */ - public val resource: R - +public interface SimResourceContext { /** * The virtual clock tracking simulation time. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt index 31b0a175..b2759b7f 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -23,11 +23,21 @@ package org.opendc.simulator.resources /** - * A generic representation of resource that may be consumed. + * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers. */ -public interface SimResource { +public interface SimResourceDistributor : AutoCloseable { /** - * The capacity of the resource. + * The output resource providers to which resource consumers can be attached. */ - public val capacity: Double + public val outputs: Set<SimResourceProvider> + + /** + * The input resource that will be distributed over the consumers. + */ + public val input: SimResourceProvider + + /** + * Add an output to the switch with the specified [capacity]. + */ + public fun addOutput(capacity: Double): SimResourceProvider } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt new file mode 100644 index 00000000..b0f27b9d --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -0,0 +1,423 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock +import kotlin.math.max +import kotlin.math.min + +/** + * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing. + */ +public class SimResourceDistributorMaxMin( + override val input: SimResourceProvider, + private val clock: Clock, + private val listener: Listener? = null +) : SimResourceDistributor { + override val outputs: Set<SimResourceProvider> + get() = _outputs + private val _outputs = mutableSetOf<OutputProvider>() + + /** + * The active output contexts. + */ + private val outputContexts: MutableList<OutputContext> = mutableListOf() + + /** + * The total speed requested by the output resources. + */ + private var totalRequestedSpeed = 0.0 + + /** + * The total amount of work requested by the output resources. + */ + private var totalRequestedWork = 0.0 + + /** + * The total allocated speed for the output resources. + */ + private var totalAllocatedSpeed = 0.0 + + /** + * The total allocated work requested for the output resources. + */ + private var totalAllocatedWork = 0.0 + + /** + * The amount of work that could not be performed due to over-committing resources. + */ + private var totalOvercommittedWork = 0.0 + + /** + * The amount of work that was lost due to interference. + */ + private var totalInterferedWork = 0.0 + + /** + * A flag to indicate that the switch is closed. + */ + private var isClosed: Boolean = false + + /** + * An internal [SimResourceConsumer] implementation for switch inputs. + */ + private val consumer = object : SimResourceConsumer { + /** + * The resource context of the consumer. + */ + private lateinit var ctx: SimResourceContext + + val remainingWork: Double + get() = ctx.remainingWork + + override fun onStart(ctx: SimResourceContext) { + this.ctx = ctx + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return doNext(ctx.capacity) + } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + super.onFinish(ctx, cause) + + val iterator = _outputs.iterator() + while (iterator.hasNext()) { + val output = iterator.next() + + // Remove the output from the outputs to prevent ConcurrentModificationException when removing it + // during the call tou output.close() + iterator.remove() + + output.close() + } + } + } + + /** + * The total amount of remaining work. + */ + private val totalRemainingWork: Double + get() = consumer.remainingWork + + override fun addOutput(capacity: Double): SimResourceProvider { + check(!isClosed) { "Distributor has been closed" } + + val provider = OutputProvider(capacity) + _outputs.add(provider) + return provider + } + + override fun close() { + if (!isClosed) { + isClosed = true + input.cancel() + } + } + + init { + input.startConsumer(consumer) + } + + /** + * Indicate that the workloads should be re-scheduled. + */ + private fun schedule() { + input.interrupt() + } + + /** + * Schedule the work over the physical CPUs. + */ + private fun doSchedule(capacity: Double): SimResourceCommand { + // If there is no work yet, mark all inputs as idle. + if (outputContexts.isEmpty()) { + return SimResourceCommand.Idle() + } + + val maxUsage = capacity + var duration: Double = Double.MAX_VALUE + var deadline: Long = Long.MAX_VALUE + var availableSpeed = maxUsage + var totalRequestedSpeed = 0.0 + var totalRequestedWork = 0.0 + + // Flush the work of the outputs + var outputIterator = outputContexts.listIterator() + while (outputIterator.hasNext()) { + val output = outputIterator.next() + + output.flush(isIntermediate = true) + + if (output.activeCommand == SimResourceCommand.Exit) { + // Apparently the output consumer has exited, so remove it from the scheduling queue. + outputIterator.remove() + } + } + + // Sort the outputs based on their requested usage + // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + outputContexts.sort() + + // Divide the available input capacity fairly across the outputs using max-min fair sharing + outputIterator = outputContexts.listIterator() + var remaining = outputContexts.size + while (outputIterator.hasNext()) { + val output = outputIterator.next() + val availableShare = availableSpeed / remaining-- + + when (val command = output.activeCommand) { + is SimResourceCommand.Idle -> { + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + output.actualSpeed = 0.0 + } + is SimResourceCommand.Consume -> { + val grantedSpeed = min(output.allowedSpeed, availableShare) + + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + // Ignore idle computation + if (grantedSpeed <= 0.0 || command.work <= 0.0) { + output.actualSpeed = 0.0 + continue + } + + totalRequestedSpeed += command.limit + totalRequestedWork += command.work + + output.actualSpeed = grantedSpeed + availableSpeed -= grantedSpeed + + // The duration that we want to run is that of the shortest request from an output + duration = min(duration, command.work / grantedSpeed) + } + SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" } + } + } + + assert(deadline >= clock.millis()) { "Deadline already passed" } + + this.totalRequestedSpeed = totalRequestedSpeed + this.totalRequestedWork = totalRequestedWork + this.totalAllocatedSpeed = maxUsage - availableSpeed + this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration) + + return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) + SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + } + + /** + * Obtain the next command to perform. + */ + private fun doNext(capacity: Double): SimResourceCommand { + val totalRequestedWork = totalRequestedWork.toLong() + val totalRemainingWork = totalRemainingWork.toLong() + val totalAllocatedWork = totalAllocatedWork.toLong() + val totalRequestedSpeed = totalRequestedSpeed + val totalAllocatedSpeed = totalAllocatedSpeed + + // Force all inputs to re-schedule their work. + val command = doSchedule(capacity) + + // Report metrics + listener?.onSliceFinish( + this, + totalRequestedWork, + totalAllocatedWork - totalRemainingWork, + totalOvercommittedWork.toLong(), + totalInterferedWork.toLong(), + totalRequestedSpeed, + totalAllocatedSpeed, + ) + + totalInterferedWork = 0.0 + totalOvercommittedWork = 0.0 + + return command + } + + /** + * Event listener for hypervisor events. + */ + public interface Listener { + /** + * This method is invoked when a slice is finished. + */ + public fun onSliceFinish( + switch: SimResourceDistributor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) + } + + /** + * An internal [SimResourceProvider] implementation for switch outputs. + */ + private inner class OutputProvider(val capacity: Double) : SimResourceProvider { + /** + * The [OutputContext] that is currently running. + */ + private var ctx: OutputContext? = null + + override var state: SimResourceState = SimResourceState.Pending + internal set + + override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource cannot be consumed" } + + val ctx = OutputContext(this, consumer) + this.ctx = ctx + this.state = SimResourceState.Active + outputContexts += ctx + + ctx.start() + schedule() + } + + override fun close() { + cancel() + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Stopped + _outputs.remove(this) + } + } + + override fun interrupt() { + ctx?.interrupt() + } + + override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.stop() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } + } + + /** + * A [SimAbstractResourceContext] for the output resources. + */ + private inner class OutputContext( + private val provider: OutputProvider, + consumer: SimResourceConsumer + ) : SimAbstractResourceContext(clock, consumer), Comparable<OutputContext> { + override val capacity: Double + get() = provider.capacity + + /** + * The current command that is processed by the vCPU. + */ + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() + + /** + * The processing speed that is allowed by the model constraints. + */ + var allowedSpeed: Double = 0.0 + + /** + * The actual processing speed. + */ + var actualSpeed: Double = 0.0 + + private fun reportOvercommit() { + val remainingWork = remainingWork + totalOvercommittedWork += remainingWork + } + + override fun onIdle(deadline: Long) { + reportOvercommit() + + allowedSpeed = 0.0 + activeCommand = SimResourceCommand.Idle(deadline) + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + reportOvercommit() + + allowedSpeed = getSpeed(limit) + activeCommand = SimResourceCommand.Consume(work, limit, deadline) + } + + override fun onFinish(cause: Throwable?) { + reportOvercommit() + + activeCommand = SimResourceCommand.Exit + provider.cancel() + + super.onFinish(cause) + } + + override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { + // Apply performance interference model + val performanceScore = 1.0 + + // Compute the remaining amount of work + return if (work > 0.0) { + // Compute the fraction of compute time allocated to the VM + val fraction = actualSpeed / totalAllocatedSpeed + + // Compute the work that was actually granted to the VM. + val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction + val processed = processingAvailable * performanceScore + + val interferedWork = processingAvailable - processed + + totalInterferedWork += interferedWork + + max(0.0, work - processed) + } else { + 0.0 + } + } + + override fun interrupt() { + // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead + // to infinite recursion. + if (isProcessing) { + return + } + + super.interrupt() + + // Force the scheduler to re-schedule + schedule() + } + + override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt index 732e709a..227f4d62 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -25,17 +25,16 @@ package org.opendc.simulator.resources /** * A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer]. */ -public class SimResourceForwarder<R : SimResource>(override val resource: R) : - SimResourceProvider<R>, SimResourceConsumer<R> { +public class SimResourceForwarder : SimResourceProvider, SimResourceConsumer { /** * The [SimResourceContext] in which the forwarder runs. */ - private var ctx: SimResourceContext<R>? = null + private var ctx: SimResourceContext? = null /** * The delegate [SimResourceConsumer]. */ - private var delegate: SimResourceConsumer<R>? = null + private var delegate: SimResourceConsumer? = null /** * A flag to indicate that the delegate was started. @@ -48,11 +47,13 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) : override var state: SimResourceState = SimResourceState.Pending private set - override fun startConsumer(consumer: SimResourceConsumer<R>) { + override fun startConsumer(consumer: SimResourceConsumer) { check(state == SimResourceState.Pending) { "Resource is in invalid state" } state = SimResourceState.Active delegate = consumer + + // Interrupt the provider to replace the consumer interrupt() } @@ -83,11 +84,11 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) : } } - override fun onStart(ctx: SimResourceContext<R>) { + override fun onStart(ctx: SimResourceContext) { this.ctx = ctx } - override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { val delegate = delegate if (!hasDelegateStarted) { @@ -117,7 +118,7 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) : } } - override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { this.ctx = null val delegate = delegate diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index 1593281b..52b13c5c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -27,12 +27,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine /** * A [SimResourceProvider] provides some resource of type [R]. */ -public interface SimResourceProvider<out R : SimResource> : AutoCloseable { - /** - * The resource that is managed by this provider. - */ - public val resource: R - +public interface SimResourceProvider : AutoCloseable { /** * The state of the resource. */ @@ -43,7 +38,7 @@ public interface SimResourceProvider<out R : SimResource> : AutoCloseable { * * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended. */ - public fun startConsumer(consumer: SimResourceConsumer<R>) + public fun startConsumer(consumer: SimResourceConsumer) /** * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op. @@ -67,15 +62,15 @@ public interface SimResourceProvider<out R : SimResource> : AutoCloseable { * Consume the resource provided by this provider using the specified [consumer] and suspend execution until * the consumer has finished. */ -public suspend fun <R : SimResource> SimResourceProvider<R>.consume(consumer: SimResourceConsumer<R>) { +public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { return suspendCancellableCoroutine { cont -> - startConsumer(object : SimResourceConsumer<R> by consumer { - override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) { + startConsumer(object : SimResourceConsumer by consumer { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { assert(!cont.isCompleted) { "Coroutine already completed" } - cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit)) - consumer.onFinish(ctx, cause) + + cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit)) } override fun toString(): String = "SimSuspendingResourceConsumer" diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 99545c4c..3b4e7e7a 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -31,15 +31,15 @@ import kotlin.math.min /** * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. * - * @param resource The resource to provide. + * @param initialCapacity The initial capacity of the resource. * @param clock The virtual clock to track simulation time. * @param scheduler The scheduler to schedule the interrupts. */ -public class SimResourceSource<R : SimResource>( - override val resource: R, +public class SimResourceSource( + private val initialCapacity: Double, private val clock: Clock, private val scheduler: TimerScheduler<Any> -) : SimResourceProvider<R> { +) : SimResourceProvider { /** * The resource processing speed over time. */ @@ -55,7 +55,7 @@ public class SimResourceSource<R : SimResource>( override var state: SimResourceState = SimResourceState.Pending private set - override fun startConsumer(consumer: SimResourceConsumer<R>) { + override fun startConsumer(consumer: SimResourceConsumer) { check(state == SimResourceState.Pending) { "Resource is in invalid state" } val ctx = Context(consumer) @@ -89,7 +89,9 @@ public class SimResourceSource<R : SimResource>( /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context(consumer: SimResourceConsumer<R>) : SimAbstractResourceContext<R>(resource, clock, consumer) { + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(clock, consumer) { + override val capacity: Double = initialCapacity + /** * The processing speed of the resource. */ @@ -123,6 +125,6 @@ public class SimResourceSource<R : SimResource>( super.onFinish(cause) } - override fun toString(): String = "SimResourceSource.Context[resource=$resource]" + override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt index cd1af3fc..53fec16a 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -23,26 +23,26 @@ package org.opendc.simulator.resources /** - * A [SimResourceSwitch] enables switching of capacity of multiple resources of type [R] between multiple consumers. + * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers. */ -public interface SimResourceSwitch<R : SimResource> : AutoCloseable { +public interface SimResourceSwitch : AutoCloseable { /** * The output resource providers to which resource consumers can be attached. */ - public val outputs: Set<SimResourceProvider<R>> + public val outputs: Set<SimResourceProvider> /** * The input resources that will be switched between the output providers. */ - public val inputs: Set<SimResourceProvider<R>> + public val inputs: Set<SimResourceProvider> /** - * Add an output to the switch represented by [resource]. + * Add an output to the switch with the specified [capacity]. */ - public fun addOutput(resource: R): SimResourceProvider<R> + public fun addOutput(capacity: Double): SimResourceProvider /** * Add the specified [input] to the switch. */ - public fun addInput(input: SimResourceProvider<R>) + public fun addInput(input: SimResourceProvider) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 5eea78f6..6e431ea1 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -28,45 +28,45 @@ import java.util.ArrayDeque * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that * a single output is directly connected to an input and that the switch can only support as much outputs as inputs. */ -public class SimResourceSwitchExclusive<R : SimResource> : SimResourceSwitch<R> { +public class SimResourceSwitchExclusive : SimResourceSwitch { /** * A flag to indicate that the switch is closed. */ private var isClosed: Boolean = false private val _outputs = mutableSetOf<Provider>() - override val outputs: Set<SimResourceProvider<R>> + override val outputs: Set<SimResourceProvider> get() = _outputs - private val availableResources = ArrayDeque<SimResourceForwarder<R>>() + private val availableResources = ArrayDeque<SimResourceForwarder>() - private val _inputs = mutableSetOf<SimResourceProvider<R>>() - override val inputs: Set<SimResourceProvider<R>> + private val _inputs = mutableSetOf<SimResourceProvider>() + override val inputs: Set<SimResourceProvider> get() = _inputs - override fun addOutput(resource: R): SimResourceProvider<R> { + override fun addOutput(capacity: Double): SimResourceProvider { check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() - val output = Provider(resource, forwarder) + val output = Provider(capacity, forwarder) _outputs += output return output } - override fun addInput(input: SimResourceProvider<R>) { + override fun addInput(input: SimResourceProvider) { check(!isClosed) { "Switch has been closed" } if (input in inputs) { return } - val forwarder = SimResourceForwarder(input.resource) + val forwarder = SimResourceForwarder() _inputs += input availableResources += forwarder - input.startConsumer(object : SimResourceConsumer<R> by forwarder { - override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) { + input.startConsumer(object : SimResourceConsumer by forwarder { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { // De-register the input after it has finished _inputs -= input forwarder.onFinish(ctx, cause) @@ -78,13 +78,13 @@ public class SimResourceSwitchExclusive<R : SimResource> : SimResourceSwitch<R> isClosed = true // Cancel all upstream subscriptions - _inputs.forEach(SimResourceProvider<R>::cancel) + _inputs.forEach(SimResourceProvider::cancel) } private inner class Provider( - override val resource: R, - private val forwarder: SimResourceForwarder<R> - ) : SimResourceProvider<R> by forwarder { + private val capacity: Double, + private val forwarder: SimResourceForwarder + ) : SimResourceProvider by forwarder { override fun close() { _outputs -= this availableResources += forwarder diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index ee8edfcd..c796c251 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -23,97 +23,61 @@ package org.opendc.simulator.resources import kotlinx.coroutines.* -import org.opendc.simulator.resources.consumer.SimConsumerBarrier import java.time.Clock -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min /** * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min * fair sharing. */ -public class SimResourceSwitchMaxMin<R : SimResource>( - private val clock: Clock, - private val listener: Listener<R>? = null -) : SimResourceSwitch<R> { - private val inputConsumers = mutableSetOf<InputConsumer>() - private val _outputs = mutableSetOf<OutputProvider>() - override val outputs: Set<SimResourceProvider<R>> +public class SimResourceSwitchMaxMin( + clock: Clock, + private val listener: Listener? = null +) : SimResourceSwitch { + private val _outputs = mutableSetOf<SimResourceProvider>() + override val outputs: Set<SimResourceProvider> get() = _outputs - private val _inputs = mutableSetOf<SimResourceProvider<R>>() - override val inputs: Set<SimResourceProvider<R>> + private val _inputs = mutableSetOf<SimResourceProvider>() + override val inputs: Set<SimResourceProvider> get() = _inputs /** - * The commands to submit to the underlying host. + * A flag to indicate that the switch was closed. */ - private val commands = mutableMapOf<R, SimResourceCommand>() + private var isClosed = false /** - * The active output contexts. + * The aggregator to aggregate the resources. */ - private val outputContexts: MutableList<OutputContext> = mutableListOf() + private val aggregator = SimResourceAggregatorMaxMin(clock) /** - * The remaining work of all inputs. + * The distributor to distribute the aggregated resources. */ - private val totalRemainingWork: Double - get() = inputConsumers.sumByDouble { it.remainingWork } - - /** - * The total speed requested by the vCPUs. - */ - private var totalRequestedSpeed = 0.0 - - /** - * The total amount of work requested by the vCPUs. - */ - private var totalRequestedWork = 0.0 - - /** - * The total allocated speed for the vCPUs. - */ - private var totalAllocatedSpeed = 0.0 - - /** - * The total allocated work requested for the vCPUs. - */ - private var totalAllocatedWork = 0.0 - - /** - * The amount of work that could not be performed due to over-committing resources. - */ - private var totalOvercommittedWork = 0.0 - - /** - * The amount of work that was lost due to interference. - */ - private var totalInterferedWork = 0.0 - - /** - * A flag to indicate that the scheduler has submitted work that has not yet been completed. - */ - private var isDirty: Boolean = false - - /** - * The scheduler barrier. - */ - private var barrier: SimConsumerBarrier = SimConsumerBarrier(0) - - /** - * A flag to indicate that the switch is closed. - */ - private var isClosed: Boolean = false + private val distributor = SimResourceDistributorMaxMin( + aggregator.output, clock, + object : SimResourceDistributorMaxMin.Listener { + override fun onSliceFinish( + switch: SimResourceDistributor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) + } + } + ) /** * Add an output to the switch represented by [resource]. */ - override fun addOutput(resource: R): SimResourceProvider<R> { + override fun addOutput(capacity: Double): SimResourceProvider { check(!isClosed) { "Switch has been closed" } - val provider = OutputProvider(resource) + val provider = distributor.addOutput(capacity) _outputs.add(provider) return provider } @@ -121,169 +85,29 @@ public class SimResourceSwitchMaxMin<R : SimResource>( /** * Add the specified [input] to the switch. */ - override fun addInput(input: SimResourceProvider<R>) { + override fun addInput(input: SimResourceProvider) { check(!isClosed) { "Switch has been closed" } - val consumer = InputConsumer(input) - _inputs.add(input) - inputConsumers += consumer + aggregator.addInput(input) } override fun close() { - isClosed = true - } - - /** - * Indicate that the workloads should be re-scheduled. - */ - private fun schedule() { - isDirty = true - interruptAll() - } - - /** - * Schedule the work over the physical CPUs. - */ - private fun doSchedule() { - // If there is no work yet, mark all inputs as idle. - if (outputContexts.isEmpty()) { - commands.replaceAll { _, _ -> SimResourceCommand.Idle() } - interruptAll() - } - - val maxUsage = inputs.sumByDouble { it.resource.capacity } - var duration: Double = Double.MAX_VALUE - var deadline: Long = Long.MAX_VALUE - var availableSpeed = maxUsage - var totalRequestedSpeed = 0.0 - var totalRequestedWork = 0.0 - - // Sort the outputs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - outputContexts.sort() - - // Divide the available input capacity fairly across the outputs using max-min fair sharing - val outputIterator = outputContexts.listIterator() - var remaining = outputContexts.size - while (outputIterator.hasNext()) { - val output = outputIterator.next() - val availableShare = availableSpeed / remaining-- - - when (val command = output.activeCommand) { - is SimResourceCommand.Idle -> { - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - output.actualSpeed = 0.0 - } - is SimResourceCommand.Consume -> { - val grantedSpeed = min(output.allowedSpeed, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - // Ignore idle computation - if (grantedSpeed <= 0.0 || command.work <= 0.0) { - output.actualSpeed = 0.0 - continue - } - - totalRequestedSpeed += command.limit - totalRequestedWork += command.work - - output.actualSpeed = grantedSpeed - availableSpeed -= grantedSpeed - - // The duration that we want to run is that of the shortest request from an output - duration = min(duration, command.work / grantedSpeed) - } - SimResourceCommand.Exit -> { - // Apparently the output consumer has exited, so remove it from the scheduling queue. - outputIterator.remove() - } - } - } - - // Round the duration to milliseconds - duration = ceil(duration * 1000) / 1000 - - assert(deadline >= clock.millis()) { "Deadline already passed" } - - val totalAllocatedSpeed = maxUsage - availableSpeed - var totalAllocatedWork = 0.0 - availableSpeed = totalAllocatedSpeed - - // Divide the requests over the available capacity of the input resources fairly - for (input in inputs.sortedByDescending { it.resource.capacity }) { - val maxResourceUsage = input.resource.capacity - val fraction = maxResourceUsage / maxUsage - val grantedSpeed = min(maxResourceUsage, totalAllocatedSpeed * fraction) - val grantedWork = duration * grantedSpeed - - commands[input.resource] = - if (grantedWork > 0.0 && grantedSpeed > 0.0) - SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) - else - SimResourceCommand.Idle(deadline) - - totalAllocatedWork += grantedWork - availableSpeed -= grantedSpeed - } - - this.totalRequestedSpeed = totalRequestedSpeed - this.totalRequestedWork = totalRequestedWork - this.totalAllocatedSpeed = totalAllocatedSpeed - this.totalAllocatedWork = totalAllocatedWork - - interruptAll() - } - - /** - * Flush the progress of the vCPUs. - */ - private fun flushGuests() { - val totalRemainingWork = totalRemainingWork - - // Flush all the outputs work - for (output in outputContexts) { - output.flush(isIntermediate = true) - } - - // Report metrics - listener?.onSliceFinish( - this, - totalRequestedWork.toLong(), - (totalAllocatedWork - totalRemainingWork).toLong(), - totalOvercommittedWork.toLong(), - totalInterferedWork.toLong(), - totalRequestedSpeed, - totalAllocatedSpeed - ) - totalInterferedWork = 0.0 - totalOvercommittedWork = 0.0 - - // Force all inputs to re-schedule their work. - doSchedule() - } - - /** - * Interrupt all inputs. - */ - private fun interruptAll() { - for (input in inputConsumers) { - input.interrupt() + if (!isClosed) { + isClosed = true + distributor.close() + aggregator.close() } } /** * Event listener for hypervisor events. */ - public interface Listener<R : SimResource> { + public interface Listener { /** * This method is invoked when a slice is finished. */ public fun onSliceFinish( - switch: SimResourceSwitchMaxMin<R>, + switch: SimResourceSwitchMaxMin, requestedWork: Long, grantedWork: Long, overcommittedWork: Long, @@ -292,203 +116,4 @@ public class SimResourceSwitchMaxMin<R : SimResource>( cpuDemand: Double ) } - - /** - * An internal [SimResourceProvider] implementation for switch outputs. - */ - private inner class OutputProvider(override val resource: R) : SimResourceProvider<R> { - /** - * The [OutputContext] that is currently running. - */ - private var ctx: OutputContext? = null - - override var state: SimResourceState = SimResourceState.Pending - internal set - - override fun startConsumer(consumer: SimResourceConsumer<R>) { - check(state == SimResourceState.Pending) { "Resource cannot be consumed" } - - val ctx = OutputContext(this, resource, consumer) - this.ctx = ctx - this.state = SimResourceState.Active - outputContexts += ctx - - ctx.start() - schedule() - } - - override fun close() { - cancel() - - state = SimResourceState.Stopped - _outputs.remove(this) - } - - override fun interrupt() { - ctx?.interrupt() - } - - override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.stop() - } - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending - } - } - } - - /** - * A [SimAbstractResourceContext] for the output resources. - */ - private inner class OutputContext( - private val provider: OutputProvider, - resource: R, - consumer: SimResourceConsumer<R> - ) : SimAbstractResourceContext<R>(resource, clock, consumer), Comparable<OutputContext> { - /** - * The current command that is processed by the vCPU. - */ - var activeCommand: SimResourceCommand = SimResourceCommand.Idle() - - /** - * The processing speed that is allowed by the model constraints. - */ - var allowedSpeed: Double = 0.0 - - /** - * The actual processing speed. - */ - var actualSpeed: Double = 0.0 - - private fun reportOvercommit() { - totalOvercommittedWork += remainingWork - } - - override fun onIdle(deadline: Long) { - reportOvercommit() - - allowedSpeed = 0.0 - activeCommand = SimResourceCommand.Idle(deadline) - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - reportOvercommit() - - allowedSpeed = getSpeed(limit) - activeCommand = SimResourceCommand.Consume(work, limit, deadline) - } - - override fun onFinish(cause: Throwable?) { - reportOvercommit() - - activeCommand = SimResourceCommand.Exit - provider.cancel() - - super.onFinish(cause) - } - - override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { - // Apply performance interference model - val performanceScore = 1.0 - - // Compute the remaining amount of work - return if (work > 0.0) { - // Compute the fraction of compute time allocated to the VM - val fraction = actualSpeed / totalAllocatedSpeed - - // Compute the work that was actually granted to the VM. - val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction - val processed = processingAvailable * performanceScore - - val interferedWork = processingAvailable - processed - - totalInterferedWork += interferedWork - - max(0.0, work - processed) - } else { - 0.0 - } - } - - override fun interrupt() { - // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead - // to infinite recursion. - if (isProcessing) { - return - } - - super.interrupt() - - // Force the scheduler to re-schedule - schedule() - } - - override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) - } - - /** - * An internal [SimResourceConsumer] implementation for switch inputs. - */ - private inner class InputConsumer(val input: SimResourceProvider<R>) : SimResourceConsumer<R> { - /** - * The resource context of the consumer. - */ - private lateinit var ctx: SimResourceContext<R> - - /** - * The remaining work of this consumer. - */ - val remainingWork: Double - get() = ctx.remainingWork - - init { - barrier = SimConsumerBarrier(barrier.parties + 1) - input.startConsumer(this@InputConsumer) - } - - /** - * Interrupt the consumer - */ - fun interrupt() { - ctx.interrupt() - } - - override fun onStart(ctx: SimResourceContext<R>) { - this.ctx = ctx - } - - override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand { - val isLast = barrier.enter() - - // Flush the progress of the guest after the barrier has been reached. - if (isLast && isDirty) { - isDirty = false - flushGuests() - } - - return if (isDirty) { - // Wait for the scheduler determine the work after the barrier has been reached by all CPUs. - SimResourceCommand.Idle() - } else { - // Indicate that the scheduler needs to run next call. - if (isLast) { - isDirty = true - } - - commands[ctx.resource] ?: SimResourceCommand.Idle() - } - } - - override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) { - barrier = SimConsumerBarrier(barrier.parties - 1) - inputConsumers -= this@InputConsumer - _inputs -= input - - super.onFinish(ctx, cause) - } - } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt index 7aa5a5aa..52a42241 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt @@ -42,4 +42,11 @@ public class SimConsumerBarrier(public val parties: Int) { } return false } + + /** + * Reset the barrier. + */ + public fun reset() { + counter = 0 + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index 0189fe4c..a52d1d5d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources.consumer -import org.opendc.simulator.resources.SimResource import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -31,15 +30,15 @@ import org.opendc.simulator.resources.SimResourceContext * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource * consumption for some period of time. */ -public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> { +public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer { private var iterator: Iterator<Fragment>? = null - override fun onStart(ctx: SimResourceContext<SimResource>) { + override fun onStart(ctx: SimResourceContext) { check(iterator == null) { "Consumer already running" } iterator = trace.iterator() } - override fun onNext(ctx: SimResourceContext<SimResource>): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { val iterator = checkNotNull(iterator) return if (iterator.hasNext()) { val now = ctx.clock.millis() @@ -58,7 +57,7 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour } } - override fun onFinish(ctx: SimResourceContext<SimResource>, cause: Throwable?) { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { iterator = null } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt index 62425583..8f24a020 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources.consumer -import org.opendc.simulator.resources.SimResource import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -30,10 +29,10 @@ import org.opendc.simulator.resources.SimResourceContext /** * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization. */ -public class SimWorkConsumer<R : SimResource>( +public class SimWorkConsumer( private val work: Double, private val utilization: Double -) : SimResourceConsumer<R> { +) : SimResourceConsumer { init { require(work >= 0.0) { "Work must be positive" } @@ -43,12 +42,12 @@ public class SimWorkConsumer<R : SimResource>( private var limit = 0.0 private var remainingWork: Double = 0.0 - override fun onStart(ctx: SimResourceContext<R>) { - limit = ctx.resource.capacity * utilization + override fun onStart(ctx: SimResourceContext) { + limit = ctx.capacity * utilization remainingWork = work } - override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { val work = this.remainingWork + ctx.remainingWork this.remainingWork -= work return if (work > 0.0) { |
