diff options
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src/main')
9 files changed, 946 insertions, 13 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index f9da74c7..52251bff 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -138,10 +138,11 @@ public abstract class SimAbstractResourceContext<R : SimResource>( } try { - val (timestamp, command) = activeCommand ?: return + val activeCommand = activeCommand ?: return + val (timestamp, command) = activeCommand isProcessing = true - activeCommand = null + this.activeCommand = null val duration = now - timestamp assert(duration >= 0) { "Flush in the past" } @@ -153,6 +154,8 @@ public abstract class SimAbstractResourceContext<R : SimResource>( // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) if (command.deadline <= now || !isIntermediate) { next(remainingWork = 0.0) + } else { + this.activeCommand = activeCommand } } is SimResourceCommand.Consume -> { diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt new file mode 100644 index 00000000..ca23557c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume + +/** + * A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer]. + */ +public class SimResourceForwarder<R : SimResource>(override val resource: R) : + SimResourceProvider<R>, SimResourceConsumer<R> { + /** + * The [SimResourceContext] in which the forwarder runs. + */ + private var ctx: SimResourceContext<R>? = null + + /** + * A flag to indicate that the forwarder is closed. + */ + private var isClosed: Boolean = false + + /** + * The continuation to resume after consumption. + */ + private var cont: Continuation<Unit>? = null + + /** + * The delegate [SimResourceConsumer]. + */ + private var delegate: SimResourceConsumer<R>? = null + + /** + * A flag to indicate that the delegate was started. + */ + private var hasDelegateStarted: Boolean = false + + /** + * The remaining amount of work last cycle. + */ + private var remainingWork: Double = 0.0 + + override suspend fun consume(consumer: SimResourceConsumer<R>) { + check(!isClosed) { "Lifecycle of forwarder has ended" } + check(cont == null) { "Run should not be called concurrently" } + + return suspendCancellableCoroutine { cont -> + this.cont = cont + this.delegate = consumer + + cont.invokeOnCancellation { reset() } + + ctx?.interrupt() + } + } + + override fun interrupt() { + ctx?.interrupt() + } + + override fun close() { + isClosed = true + interrupt() + ctx = null + } + + override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand { + this.ctx = ctx + + return onNext(ctx, 0.0) + } + + override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand { + this.remainingWork = remainingWork + + return if (isClosed) { + SimResourceCommand.Exit + } else if (!hasDelegateStarted) { + start() + } else { + next() + } + } + + /** + * Start the delegate. + */ + private fun start(): SimResourceCommand { + val delegate = delegate ?: return SimResourceCommand.Idle() + val command = delegate.onStart(checkNotNull(ctx)) + + hasDelegateStarted = true + + return forward(command) + } + + /** + * Obtain the next command to process. + */ + private fun next(): SimResourceCommand { + val delegate = delegate + return forward(delegate?.onNext(checkNotNull(ctx), remainingWork) ?: SimResourceCommand.Idle()) + } + + /** + * Forward the specified [command]. + */ + private fun forward(command: SimResourceCommand): SimResourceCommand { + return if (command == SimResourceCommand.Exit) { + val cont = checkNotNull(cont) + + // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we + // reset beforehand the existing state and check whether it has been updated afterwards + reset() + cont.resume(Unit) + + if (isClosed) + SimResourceCommand.Exit + else + start() + } else { + command + } + } + + /** + * Reset the delegate. + */ + private fun reset() { + cont = null + delegate = null + hasDelegateStarted = false + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index 91a745ab..e35aa683 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -37,6 +37,11 @@ public interface SimResourceProvider<out R : SimResource> : AutoCloseable { public suspend fun consume(consumer: SimResourceConsumer<R>) /** + * Interrupt the resource. + */ + public fun interrupt() + + /** * End the lifetime of the resource. * * This operation terminates the existing resource consumer. diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 4445df86..540a17c9 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -50,14 +50,32 @@ public class SimResourceSource<R : SimResource>( get() = _speed private val _speed = MutableStateFlow(0.0) + /** + * A flag to indicate that the resource was closed. + */ + private var isClosed: Boolean = false + + /** + * The current active consumer. + */ + private var cont: CancellableContinuation<Unit>? = null + + /** + * The [Context] that is currently running. + */ + private var ctx: Context? = null + override suspend fun consume(consumer: SimResourceConsumer<R>) { check(!isClosed) { "Lifetime of resource has ended." } check(cont == null) { "Run should not be called concurrently" } try { return suspendCancellableCoroutine { cont -> - this.cont = cont val ctx = Context(consumer, cont) + + this.cont = cont + this.ctx = ctx + ctx.start() cont.invokeOnCancellation { ctx.stop() @@ -65,6 +83,7 @@ public class SimResourceSource<R : SimResource>( } } finally { cont = null + ctx = null } } @@ -72,17 +91,12 @@ public class SimResourceSource<R : SimResource>( isClosed = true cont?.cancel() cont = null + ctx = null } - /** - * A flag to indicate that the resource was closed. - */ - private var isClosed: Boolean = false - - /** - * The current active consumer. - */ - private var cont: CancellableContinuation<Unit>? = null + override fun interrupt() { + ctx?.interrupt() + } /** * Internal implementation of [SimResourceContext] for this class. @@ -113,7 +127,7 @@ public class SimResourceSource<R : SimResource>( speed = getSpeed(limit) val until = min(deadline, clock.millis() + getDuration(work, speed)) - scheduler.startSingleTimerTo(this, until) { flush() } + scheduler.startSingleTimerTo(this, until, ::flush) } override fun onFinish() { diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt new file mode 100644 index 00000000..cd1af3fc --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceSwitch] enables switching of capacity of multiple resources of type [R] between multiple consumers. + */ +public interface SimResourceSwitch<R : SimResource> : AutoCloseable { + /** + * The output resource providers to which resource consumers can be attached. + */ + public val outputs: Set<SimResourceProvider<R>> + + /** + * The input resources that will be switched between the output providers. + */ + public val inputs: Set<SimResourceProvider<R>> + + /** + * Add an output to the switch represented by [resource]. + */ + public fun addOutput(resource: R): SimResourceProvider<R> + + /** + * Add the specified [input] to the switch. + */ + public fun addInput(input: SimResourceProvider<R>) +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt new file mode 100644 index 00000000..060d0ea2 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch +import java.util.ArrayDeque +import kotlin.coroutines.CoroutineContext + +/** + * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that + * a single output is directly connected to an input and that the switch can only support as much outputs as inputs. + */ +public class SimResourceSwitchExclusive<R : SimResource>(context: CoroutineContext) : SimResourceSwitch<R> { + /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + private val scope = CoroutineScope(context + Job()) + + private val _outputs = mutableSetOf<SimResourceProvider<R>>() + override val outputs: Set<SimResourceProvider<R>> + get() = _outputs + + private val availableResources = ArrayDeque<SimResourceForwarder<R>>() + private val _inputs = mutableSetOf<SimResourceProvider<R>>() + override val inputs: Set<SimResourceProvider<R>> + get() = _inputs + + override fun addOutput(resource: R): SimResourceProvider<R> { + check(availableResources.isNotEmpty()) { "No capacity to serve request" } + val forwarder = availableResources.poll() + val output = Provider(resource, forwarder) + _outputs += output + return output + } + + override fun addInput(input: SimResourceProvider<R>) { + if (input in inputs) { + return + } + + val forwarder = SimResourceForwarder(input.resource) + + scope.launch { input.consume(forwarder) } + + _inputs += input + availableResources += forwarder + } + + override fun close() { + scope.cancel() + } + + private inner class Provider( + override val resource: R, + private val forwarder: SimResourceForwarder<R> + ) : SimResourceProvider<R> { + + override suspend fun consume(consumer: SimResourceConsumer<R>) = forwarder.consume(consumer) + + override fun interrupt() { + forwarder.interrupt() + } + + override fun close() { + _outputs -= this + availableResources += forwarder + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt new file mode 100644 index 00000000..bcf76d3c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -0,0 +1,508 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.* +import org.opendc.simulator.resources.consumer.SimConsumerBarrier +import java.time.Clock +import kotlin.coroutines.Continuation +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min + * fair sharing. + */ +public class SimResourceSwitchMaxMin<R : SimResource>( + private val clock: Clock, + context: CoroutineContext, + private val listener: Listener<R>? = null +) : SimResourceSwitch<R> { + /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + private val scope = CoroutineScope(context + Job()) + + private val inputConsumers = mutableSetOf<InputConsumer>() + private val _outputs = mutableSetOf<OutputProvider>() + override val outputs: Set<SimResourceProvider<R>> + get() = _outputs + + private val _inputs = mutableSetOf<SimResourceProvider<R>>() + override val inputs: Set<SimResourceProvider<R>> + get() = _inputs + + /** + * The commands to submit to the underlying host. + */ + private val commands = mutableMapOf<R, SimResourceCommand>() + + /** + * The active output contexts. + */ + private val outputContexts: MutableList<OutputContext> = mutableListOf() + + /** + * The total amount of remaining work (of all pCPUs). + */ + private var totalRemainingWork: Double = 0.0 + + /** + * The total speed requested by the vCPUs. + */ + private var totalRequestedSpeed = 0.0 + + /** + * The total amount of work requested by the vCPUs. + */ + private var totalRequestedWork = 0.0 + + /** + * The total allocated speed for the vCPUs. + */ + private var totalAllocatedSpeed = 0.0 + + /** + * The total allocated work requested for the vCPUs. + */ + private var totalAllocatedWork = 0.0 + + /** + * The amount of work that could not be performed due to over-committing resources. + */ + private var totalOvercommittedWork = 0.0 + + /** + * The amount of work that was lost due to interference. + */ + private var totalInterferedWork = 0.0 + + /** + * A flag to indicate that the scheduler has submitted work that has not yet been completed. + */ + private var isDirty: Boolean = false + + /** + * The scheduler barrier. + */ + private var barrier: SimConsumerBarrier = SimConsumerBarrier(0) + + /** + * Add an output to the switch represented by [resource]. + */ + override fun addOutput(resource: R): SimResourceProvider<R> { + val provider = OutputProvider(resource) + _outputs.add(provider) + return provider + } + + /** + * Add the specified [input] to the switch. + */ + override fun addInput(input: SimResourceProvider<R>) { + val consumer = InputConsumer(input) + _inputs.add(input) + inputConsumers += consumer + } + + override fun close() { + scope.cancel() + } + + /** + * Indicate that the workloads should be re-scheduled. + */ + private fun schedule() { + isDirty = true + interruptAll() + } + + /** + * Schedule the work over the physical CPUs. + */ + private fun doSchedule() { + // If there is no work yet, mark all inputs as idle. + if (outputContexts.isEmpty()) { + commands.replaceAll { _, _ -> SimResourceCommand.Idle() } + interruptAll() + } + + val maxUsage = inputs.sumByDouble { it.resource.capacity } + var duration: Double = Double.MAX_VALUE + var deadline: Long = Long.MAX_VALUE + var availableSpeed = maxUsage + var totalRequestedSpeed = 0.0 + var totalRequestedWork = 0.0 + + // Sort the outputs based on their requested usage + // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + outputContexts.sort() + + // Divide the available input capacity fairly across the outputs using max-min fair sharing + val outputIterator = outputContexts.listIterator() + var remaining = outputContexts.size + while (outputIterator.hasNext()) { + val output = outputIterator.next() + val availableShare = availableSpeed / remaining-- + + when (val command = output.activeCommand) { + is SimResourceCommand.Idle -> { + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + output.actualSpeed = 0.0 + } + is SimResourceCommand.Consume -> { + val grantedSpeed = min(output.allowedSpeed, availableShare) + + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + // Ignore idle computation + if (grantedSpeed <= 0.0 || command.work <= 0.0) { + output.actualSpeed = 0.0 + continue + } + + totalRequestedSpeed += command.limit + totalRequestedWork += command.work + + output.actualSpeed = grantedSpeed + availableSpeed -= grantedSpeed + + // The duration that we want to run is that of the shortest request from an output + duration = min(duration, command.work / grantedSpeed) + } + SimResourceCommand.Exit -> { + // Apparently the output consumer has exited, so remove it from the scheduling queue. + outputIterator.remove() + } + } + } + + // Round the duration to milliseconds + duration = ceil(duration * 1000) / 1000 + + assert(deadline >= clock.millis()) { "Deadline already passed" } + + val totalAllocatedSpeed = maxUsage - availableSpeed + var totalAllocatedWork = 0.0 + availableSpeed = totalAllocatedSpeed + + // Divide the requests over the available capacity of the input resources fairly + for (input in inputs.sortedByDescending { it.resource.capacity }) { + val maxResourceUsage = input.resource.capacity + val fraction = maxResourceUsage / maxUsage + val grantedSpeed = min(maxResourceUsage, totalAllocatedSpeed * fraction) + val grantedWork = duration * grantedSpeed + + commands[input.resource] = + if (grantedWork > 0.0 && grantedSpeed > 0.0) + SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + + totalAllocatedWork += grantedWork + availableSpeed -= grantedSpeed + } + + this.totalRequestedSpeed = totalRequestedSpeed + this.totalRequestedWork = totalRequestedWork + this.totalAllocatedSpeed = totalAllocatedSpeed + this.totalAllocatedWork = totalAllocatedWork + + interruptAll() + } + + /** + * Flush the progress of the vCPUs. + */ + private fun flushGuests() { + // Flush all the outputs work + for (output in outputContexts) { + output.flush(isIntermediate = true) + } + + // Report metrics + listener?.onSliceFinish( + this, + totalRequestedWork.toLong(), + (totalAllocatedWork - totalRemainingWork).toLong(), + totalOvercommittedWork.toLong(), + totalInterferedWork.toLong(), + totalRequestedSpeed, + totalAllocatedSpeed + ) + totalRemainingWork = 0.0 + totalInterferedWork = 0.0 + totalOvercommittedWork = 0.0 + + // Force all inputs to re-schedule their work. + doSchedule() + } + + /** + * Interrupt all inputs. + */ + private fun interruptAll() { + for (input in inputConsumers) { + input.interrupt() + } + } + + /** + * Event listener for hypervisor events. + */ + public interface Listener<R : SimResource> { + /** + * This method is invoked when a slice is finished. + */ + public fun onSliceFinish( + switch: SimResourceSwitchMaxMin<R>, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) + } + + /** + * An internal [SimResourceProvider] implementation for switch outputs. + */ + private inner class OutputProvider(override val resource: R) : SimResourceProvider<R> { + /** + * A flag to indicate that the resource was closed. + */ + private var isClosed: Boolean = false + + /** + * The current active consumer. + */ + private var cont: CancellableContinuation<Unit>? = null + + /** + * The [OutputContext] that is currently running. + */ + private var ctx: OutputContext? = null + + override suspend fun consume(consumer: SimResourceConsumer<R>) { + check(!isClosed) { "Lifetime of resource has ended." } + check(cont == null) { "Run should not be called concurrently" } + + try { + return suspendCancellableCoroutine { cont -> + val ctx = OutputContext(resource, consumer, cont) + ctx.start() + cont.invokeOnCancellation { + ctx.stop() + } + + this.cont = cont + this.ctx = ctx + + outputContexts += ctx + schedule() + } + } finally { + cont = null + ctx = null + } + } + + override fun close() { + isClosed = true + cont?.cancel() + cont = null + ctx = null + _outputs.remove(this) + } + + override fun interrupt() { + ctx?.interrupt() + } + } + + /** + * A [SimAbstractResourceContext] for the output resources. + */ + private inner class OutputContext( + resource: R, + consumer: SimResourceConsumer<R>, + private val cont: Continuation<Unit> + ) : SimAbstractResourceContext<R>(resource, clock, consumer), Comparable<OutputContext> { + /** + * The current command that is processed by the vCPU. + */ + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() + + /** + * The processing speed that is allowed by the model constraints. + */ + var allowedSpeed: Double = 0.0 + + /** + * The actual processing speed. + */ + var actualSpeed: Double = 0.0 + + /** + * A flag to indicate that the CPU has exited. + */ + var hasExited: Boolean = false + + override fun onIdle(deadline: Long) { + allowedSpeed = 0.0 + activeCommand = SimResourceCommand.Idle(deadline) + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + allowedSpeed = getSpeed(limit) + activeCommand = SimResourceCommand.Consume(work, limit, deadline) + } + + override fun onFinish() { + hasExited = true + activeCommand = SimResourceCommand.Exit + cont.resume(Unit) + } + + override fun onFailure(cause: Throwable) { + hasExited = true + activeCommand = SimResourceCommand.Exit + cont.resumeWithException(cause) + } + + override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { + // Apply performance interference model + val performanceScore = 1.0 + + // Compute the remaining amount of work + val remainingWork = if (work > 0.0) { + // Compute the fraction of compute time allocated to the VM + val fraction = actualSpeed / totalAllocatedSpeed + + // Compute the work that was actually granted to the VM. + val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction + val processed = processingAvailable * performanceScore + + val interferedWork = processingAvailable - processed + + totalInterferedWork += interferedWork + + max(0.0, work - processed) + } else { + 0.0 + } + + if (!isInterrupted) { + totalOvercommittedWork += remainingWork + } + + return remainingWork + } + + override fun interrupt() { + // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead + // to infinite recursion. + if (isProcessing) { + return + } + + super.interrupt() + + // Force the scheduler to re-schedule + schedule() + } + + override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) + } + + /** + * An internal [SimResourceConsumer] implementation for switch inputs. + */ + private inner class InputConsumer(val input: SimResourceProvider<R>) : SimResourceConsumer<R> { + /** + * The resource context of the consumer. + */ + private lateinit var ctx: SimResourceContext<R> + + init { + scope.launch { + try { + barrier = SimConsumerBarrier(barrier.parties + 1) + input.consume(this@InputConsumer) + } catch (e: CancellationException) { + // Cancel gracefully + throw e + } catch (e: Throwable) { + e.printStackTrace() + } finally { + barrier = SimConsumerBarrier(barrier.parties - 1) + inputConsumers -= this@InputConsumer + _inputs -= input + } + } + } + + /** + * Interrupt the consumer + */ + fun interrupt() { + ctx.interrupt() + } + + override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand { + this.ctx = ctx + return commands[ctx.resource] ?: SimResourceCommand.Idle() + } + + override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand { + totalRemainingWork += remainingWork + val isLast = barrier.enter() + + // Flush the progress of the guest after the barrier has been reached. + if (isLast && isDirty) { + isDirty = false + flushGuests() + } + + return if (isDirty) { + // Wait for the scheduler determine the work after the barrier has been reached by all CPUs. + SimResourceCommand.Idle() + } else { + // Indicate that the scheduler needs to run next call. + if (isLast) { + isDirty = true + } + + commands[ctx.resource] ?: SimResourceCommand.Idle() + } + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt new file mode 100644 index 00000000..7aa5a5aa --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.consumer + +/** + * The [SimConsumerBarrier] is a barrier that allows consumers to wait for a select number of other consumers to + * complete, before proceeding its operation. + */ +public class SimConsumerBarrier(public val parties: Int) { + private var counter = 0 + + /** + * Enter the barrier and determine whether the caller is the last to reach the barrier. + * + * @return `true` if the caller is the last to reach the barrier, `false` otherwise. + */ + public fun enter(): Boolean { + val last = ++counter == parties + if (last) { + counter = 0 + return true + } + return false + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt new file mode 100644 index 00000000..03a3cebd --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.consumer + +import org.opendc.simulator.resources.SimResource +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext + +/** + * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource + * consumption for some period of time. + */ +public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> { + private val iterator = trace.iterator() + + override fun onStart(ctx: SimResourceContext<SimResource>): SimResourceCommand { + return onNext(ctx, 0.0) + } + + override fun onNext(ctx: SimResourceContext<SimResource>, remainingWork: Double): SimResourceCommand { + return if (iterator.hasNext()) { + val now = ctx.clock.millis() + val fragment = iterator.next() + val work = (fragment.duration / 1000) * fragment.usage + val deadline = now + fragment.duration + + assert(deadline >= now) { "Deadline already passed" } + + if (work > 0.0) + SimResourceCommand.Consume(work, fragment.usage, deadline) + else + SimResourceCommand.Idle(deadline) + } else { + SimResourceCommand.Exit + } + } + + /** + * A fragment of the workload. + */ + public data class Fragment(val duration: Long, val usage: Double) +} |
