diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-12-30 14:03:12 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-16 12:06:40 +0100 |
| commit | 6a2a5423479696e8dc28885be27cc3e3252f28b0 (patch) | |
| tree | e23dd1d7ab3a15969da5f7e02baf24a9434b9912 /simulator/opendc-simulator/opendc-simulator-resources/src/main | |
| parent | df2f52780c08c5d108741d3746eaf03222c64841 (diff) | |
simulator: Add generic framework for resource consumption modeling
This change adds a generic framework for modeling resource consumptions and
adapts opendc-simulator-compute to model machines and VMs on top of
this framework.
This framework anticipates the addition of additional resource types
such as memory, disk and network to the OpenDC codebase.
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src/main')
7 files changed, 609 insertions, 0 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt new file mode 100644 index 00000000..f9da74c7 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -0,0 +1,255 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. + */ +public abstract class SimAbstractResourceContext<R : SimResource>( + override val resource: R, + override val clock: Clock, + private val consumer: SimResourceConsumer<R> +) : SimResourceContext<R> { + /** + * This method is invoked when the resource will idle until the specified [deadline]. + */ + public abstract fun onIdle(deadline: Long) + + /** + * This method is invoked when the resource will be consumed until the specified [work] was processed or the + * [deadline] was reached. + */ + public abstract fun onConsume(work: Double, limit: Double, deadline: Long) + + /** + * This method is invoked when the resource consumer has finished. + */ + public abstract fun onFinish() + + /** + * This method is invoked when the resource consumer throws an exception. + */ + public abstract fun onFailure(cause: Throwable) + + /** + * Compute the duration that a resource consumption will take with the specified [speed]. + */ + protected open fun getDuration(work: Double, speed: Double): Long { + return ceil(work / speed * 1000).toLong() + } + + /** + * Compute the speed at which the resource may be consumed. + */ + protected open fun getSpeed(limit: Double): Double { + return min(limit, resource.capacity) + } + + /** + * Get the remaining work to process after a resource consumption was flushed. + * + * @param work The size of the resource consumption. + * @param speed The speed of consumption. + * @param duration The duration from the start of the consumption until now. + * @param isInterrupted A flag to indicate that the resource consumption could not be fully processed due to + * it being interrupted before it could finish or reach its deadline. + * @return The amount of work remaining. + */ + protected open fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { + return if (duration > 0L) { + val processed = duration / 1000.0 * speed + max(0.0, work - processed) + } else { + 0.0 + } + } + + /** + * Start the consumer. + */ + public fun start() { + try { + isProcessing = true + latestFlush = clock.millis() + + interpret(consumer.onStart(this)) + } catch (e: Throwable) { + onFailure(e) + } finally { + isProcessing = false + } + } + + /** + * Immediately stop the consumer. + */ + public fun stop() { + try { + isProcessing = true + latestFlush = clock.millis() + + flush(isIntermediate = true) + onFinish() + } catch (e: Throwable) { + onFailure(e) + } finally { + isProcessing = false + } + } + + /** + * Flush the current active resource consumption. + * + * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be + * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer + * will be asked to deliver a new command and is essentially interrupted. + */ + public open fun flush(isIntermediate: Boolean = false) { + val now = clock.millis() + + // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it. + if (isIntermediate && latestFlush >= now) { + return + } + + try { + val (timestamp, command) = activeCommand ?: return + + isProcessing = true + activeCommand = null + + val duration = now - timestamp + assert(duration >= 0) { "Flush in the past" } + + when (command) { + is SimResourceCommand.Idle -> { + // We should only continue processing the next command if: + // 1. The resource consumer reached its deadline. + // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) + if (command.deadline <= now || !isIntermediate) { + next(remainingWork = 0.0) + } + } + is SimResourceCommand.Consume -> { + val speed = min(resource.capacity, command.limit) + val isInterrupted = !isIntermediate && duration < getDuration(command.work, speed) + val remainingWork = getRemainingWork(command.work, speed, duration, isInterrupted) + + // We should only continue processing the next command if: + // 1. The resource consumption was finished. + // 2. The resource consumer reached its deadline. + // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) + if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { + next(remainingWork) + } else { + interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline)) + } + } + SimResourceCommand.Exit -> + // Flush may not be called when the resource consumer has finished + throw IllegalStateException() + } + } catch (e: Throwable) { + onFailure(e) + } finally { + latestFlush = now + isProcessing = false + } + } + + override fun interrupt() { + // Prevent users from interrupting the resource while they are constructing their next command, as this will + // only lead to infinite recursion. + if (isProcessing) { + return + } + + flush() + } + + override fun toString(): String = "SimAbstractResourceContext[resource=$resource]" + + /** + * A flag to indicate that the resource is currently processing a command. + */ + protected var isProcessing: Boolean = false + + /** + * The current command that is being processed. + */ + private var activeCommand: CommandWrapper? = null + + /** + * The latest timestamp at which the resource was flushed. + */ + private var latestFlush: Long = Long.MIN_VALUE + + /** + * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. + */ + private fun interpret(command: SimResourceCommand) { + val now = clock.millis() + + when (command) { + is SimResourceCommand.Idle -> { + val deadline = command.deadline + + require(deadline >= now) { "Deadline already passed" } + + onIdle(deadline) + } + is SimResourceCommand.Consume -> { + val work = command.work + val limit = command.limit + val deadline = command.deadline + + require(deadline >= now) { "Deadline already passed" } + + onConsume(work, limit, deadline) + } + is SimResourceCommand.Exit -> { + onFinish() + } + } + + assert(activeCommand == null) { "Concurrent access to current command" } + activeCommand = CommandWrapper(now, command) + } + + /** + * Request the workload for more work. + */ + private fun next(remainingWork: Double) { + interpret(consumer.onNext(this, remainingWork)) + } + + /** + * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. + */ + private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt new file mode 100644 index 00000000..31b0a175 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A generic representation of resource that may be consumed. + */ +public interface SimResource { + /** + * The capacity of the resource. + */ + public val capacity: Double +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt new file mode 100644 index 00000000..77c0a7a9 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A SimResourceCommand communicates to a [SimResource] how it is consumed by a [SimResourceConsumer]. + */ +public sealed class SimResourceCommand { + /** + * A request to the resource to perform the specified amount of work before the given [deadline]. + * + * @param work The amount of work to process on the CPU. + * @param limit The maximum amount of work to be processed per second. + * @param deadline The instant at which the work needs to be fulfilled. + */ + public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() { + init { + require(work > 0) { "Amount of work must be positive" } + require(limit > 0) { "Limit must be positive" } + } + } + + /** + * An indication to the resource that the consumer will idle until the specified [deadline] or if it is interrupted. + */ + public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() + + /** + * An indication to the resource that the consumer has finished. + */ + public object Exit : SimResourceCommand() +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt new file mode 100644 index 00000000..f516faa6 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A SimResourceConsumer characterizes how a [SimResource] is consumed. + */ +public interface SimResourceConsumer<in R : SimResource> { + /** + * This method is invoked when the consumer is started for a resource. + * + * @param ctx The execution context in which the consumer runs. + * @return The next command that the resource should perform. + */ + public fun onStart(ctx: SimResourceContext<R>): SimResourceCommand + + /** + * This method is invoked when a resource was either interrupted or reached its deadline. + * + * @param ctx The execution context in which the consumer runs. + * @param remainingWork The remaining work that was not yet completed. + * @return The next command that the resource should perform. + */ + public fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt new file mode 100644 index 00000000..dfb5e9ce --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a + * resource and a resource consumer. + */ +public interface SimResourceContext<out R : SimResource> { + /** + * The resource that is managed by this context. + */ + public val resource: R + + /** + * The virtual clock tracking simulation time. + */ + public val clock: Clock + + /** + * Ask the resource provider to interrupt its resource. + */ + public fun interrupt() +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt new file mode 100644 index 00000000..91a745ab --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceProvider] provides some resource of type [R]. + */ +public interface SimResourceProvider<out R : SimResource> : AutoCloseable { + /** + * The resource that is managed by this provider. + */ + public val resource: R + + /** + * Consume the resource provided by this provider using the specified [consumer]. + */ + public suspend fun consume(consumer: SimResourceConsumer<R>) + + /** + * End the lifetime of the resource. + * + * This operation terminates the existing resource consumer. + */ + public override fun close() +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt new file mode 100644 index 00000000..4445df86 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import org.opendc.utils.TimerScheduler +import java.time.Clock +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.math.min + +/** + * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. + * + * @param resource The resource to provide. + * @param clock The virtual clock to track simulation time. + */ +public class SimResourceSource<R : SimResource>( + override val resource: R, + private val clock: Clock, + private val scheduler: TimerScheduler<Any> +) : SimResourceProvider<R> { + /** + * The resource processing speed over time. + */ + public val speed: StateFlow<Double> + get() = _speed + private val _speed = MutableStateFlow(0.0) + + override suspend fun consume(consumer: SimResourceConsumer<R>) { + check(!isClosed) { "Lifetime of resource has ended." } + check(cont == null) { "Run should not be called concurrently" } + + try { + return suspendCancellableCoroutine { cont -> + this.cont = cont + val ctx = Context(consumer, cont) + ctx.start() + cont.invokeOnCancellation { + ctx.stop() + } + } + } finally { + cont = null + } + } + + override fun close() { + isClosed = true + cont?.cancel() + cont = null + } + + /** + * A flag to indicate that the resource was closed. + */ + private var isClosed: Boolean = false + + /** + * The current active consumer. + */ + private var cont: CancellableContinuation<Unit>? = null + + /** + * Internal implementation of [SimResourceContext] for this class. + */ + private inner class Context( + consumer: SimResourceConsumer<R>, + val cont: Continuation<Unit> + ) : SimAbstractResourceContext<R>(resource, clock, consumer) { + /** + * The processing speed of the resource. + */ + private var speed: Double = 0.0 + set(value) { + field = value + _speed.value = field + } + + override fun onIdle(deadline: Long) { + speed = 0.0 + + // Do not resume if deadline is "infinite" + if (deadline != Long.MAX_VALUE) { + scheduler.startSingleTimerTo(this, deadline) { flush() } + } + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + speed = getSpeed(limit) + val until = min(deadline, clock.millis() + getDuration(work, speed)) + + scheduler.startSingleTimerTo(this, until) { flush() } + } + + override fun onFinish() { + speed = 0.0 + scheduler.cancel(this) + cont.resume(Unit) + } + + override fun onFailure(cause: Throwable) { + speed = 0.0 + scheduler.cancel(this) + cont.resumeWithException(cause) + } + + override fun toString(): String = "SimResourceSource.Context[resource=$resource]" + } +} |
