diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-17 16:51:38 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-03-17 16:51:38 +0100 |
| commit | 054a3d376b8b31ba98f91e7b34c6e0ca717def18 (patch) | |
| tree | ee739cf4092a2b807e0043bed7cae72cff7b6bac /simulator/opendc-utils/src/main | |
| parent | df2f52780c08c5d108741d3746eaf03222c64841 (diff) | |
| parent | bb3b8e207a08edff81b8c2fe30b476c94bfea086 (diff) | |
Add uniform resource consumption model (v1)
This is the first in the series of pull requests to add a uniform resource consumption model to OpenDC. This pull request introduces the `opendc-simulator-resources` module which introduces the primitives with which we can model resource consumption of CPUs, disks and network:
* `SimResourceProvider` represents a provider of some generic resource `R`, which may be consumed via `consume(SimResourceConsumer<R>)`
* `SimResourceConsumer` represents a resource consumers and characterizes how the resource is being consumed.
* `SimResourceSwitch` is a generic scheduler for sharing the capacity of multiple resources across multiple consumers.
- `SimResourceSwitchExclusive`: A space-shared switch - each consumer is allocated a single resource exclusively.
- `SimResourceSwitchMinMax`: A time-shared switch - each consumer gets a fair share of the resource capacity.
* `SimResourceForwarder` converts a consumer in a provider.
**Breaking Changes**
* `ProcessingUnit` and `MemoryUnit` renamed to `SimProcessingUnit` and `SimMemoryUnit` respectively.
* `TimerScheduler` accepts a `CoroutineContext` as opposed to a `CoroutineScope`.
Diffstat (limited to 'simulator/opendc-utils/src/main')
| -rw-r--r-- | simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt | 55 |
1 files changed, 36 insertions, 19 deletions
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt index ff116443..49964938 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt @@ -22,24 +22,28 @@ package org.opendc.utils -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.sendBlocking -import kotlinx.coroutines.launch import kotlinx.coroutines.selects.select import java.time.Clock import java.util.* +import kotlin.coroutines.CoroutineContext import kotlin.math.max /** * A TimerScheduler facilitates scheduled execution of future tasks. * - * @property coroutineScope The [CoroutineScope] to run the tasks in. + * @property context The [CoroutineContext] to run the tasks with. * @property clock The clock to keep track of the time. */ @OptIn(ExperimentalCoroutinesApi::class) -public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, private val clock: Clock) : AutoCloseable { +public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clock) : AutoCloseable { + /** + * The scope in which the scheduler runs. + */ + private val scope = CoroutineScope(context + Job()) + /** * A priority queue containing the tasks to be scheduled in the future. */ @@ -51,15 +55,17 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva private val timers = mutableMapOf<T, Timer>() /** - * The channel to communicate with the + * The channel to communicate with the scheduling job. */ private val channel = Channel<Long?>(Channel.CONFLATED) /** * The scheduling job. */ - private val job = coroutineScope.launch { + private val job = scope.launch { + val timers = timers val queue = queue + val clock = clock var next: Long? = channel.receive() while (true) { @@ -69,7 +75,7 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select onTimeout(delay) { - while (queue.isNotEmpty()) { + while (queue.isNotEmpty() && isActive) { val timer = queue.peek() val timestamp = clock.millis() @@ -84,7 +90,11 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva if (!timer.isCancelled) { timers.remove(timer.key) - timer() + try { + timer() + } catch (e: Throwable) { + Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e) + } } } @@ -99,7 +109,7 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva */ override fun close() { cancelAll() - job.cancel() + scope.cancel() } /** @@ -176,17 +186,24 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva require(timestamp >= now) { "Timestamp must be in the future" } check(job.isActive) { "Timer is stopped" } - val timer = Timer(key, timestamp, block) - timers.compute(key) { _, old -> - old?.isCancelled = true - timer - } - queue.add(timer) + if (old?.timestamp == timestamp) { + // Fast-path: timer for the same timestamp already exists + old + } else { + // Slow-path: cancel old timer and replace it with new timer + val timer = Timer(key, timestamp, block) - // Check if we need to push the interruption forward - if (queue.peek() == timer) { - channel.sendBlocking(timer.timestamp) + old?.isCancelled = true + queue.add(timer) + + // Check if we need to push the interruption forward + if (queue.peek() == timer) { + channel.sendBlocking(timer.timestamp) + } + + timer + } } } |
