summaryrefslogtreecommitdiff
path: root/simulator/opendc-utils/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-17 16:51:38 +0100
committerGitHub <noreply@github.com>2021-03-17 16:51:38 +0100
commit054a3d376b8b31ba98f91e7b34c6e0ca717def18 (patch)
treeee739cf4092a2b807e0043bed7cae72cff7b6bac /simulator/opendc-utils/src
parentdf2f52780c08c5d108741d3746eaf03222c64841 (diff)
parentbb3b8e207a08edff81b8c2fe30b476c94bfea086 (diff)
Add uniform resource consumption model (v1)
This is the first in the series of pull requests to add a uniform resource consumption model to OpenDC. This pull request introduces the `opendc-simulator-resources` module which introduces the primitives with which we can model resource consumption of CPUs, disks and network: * `SimResourceProvider` represents a provider of some generic resource `R`, which may be consumed via `consume(SimResourceConsumer<R>)` * `SimResourceConsumer` represents a resource consumers and characterizes how the resource is being consumed. * `SimResourceSwitch` is a generic scheduler for sharing the capacity of multiple resources across multiple consumers. - `SimResourceSwitchExclusive`: A space-shared switch - each consumer is allocated a single resource exclusively. - `SimResourceSwitchMinMax`: A time-shared switch - each consumer gets a fair share of the resource capacity. * `SimResourceForwarder` converts a consumer in a provider. **Breaking Changes** * `ProcessingUnit` and `MemoryUnit` renamed to `SimProcessingUnit` and `SimMemoryUnit` respectively. * `TimerScheduler` accepts a `CoroutineContext` as opposed to a `CoroutineScope`.
Diffstat (limited to 'simulator/opendc-utils/src')
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt55
-rw-r--r--simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt14
2 files changed, 43 insertions, 26 deletions
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
index ff116443..49964938 100644
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
+++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
@@ -22,24 +22,28 @@
package org.opendc.utils
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.sendBlocking
-import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
import java.time.Clock
import java.util.*
+import kotlin.coroutines.CoroutineContext
import kotlin.math.max
/**
* A TimerScheduler facilitates scheduled execution of future tasks.
*
- * @property coroutineScope The [CoroutineScope] to run the tasks in.
+ * @property context The [CoroutineContext] to run the tasks with.
* @property clock The clock to keep track of the time.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, private val clock: Clock) : AutoCloseable {
+public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clock) : AutoCloseable {
+ /**
+ * The scope in which the scheduler runs.
+ */
+ private val scope = CoroutineScope(context + Job())
+
/**
* A priority queue containing the tasks to be scheduled in the future.
*/
@@ -51,15 +55,17 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva
private val timers = mutableMapOf<T, Timer>()
/**
- * The channel to communicate with the
+ * The channel to communicate with the scheduling job.
*/
private val channel = Channel<Long?>(Channel.CONFLATED)
/**
* The scheduling job.
*/
- private val job = coroutineScope.launch {
+ private val job = scope.launch {
+ val timers = timers
val queue = queue
+ val clock = clock
var next: Long? = channel.receive()
while (true) {
@@ -69,7 +75,7 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva
val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select
onTimeout(delay) {
- while (queue.isNotEmpty()) {
+ while (queue.isNotEmpty() && isActive) {
val timer = queue.peek()
val timestamp = clock.millis()
@@ -84,7 +90,11 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva
if (!timer.isCancelled) {
timers.remove(timer.key)
- timer()
+ try {
+ timer()
+ } catch (e: Throwable) {
+ Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e)
+ }
}
}
@@ -99,7 +109,7 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva
*/
override fun close() {
cancelAll()
- job.cancel()
+ scope.cancel()
}
/**
@@ -176,17 +186,24 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva
require(timestamp >= now) { "Timestamp must be in the future" }
check(job.isActive) { "Timer is stopped" }
- val timer = Timer(key, timestamp, block)
-
timers.compute(key) { _, old ->
- old?.isCancelled = true
- timer
- }
- queue.add(timer)
+ if (old?.timestamp == timestamp) {
+ // Fast-path: timer for the same timestamp already exists
+ old
+ } else {
+ // Slow-path: cancel old timer and replace it with new timer
+ val timer = Timer(key, timestamp, block)
- // Check if we need to push the interruption forward
- if (queue.peek() == timer) {
- channel.sendBlocking(timer.timestamp)
+ old?.isCancelled = true
+ queue.add(timer)
+
+ // Check if we need to push the interruption forward
+ if (queue.peek() == timer) {
+ channel.sendBlocking(timer.timestamp)
+ }
+
+ timer
+ }
}
}
diff --git a/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt b/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt
index 3a4acc90..1fcb5d38 100644
--- a/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt
+++ b/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt
@@ -38,7 +38,7 @@ internal class TimerSchedulerTest {
fun testBasicTimer() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.startSingleTimer(0, 1000) {
scheduler.close()
@@ -51,7 +51,7 @@ internal class TimerSchedulerTest {
fun testCancelNonExisting() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.cancel(1)
scheduler.close()
@@ -62,7 +62,7 @@ internal class TimerSchedulerTest {
fun testCancelExisting() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.startSingleTimer(0, 1000) {
assertFalse(false)
@@ -81,7 +81,7 @@ internal class TimerSchedulerTest {
fun testCancelAll() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.startSingleTimer(0, 1000) {
assertFalse(false)
@@ -99,7 +99,7 @@ internal class TimerSchedulerTest {
fun testOverride() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.startSingleTimer(0, 1000) {
assertFalse(false)
@@ -117,7 +117,7 @@ internal class TimerSchedulerTest {
fun testStopped() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.close()
@@ -133,7 +133,7 @@ internal class TimerSchedulerTest {
fun testNegativeDelay() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
assertThrows<IllegalArgumentException> {
scheduler.startSingleTimer(1, -1) {