diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources/src/main')
9 files changed, 238 insertions, 42 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 1bcaf45f..6ae04f27 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -22,12 +22,10 @@ package org.opendc.simulator.resources -import java.time.Clock - /** * Abstract implementation of [SimResourceAggregator]. */ -public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator { +public abstract class SimAbstractResourceAggregator(private val scheduler: SimResourceScheduler) : SimResourceAggregator { /** * This method is invoked when the resource consumer consumes resources. */ @@ -77,7 +75,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : protected val outputContext: SimResourceContext get() = context - private val context = object : SimAbstractResourceContext(0.0, clock, _output) { + private val context = object : SimAbstractResourceContext(0.0, scheduler, _output) { override val remainingWork: Double get() { val now = clock.millis() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index d2f585b1..c03bfad5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -31,9 +31,16 @@ import kotlin.math.min */ public abstract class SimAbstractResourceContext( initialCapacity: Double, - override val clock: Clock, + private val scheduler: SimResourceScheduler, private val consumer: SimResourceConsumer -) : SimResourceContext { +) : SimResourceContext, SimResourceFlushable { + + /** + * The clock of the context. + */ + public override val clock: Clock + get() = scheduler.clock + /** * The capacity of the resource. */ @@ -143,21 +150,12 @@ public abstract class SimAbstractResourceContext( flush(isIntermediate = true) doStop() - } catch (cause: Throwable) { - doFail(cause) } finally { isProcessing = false } } - /** - * Flush the current active resource consumption. - * - * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be - * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer - * will be asked to deliver a new command and is essentially interrupted. - */ - public fun flush(isIntermediate: Boolean = false) { + override fun flush(isIntermediate: Boolean) { // Flush is no-op when the consumer is finished or not yet started if (state != SimResourceState.Active) { return @@ -226,7 +224,7 @@ public abstract class SimAbstractResourceContext( return } - flush() + scheduler.schedule(this, isIntermediate = false) } override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]" @@ -234,7 +232,7 @@ public abstract class SimAbstractResourceContext( /** * A flag to indicate that the resource is currently processing a command. */ - protected var isProcessing: Boolean = false + private var isProcessing: Boolean = false /** * The current command that is being processed. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index 5d550ad8..5665abd1 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -22,12 +22,10 @@ package org.opendc.simulator.resources -import java.time.Clock - /** * A [SimResourceAggregator] that distributes the load equally across the input resources. */ -public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) { +public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimAbstractResourceAggregator(scheduler) { private val consumers = mutableListOf<Input>() override fun doConsume(work: Double, limit: Double, deadline: Long) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index 8128c98b..a76cb1e3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources -import java.time.Clock import kotlin.math.max import kotlin.math.min @@ -31,7 +30,7 @@ import kotlin.math.min */ public class SimResourceDistributorMaxMin( override val input: SimResourceProvider, - private val clock: Clock, + private val scheduler: SimResourceScheduler, private val listener: Listener? = null ) : SimResourceDistributor { override val outputs: Set<SimResourceProvider> @@ -220,7 +219,7 @@ public class SimResourceDistributorMaxMin( } } - assert(deadline >= clock.millis()) { "Deadline already passed" } + assert(deadline >= scheduler.clock.millis()) { "Deadline already passed" } this.totalRequestedSpeed = totalRequestedSpeed this.totalRequestedWork = totalRequestedWork @@ -337,7 +336,7 @@ public class SimResourceDistributorMaxMin( private inner class OutputContext( private val provider: OutputProvider, consumer: SimResourceConsumer - ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> { + ) : SimAbstractResourceContext(provider.capacity, scheduler, consumer), Comparable<OutputContext> { /** * The current command that is processed by the vCPU. */ @@ -402,6 +401,8 @@ public class SimResourceDistributorMaxMin( } } + private var isProcessing: Boolean = false + override fun interrupt() { // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead // to infinite recursion. @@ -409,10 +410,16 @@ public class SimResourceDistributorMaxMin( return } - super.interrupt() + try { + isProcessing = false - // Force the scheduler to re-schedule - schedule() + super.interrupt() + + // Force the scheduler to re-schedule + schedule() + } finally { + isProcessing = true + } } override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt new file mode 100644 index 00000000..f6a1a42e --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * An interface used by the [SimResourceScheduler] to flush the progress of resource consumer. + */ +public interface SimResourceFlushable { + /** + * Flush the current active resource consumption. + * + * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be + * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer + * will be asked to deliver a new command and is essentially interrupted. + */ + public fun flush(isIntermediate: Boolean) +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt new file mode 100644 index 00000000..a228c47b --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * A resource scheduler is responsible for scheduling the communication and synchronization between multiple resource + * providers and consumers. + * + * By centralizing the scheduling logic, updates of resources within a single system can be scheduled and tracked more + * efficiently, reducing the overall work needed per update. + */ +public interface SimResourceScheduler { + /** + * The [Clock] associated with this scheduler. + */ + public val clock: Clock + + /** + * Schedule a direct interrupt for the resource context represented by [flushable]. + * + * @param flushable The resource context that needs to be flushed. + * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be + * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer + * will be asked to deliver a new command and is essentially interrupted. + */ + public fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean = false) + + /** + * Schedule an interrupt in the future for the resource context represented by [flushable]. + * + * This method will override earlier calls to this method for the same [flushable]. + * + * @param flushable The resource context that needs to be flushed. + * @param timestamp The timestamp when the interrupt should happen. + * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be + * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer + * will be asked to deliver a new command and is essentially interrupted. + */ + public fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean = false) + + /** + * Batch the execution of several interrupts into a single call. + * + * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. + */ + public fun batch(block: () -> Unit) +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt new file mode 100644 index 00000000..cdbb4a6c --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.utils.TimerScheduler +import java.time.Clock +import java.util.ArrayDeque +import kotlin.coroutines.CoroutineContext + +/** + * A [SimResourceScheduler] queues all interrupts that occur during execution to be executed after. + * + * @param clock The virtual simulation clock. + */ +public class SimResourceSchedulerTrampoline(context: CoroutineContext, override val clock: Clock) : SimResourceScheduler { + /** + * The [TimerScheduler] to actually schedule the interrupts. + */ + private val timers = TimerScheduler<Any>(context, clock) + + /** + * A flag to indicate that an interrupt is currently running already. + */ + private var isRunning: Boolean = false + + /** + * The queue of resources to be flushed. + */ + private val queue = ArrayDeque<Pair<SimResourceFlushable, Boolean>>() + + override fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean) { + queue.add(flushable to isIntermediate) + + if (isRunning) { + return + } + + flush() + } + + override fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean) { + timers.startSingleTimerTo(flushable, timestamp) { + schedule(flushable, isIntermediate) + } + } + + override fun batch(block: () -> Unit) { + val wasAlreadyRunning = isRunning + try { + isRunning = true + block() + } finally { + if (!wasAlreadyRunning) { + isRunning = false + } + } + } + + /** + * Flush the scheduled queue. + */ + private fun flush() { + val visited = mutableSetOf<SimResourceFlushable>() + try { + isRunning = true + while (queue.isNotEmpty()) { + val (flushable, isIntermediate) = queue.poll() + flushable.flush(isIntermediate) + visited.add(flushable) + } + } finally { + isRunning = false + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index fe569096..3277b889 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import org.opendc.utils.TimerScheduler -import java.time.Clock import kotlin.math.ceil import kotlin.math.min @@ -31,13 +29,11 @@ import kotlin.math.min * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. * * @param initialCapacity The initial capacity of the resource. - * @param clock The virtual clock to track simulation time. * @param scheduler The scheduler to schedule the interrupts. */ public class SimResourceSource( initialCapacity: Double, - private val clock: Clock, - private val scheduler: TimerScheduler<Any> + private val scheduler: SimResourceScheduler ) : SimResourceProvider { /** * The current processing speed of the resource. @@ -96,22 +92,21 @@ public class SimResourceSource( /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, scheduler, consumer) { override fun onIdle(deadline: Long) { // Do not resume if deadline is "infinite" if (deadline != Long.MAX_VALUE) { - scheduler.startSingleTimerTo(this, deadline) { flush() } + scheduler.schedule(this, deadline) } } override fun onConsume(work: Double, limit: Double, deadline: Long) { val until = min(deadline, clock.millis() + getDuration(work, speed)) - - scheduler.startSingleTimerTo(this, until, ::flush) + scheduler.schedule(this, until) } override fun onFinish() { - scheduler.cancel(this) + cancel() ctx = null diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index c796c251..5dc1e68d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -23,14 +23,13 @@ package org.opendc.simulator.resources import kotlinx.coroutines.* -import java.time.Clock /** * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min * fair sharing. */ public class SimResourceSwitchMaxMin( - clock: Clock, + scheduler: SimResourceScheduler, private val listener: Listener? = null ) : SimResourceSwitch { private val _outputs = mutableSetOf<SimResourceProvider>() @@ -49,13 +48,13 @@ public class SimResourceSwitchMaxMin( /** * The aggregator to aggregate the resources. */ - private val aggregator = SimResourceAggregatorMaxMin(clock) + private val aggregator = SimResourceAggregatorMaxMin(scheduler) /** * The distributor to distribute the aggregated resources. */ private val distributor = SimResourceDistributorMaxMin( - aggregator.output, clock, + aggregator.output, scheduler, object : SimResourceDistributorMaxMin.Listener { override fun onSliceFinish( switch: SimResourceDistributor, |
