summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-common/build.gradle.kts (renamed from opendc-utils/build.gradle.kts)4
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt92
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt (renamed from opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt)243
-rw-r--r--opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt127
-rw-r--r--opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt (renamed from opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt)50
-rw-r--r--opendc-compute/opendc-compute-api/build.gradle.kts4
-rw-r--r--opendc-compute/opendc-compute-service/build.gradle.kts3
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt26
-rw-r--r--opendc-compute/opendc-compute-simulator/build.gradle.kts3
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/build.gradle.kts1
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-serverless20/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-tf20/build.gradle.kts5
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt4
-rw-r--r--opendc-faas/opendc-faas-api/build.gradle.kts4
-rw-r--r--opendc-faas/opendc-faas-service/build.gradle.kts3
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt2
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt21
-rw-r--r--opendc-faas/opendc-faas-simulator/build.gradle.kts1
-rw-r--r--opendc-harness/opendc-harness-api/build.gradle.kts1
-rw-r--r--opendc-harness/opendc-harness-cli/build.gradle.kts1
-rw-r--r--opendc-harness/opendc-harness-engine/build.gradle.kts1
-rw-r--r--opendc-harness/opendc-harness-junit5/build.gradle.kts1
-rw-r--r--opendc-platform/build.gradle.kts39
-rw-r--r--opendc-simulator/opendc-simulator-compute/build.gradle.kts1
-rw-r--r--opendc-simulator/opendc-simulator-core/build.gradle.kts1
-rw-r--r--opendc-simulator/opendc-simulator-flow/build.gradle.kts1
-rw-r--r--opendc-simulator/opendc-simulator-network/build.gradle.kts1
-rw-r--r--opendc-simulator/opendc-simulator-power/build.gradle.kts1
-rw-r--r--opendc-telemetry/opendc-telemetry-api/build.gradle.kts1
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/build.gradle.kts1
-rw-r--r--opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts1
-rw-r--r--opendc-trace/opendc-trace-api/build.gradle.kts4
-rw-r--r--opendc-trace/opendc-trace-azure/build.gradle.kts1
-rw-r--r--opendc-trace/opendc-trace-bitbrains/build.gradle.kts1
-rw-r--r--opendc-trace/opendc-trace-gwf/build.gradle.kts1
-rw-r--r--opendc-trace/opendc-trace-opendc/build.gradle.kts1
-rw-r--r--opendc-trace/opendc-trace-parquet/build.gradle.kts2
-rw-r--r--opendc-trace/opendc-trace-swf/build.gradle.kts1
-rw-r--r--opendc-trace/opendc-trace-tools/build.gradle.kts2
-rw-r--r--opendc-trace/opendc-trace-wfformat/build.gradle.kts1
-rw-r--r--opendc-trace/opendc-trace-wtf/build.gradle.kts1
-rw-r--r--opendc-web/opendc-web-runner/build.gradle.kts1
-rw-r--r--opendc-workflow/opendc-workflow-api/build.gradle.kts1
-rw-r--r--opendc-workflow/opendc-workflow-service/build.gradle.kts3
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt38
-rw-r--r--opendc-workflow/opendc-workflow-workload/build.gradle.kts1
-rw-r--r--settings.gradle.kts3
50 files changed, 397 insertions, 319 deletions
diff --git a/opendc-utils/build.gradle.kts b/opendc-common/build.gradle.kts
index 800b374d..fda4ffd5 100644
--- a/opendc-utils/build.gradle.kts
+++ b/opendc-common/build.gradle.kts
@@ -20,16 +20,16 @@
* SOFTWARE.
*/
-description = "Utilities used across OpenDC modules"
+description = "Common functionality used across OpenDC modules"
/* Build configuration */
plugins {
`kotlin-library-conventions`
`testing-conventions`
+ `jacoco-conventions`
}
dependencies {
- api(platform(projects.opendcPlatform))
api(libs.kotlinx.coroutines)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt b/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt
new file mode 100644
index 00000000..8ccff6c3
--- /dev/null
+++ b/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common.util
+
+import kotlinx.coroutines.*
+import java.lang.Runnable
+import java.time.Clock
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Helper class to pace the incoming scheduling requests.
+ *
+ * @param context The [CoroutineContext] in which the pacer runs.
+ * @param clock The virtual simulation clock.
+ * @param quantum The scheduling quantum.
+ * @param process The process to invoke for the incoming requests.
+ */
+public class Pacer(
+ private val context: CoroutineContext,
+ private val clock: Clock,
+ private val quantum: Long,
+ private val process: (Long) -> Unit
+) {
+ /**
+ * The [Delay] instance that provides scheduled execution of [Runnable]s.
+ */
+ @OptIn(InternalCoroutinesApi::class)
+ private val delay =
+ requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
+
+ /**
+ * The current [DisposableHandle] representing the pending scheduling cycle.
+ */
+ private var handle: DisposableHandle? = null
+
+ /**
+ * Determine whether a scheduling cycle is pending.
+ */
+ public val isPending: Boolean get() = handle != null
+
+ /**
+ * Enqueue a new scheduling cycle.
+ */
+ public fun enqueue() {
+ if (handle != null) {
+ return
+ }
+
+ val quantum = quantum
+ val now = clock.millis()
+
+ // We assume that the scheduler runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
+ // We calculate here the delay until the next scheduling slot.
+ val timeUntilNextSlot = quantum - (now % quantum)
+
+ @OptIn(InternalCoroutinesApi::class)
+ handle = delay.invokeOnTimeout(timeUntilNextSlot, {
+ process(now + timeUntilNextSlot)
+ handle = null
+ }, context)
+ }
+
+ /**
+ * Cancel the currently pending scheduling cycle.
+ */
+ public fun cancel() {
+ val handle = handle ?: return
+ this.handle = null
+ handle.dispose()
+ }
+}
diff --git a/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt
index d7da7f99..bec2c9f1 100644
--- a/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,15 +20,13 @@
* SOFTWARE.
*/
-package org.opendc.utils
+package org.opendc.common.util
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.selects.select
import java.time.Clock
import java.util.*
+import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
-import kotlin.math.max
/**
* A TimerScheduler facilitates scheduled execution of future tasks.
@@ -37,86 +35,83 @@ import kotlin.math.max
* @param clock The clock to keep track of the time.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clock) : AutoCloseable {
+public class TimerScheduler<T>(private val context: CoroutineContext, private val clock: Clock) {
/**
- * The scope in which the scheduler runs.
+ * The [Delay] instance that provides scheduled execution of [Runnable]s.
*/
- private val scope = CoroutineScope(context + Job())
+ @OptIn(InternalCoroutinesApi::class)
+ private val delay =
+ requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
+
+ /**
+ * The stack of the invocations to occur in the future.
+ */
+ private val invocations = ArrayDeque<Invocation>()
/**
* A priority queue containing the tasks to be scheduled in the future.
*/
- private val queue = PriorityQueue<Timer>()
+ private val queue = PriorityQueue<Timer<T>>()
/**
* A map that keeps track of the timers.
*/
- private val timers = mutableMapOf<T, Timer>()
+ private val timers = mutableMapOf<T, Timer<T>>()
/**
- * The channel to communicate with the scheduling job.
+ * Start a timer that will invoke the specified [block] after [delay].
+ *
+ * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ *
+ * @param key The key of the timer to start.
+ * @param delay The delay before invoking the block.
+ * @param block The block to invoke.
*/
- private val channel = Channel<Long?>(Channel.CONFLATED)
+ public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) {
+ startSingleTimerTo(key, clock.millis() + delay, block)
+ }
/**
- * A flag to indicate that the scheduler is active.
+ * Start a timer that will invoke the specified [block] at [timestamp].
+ *
+ * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ *
+ * @param key The key of the timer to start.
+ * @param timestamp The timestamp at which to invoke the block.
+ * @param block The block to invoke.
*/
- private var isActive = true
-
- init {
- scope.launch {
- val timers = timers
- val queue = queue
- val clock = clock
- val job = requireNotNull(coroutineContext[Job])
- val exceptionHandler = coroutineContext[CoroutineExceptionHandler]
- var next: Long? = channel.receive()
-
- while (true) {
- next = select {
- channel.onReceive { it }
-
- val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select
-
- onTimeout(delay) {
- while (queue.isNotEmpty() && job.isActive) {
- val timer = queue.peek()
- val timestamp = clock.millis()
-
- assert(timer.timestamp >= timestamp) { "Found task in the past" }
-
- if (timer.timestamp > timestamp && !timer.isCancelled) {
- // Schedule a task for the next event to occur.
- return@onTimeout timer.timestamp
- }
-
- queue.poll()
-
- if (!timer.isCancelled) {
- timers.remove(timer.key)
- try {
- timer()
- } catch (e: Throwable) {
- exceptionHandler?.handleException(coroutineContext, e)
- }
- }
- }
-
- null
- }
- }
+ public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) {
+ val now = clock.millis()
+ val queue = queue
+ val invocations = invocations
+
+ require(timestamp >= now) { "Timestamp must be in the future" }
+
+ timers.compute(key) { _, old ->
+ if (old != null && old.timestamp == timestamp) {
+ // Fast-path: timer for the same timestamp already exists
+ old.block = block
+ old
+ } else {
+ // Slow-path: cancel old timer and replace it with new timer
+ val timer = Timer(key, timestamp, block)
+
+ old?.isCancelled = true
+ queue.add(timer)
+ trySchedule(now, invocations, timestamp)
+
+ timer
}
}
}
/**
- * Stop the scheduler.
+ * Check if a timer with a given key is active.
+ *
+ * @param key The key to check if active.
+ * @return `true` if the timer with the specified [key] is active, `false` otherwise.
*/
- override fun close() {
- isActive = false
- cancelAll()
- scope.cancel()
- }
+ public fun isTimerActive(key: T): Boolean = key in timers
/**
* Cancel a timer with a given key.
@@ -127,28 +122,10 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
* @param key The key of the timer to cancel.
*/
public fun cancel(key: T) {
- if (!isActive) {
- return
- }
-
val timer = timers.remove(key)
// Mark the timer as cancelled
timer?.isCancelled = true
-
- // Optimization: check whether we are the head of the queue
- val queue = queue
- val channel = channel
- val peek = queue.peek()
- if (peek == timer) {
- queue.poll()
-
- if (queue.isNotEmpty()) {
- channel.trySend(peek.timestamp)
- } else {
- channel.trySend(null)
- }
- }
}
/**
@@ -157,64 +134,60 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
public fun cancelAll() {
queue.clear()
timers.clear()
- }
- /**
- * Check if a timer with a given key is active.
- *
- * @param key The key to check if active.
- * @return `true` if the timer with the specified [key] is active, `false` otherwise.
- */
- public fun isTimerActive(key: T): Boolean = key in timers
+ // Cancel all pending invocations
+ for (invocation in invocations) {
+ invocation.cancel()
+ }
+ invocations.clear()
+ }
/**
- * Start a timer that will invoke the specified [block] after [delay].
- *
- * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ * Try to schedule an engine invocation at the specified [target].
*
- * @param key The key of the timer to start.
- * @param delay The delay before invoking the block.
- * @param block The block to invoke.
+ * @param now The current virtual timestamp.
+ * @param target The virtual timestamp at which the engine invocation should happen.
+ * @param scheduled The queue of scheduled invocations.
*/
- public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) {
- startSingleTimerTo(key, clock.millis() + delay, block)
+ private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
+ val head = scheduled.peek()
+
+ // Only schedule a new scheduler invocation in case the target is earlier than all other pending
+ // scheduler invocations
+ if (head == null || target < head.timestamp) {
+ @OptIn(InternalCoroutinesApi::class)
+ val handle = delay.invokeOnTimeout(target - now, ::doRunTimers, context)
+ scheduled.addFirst(Invocation(target, handle))
+ }
}
/**
- * Start a timer that will invoke the specified [block] at [timestamp].
- *
- * Each timer has a key and if a new timer with same key is started the previous is cancelled.
- *
- * @param key The key of the timer to start.
- * @param timestamp The timestamp at which to invoke the block.
- * @param block The block to invoke.
+ * This method is invoked when the earliest timer expires.
*/
- public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) {
- val now = clock.millis()
- val queue = queue
- val channel = channel
+ private fun doRunTimers() {
+ val invocations = invocations
+ val invocation = checkNotNull(invocations.poll()) // Clear invocation from future invocation queue
+ val now = invocation.timestamp
- require(timestamp >= now) { "Timestamp must be in the future" }
- check(isActive) { "Timer is stopped" }
+ while (queue.isNotEmpty()) {
+ val timer = queue.peek()
- timers.compute(key) { _, old ->
- if (old != null && old.timestamp == timestamp) {
- // Fast-path: timer for the same timestamp already exists
- old
- } else {
- // Slow-path: cancel old timer and replace it with new timer
- val timer = Timer(key, timestamp, block)
+ val timestamp = timer.timestamp
+ val isCancelled = timer.isCancelled
- old?.isCancelled = true
- queue.add(timer)
+ assert(timestamp >= now) { "Found task in the past" }
- // Check if we need to push the interruption forward
- // Note that we check by timer reference
- if (queue.peek() === timer) {
- channel.trySend(timer.timestamp)
- }
+ if (timestamp > now && !isCancelled) {
+ // Schedule a task for the next event to occur.
+ trySchedule(now, invocations, timestamp)
+ break
+ }
- timer
+ queue.poll()
+
+ if (!isCancelled) {
+ timers.remove(timer.key)
+ timer()
}
}
}
@@ -222,7 +195,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
/**
* A task that is scheduled to run in the future.
*/
- private inner class Timer(val key: T, val timestamp: Long, val block: () -> Unit) : Comparable<Timer> {
+ private class Timer<T>(val key: T, val timestamp: Long, var block: () -> Unit) : Comparable<Timer<T>> {
/**
* A flag to indicate that the task has been cancelled.
*/
@@ -234,6 +207,22 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
*/
operator fun invoke(): Unit = block()
- override fun compareTo(other: Timer): Int = timestamp.compareTo(other.timestamp)
+ override fun compareTo(other: Timer<T>): Int = timestamp.compareTo(other.timestamp)
+ }
+
+ /**
+ * A future engine invocation.
+ *
+ * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case
+ * the invocation is not needed anymore, it can be cancelled via [cancel].
+ */
+ private class Invocation(
+ @JvmField val timestamp: Long,
+ @JvmField val handle: DisposableHandle
+ ) {
+ /**
+ * Cancel the engine invocation.
+ */
+ fun cancel() = handle.dispose()
}
}
diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt
new file mode 100644
index 00000000..1cd435f6
--- /dev/null
+++ b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common.util
+
+import kotlinx.coroutines.delay
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import java.time.Clock
+import kotlin.coroutines.EmptyCoroutineContext
+
+/**
+ * Test suite for the [Pacer] class.
+ */
+class PacerTest {
+ @Test
+ fun testEmptyContext() {
+ assertThrows<IllegalArgumentException> { Pacer(EmptyCoroutineContext, Clock.systemUTC(), 100) {} }
+ }
+
+ @Test
+ fun testSingleEnqueue() {
+ var count = 0
+
+ runBlockingSimulation {
+ val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ count++
+ }
+
+ pacer.enqueue()
+ }
+
+ assertEquals(1, count) { "Process should execute once" }
+ }
+
+ @Test
+ fun testCascade() {
+ var count = 0
+
+ runBlockingSimulation {
+ val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ count++
+ }
+
+ pacer.enqueue()
+ pacer.enqueue()
+
+ assertTrue(pacer.isPending)
+ }
+
+ assertEquals(1, count) { "Process should execute once" }
+ }
+
+ @Test
+ fun testCancel() {
+ var count = 0
+
+ runBlockingSimulation {
+ val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ count++
+ }
+
+ pacer.enqueue()
+ pacer.cancel()
+
+ assertFalse(pacer.isPending)
+ }
+
+ assertEquals(0, count) { "Process should never execute " }
+ }
+
+ @Test
+ fun testCancelWithoutPending() {
+ var count = 0
+
+ runBlockingSimulation {
+ val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ count++
+ }
+
+ assertFalse(pacer.isPending)
+ assertDoesNotThrow { pacer.cancel() }
+
+ pacer.enqueue()
+ }
+
+ assertEquals(1, count) { "Process should execute once" }
+ }
+
+ @Test
+ fun testSubsequent() {
+ var count = 0
+
+ runBlockingSimulation {
+ val pacer = Pacer(coroutineContext, clock, quantum = 100) {
+ count++
+ }
+
+ pacer.enqueue()
+ delay(100)
+ pacer.enqueue()
+ }
+
+ assertEquals(2, count) { "Process should execute twice" }
+ }
+}
diff --git a/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
index 101a6546..01f61f92 100644
--- a/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt
+++ b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,13 +20,15 @@
* SOFTWARE.
*/
-package org.opendc.utils
+package org.opendc.common.util
import kotlinx.coroutines.ExperimentalCoroutinesApi
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
+import java.time.Clock
+import kotlin.coroutines.EmptyCoroutineContext
/**
* A test suite for the [TimerScheduler] class.
@@ -34,14 +36,21 @@ import org.opendc.simulator.core.runBlockingSimulation
@OptIn(ExperimentalCoroutinesApi::class)
internal class TimerSchedulerTest {
@Test
+ fun testEmptyContext() {
+ assertThrows<IllegalArgumentException> { TimerScheduler<Unit>(EmptyCoroutineContext, Clock.systemUTC()) }
+ }
+
+ @Test
fun testBasicTimer() {
runBlockingSimulation {
val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.startSingleTimer(0, 1000) {
- scheduler.close()
assertEquals(1000, clock.millis())
}
+
+ assertTrue(scheduler.isTimerActive(0))
+ assertFalse(scheduler.isTimerActive(1))
}
}
@@ -51,7 +60,6 @@ internal class TimerSchedulerTest {
val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.cancel(1)
- scheduler.close()
}
}
@@ -61,12 +69,11 @@ internal class TimerSchedulerTest {
val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.startSingleTimer(0, 1000) {
- assertFalse(false)
+ fail()
}
scheduler.startSingleTimer(1, 100) {
scheduler.cancel(0)
- scheduler.close()
assertEquals(100, clock.millis())
}
@@ -78,15 +85,9 @@ internal class TimerSchedulerTest {
runBlockingSimulation {
val scheduler = TimerScheduler<Int>(coroutineContext, clock)
- scheduler.startSingleTimer(0, 1000) {
- assertFalse(false)
- }
-
- scheduler.startSingleTimer(1, 100) {
- assertFalse(false)
- }
-
- scheduler.close()
+ scheduler.startSingleTimer(0, 1000) { fail() }
+ scheduler.startSingleTimer(1, 100) { fail() }
+ scheduler.cancelAll()
}
}
@@ -95,12 +96,9 @@ internal class TimerSchedulerTest {
runBlockingSimulation {
val scheduler = TimerScheduler<Int>(coroutineContext, clock)
- scheduler.startSingleTimer(0, 1000) {
- assertFalse(false)
- }
+ scheduler.startSingleTimer(0, 1000) { fail() }
scheduler.startSingleTimer(0, 200) {
- scheduler.close()
assertEquals(200, clock.millis())
}
@@ -108,16 +106,14 @@ internal class TimerSchedulerTest {
}
@Test
- fun testStopped() {
+ fun testOverrideBlock() {
runBlockingSimulation {
val scheduler = TimerScheduler<Int>(coroutineContext, clock)
- scheduler.close()
+ scheduler.startSingleTimer(0, 1000) { fail() }
- assertThrows<IllegalStateException> {
- scheduler.startSingleTimer(1, 100) {
- assertFalse(false)
- }
+ scheduler.startSingleTimer(0, 1000) {
+ assertEquals(1000, clock.millis())
}
}
}
@@ -129,11 +125,9 @@ internal class TimerSchedulerTest {
assertThrows<IllegalArgumentException> {
scheduler.startSingleTimer(1, -1) {
- assertFalse(false)
+ fail()
}
}
-
- scheduler.close()
}
}
}
diff --git a/opendc-compute/opendc-compute-api/build.gradle.kts b/opendc-compute/opendc-compute-api/build.gradle.kts
index 880ee03d..2ac7e64c 100644
--- a/opendc-compute/opendc-compute-api/build.gradle.kts
+++ b/opendc-compute/opendc-compute-api/build.gradle.kts
@@ -26,7 +26,3 @@ description = "API interface for the OpenDC Compute service"
plugins {
`kotlin-library-conventions`
}
-
-dependencies {
- api(platform(projects.opendcPlatform))
-}
diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts
index 33cafc45..b9437a73 100644
--- a/opendc-compute/opendc-compute-service/build.gradle.kts
+++ b/opendc-compute/opendc-compute-service/build.gradle.kts
@@ -30,10 +30,9 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcCompute.opendcComputeApi)
api(projects.opendcTelemetry.opendcTelemetryApi)
- implementation(projects.opendcUtils)
+ implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
implementation(libs.opentelemetry.semconv)
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 27a6ecae..144b6573 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -29,13 +29,13 @@ import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.api.metrics.ObservableLongMeasurement
import kotlinx.coroutines.*
import mu.KotlinLogging
+import org.opendc.common.util.Pacer
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.utils.TimerScheduler
import java.time.Clock
import java.time.Duration
import java.util.*
@@ -56,7 +56,7 @@ internal class ComputeServiceImpl(
private val clock: Clock,
meterProvider: MeterProvider,
private val scheduler: ComputeScheduler,
- private val schedulingQuantum: Duration
+ schedulingQuantum: Duration
) : ComputeService, HostListener {
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
@@ -147,9 +147,9 @@ internal class ComputeServiceImpl(
private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active")
/**
- * The [TimerScheduler] to use for scheduling the scheduler cycles.
+ * The [Pacer] to use for scheduling the scheduler cycles.
*/
- private var timerScheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
+ private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis(), ::doSchedule)
override val hosts: Set<Host>
get() = hostToView.keys
@@ -354,28 +354,18 @@ internal class ComputeServiceImpl(
* Indicate that a new scheduling cycle is needed due to a change to the service's state.
*/
private fun requestSchedulingCycle() {
- // Bail out in case we have already requested a new cycle or the queue is empty.
- if (timerScheduler.isTimerActive(Unit) || queue.isEmpty()) {
+ // Bail out in case the queue is empty.
+ if (queue.isEmpty()) {
return
}
- val quantum = schedulingQuantum.toMillis()
-
- // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
- // This is important because the slices of the VMs need to be aligned.
- // We calculate here the delay until the next scheduling slot.
- val delay = quantum - (clock.millis() % quantum)
-
- timerScheduler.startSingleTimer(Unit, delay) {
- doSchedule()
- }
+ pacer.enqueue()
}
/**
* Run a single scheduling iteration.
*/
- private fun doSchedule() {
- val now = clock.millis()
+ private fun doSchedule(now: Long) {
while (queue.isNotEmpty()) {
val request = queue.peek()
diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts
index aaf69f78..9a8cbfcc 100644
--- a/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -30,11 +30,10 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcCompute.opendcComputeService)
api(projects.opendcSimulator.opendcSimulatorCompute)
api(libs.commons.math3)
- implementation(projects.opendcUtils)
+ implementation(projects.opendcCommon)
implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 95921e8b..43f33f27 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -249,6 +249,12 @@ public class SimHost(
machine.cancel()
}
+ override fun hashCode(): Int = uid.hashCode()
+
+ override fun equals(other: Any?): Boolean {
+ return other is SimHost && uid == other.uid
+ }
+
override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
public suspend fun fail() {
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts
index 28a5e1da..93e09b99 100644
--- a/opendc-compute/opendc-compute-workload/build.gradle.kts
+++ b/opendc-compute/opendc-compute-workload/build.gradle.kts
@@ -29,7 +29,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcTrace.opendcTraceApi)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
index a1a65da3..4b0b343f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
@@ -169,7 +169,7 @@ public class ComputeServiceHelper(
optimize = optimize
)
- _hosts.add(host)
+ require(_hosts.add(host)) { "Host with uid ${spec.uid} already exists" }
service.addHost(host)
return host
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index c20556b5..2def3dc5 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -30,7 +30,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcHarness.opendcHarnessApi)
api(projects.opendcCompute.opendcComputeWorkload)
diff --git a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts
index 65c31c4f..b96647a6 100644
--- a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts
@@ -29,7 +29,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcHarness.opendcHarnessApi)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcFaas.opendcFaasService)
diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
index 882c4894..43093abf 100644
--- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
@@ -29,16 +29,15 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcHarness.opendcHarnessApi)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
- implementation(projects.opendcUtils)
+ implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
implementation(libs.jackson.module.kotlin) {
exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect")
}
- implementation("org.jetbrains.kotlin:kotlin-reflect:1.5.30")
+ implementation("org.jetbrains.kotlin:kotlin-reflect:1.6.10")
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
index 9771cc20..7d65a674 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
@@ -23,7 +23,7 @@
package org.opendc.experiments.tf20.network
import kotlinx.coroutines.channels.Channel
-import org.opendc.utils.TimerScheduler
+import org.opendc.common.util.TimerScheduler
import java.time.Clock
import kotlin.coroutines.CoroutineContext
@@ -89,6 +89,6 @@ public class NetworkController(context: CoroutineContext, clock: Clock) : AutoCl
* Stop the network controller.
*/
override fun close() {
- scheduler.close()
+ scheduler.cancelAll()
}
}
diff --git a/opendc-faas/opendc-faas-api/build.gradle.kts b/opendc-faas/opendc-faas-api/build.gradle.kts
index 7362d949..8a295acd 100644
--- a/opendc-faas/opendc-faas-api/build.gradle.kts
+++ b/opendc-faas/opendc-faas-api/build.gradle.kts
@@ -26,7 +26,3 @@ description = "API for the OpenDC FaaS platform"
plugins {
`kotlin-library-conventions`
}
-
-dependencies {
- api(platform(projects.opendcPlatform))
-}
diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts
index 6f4fcc9b..7a561014 100644
--- a/opendc-faas/opendc-faas-service/build.gradle.kts
+++ b/opendc-faas/opendc-faas-service/build.gradle.kts
@@ -30,10 +30,9 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcFaas.opendcFaasApi)
api(projects.opendcTelemetry.opendcTelemetryApi)
- implementation(projects.opendcUtils)
+ implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
implementation(libs.opentelemetry.semconv)
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
index 63dbadc7..d579ad0c 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
@@ -22,9 +22,9 @@
package org.opendc.faas.service.autoscaler
+import org.opendc.common.util.TimerScheduler
import org.opendc.faas.service.deployer.FunctionInstance
import org.opendc.faas.service.deployer.FunctionInstanceState
-import org.opendc.utils.TimerScheduler
import java.time.Clock
import java.time.Duration
import kotlin.coroutines.CoroutineContext
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index c285585a..1526be9d 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -27,6 +27,7 @@ import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import mu.KotlinLogging
+import org.opendc.common.util.Pacer
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.api.FaaSFunction
import org.opendc.faas.service.FaaSService
@@ -37,7 +38,6 @@ import org.opendc.faas.service.deployer.FunctionInstance
import org.opendc.faas.service.deployer.FunctionInstanceListener
import org.opendc.faas.service.deployer.FunctionInstanceState
import org.opendc.faas.service.router.RoutingPolicy
-import org.opendc.utils.TimerScheduler
import java.lang.IllegalStateException
import java.time.Clock
import java.util.*
@@ -55,7 +55,7 @@ import kotlin.coroutines.resumeWithException
internal class FaaSServiceImpl(
context: CoroutineContext,
private val clock: Clock,
- private val meterProvider: MeterProvider,
+ meterProvider: MeterProvider,
private val deployer: FunctionDeployer,
private val routingPolicy: RoutingPolicy,
private val terminationPolicy: FunctionTerminationPolicy
@@ -76,9 +76,9 @@ internal class FaaSServiceImpl(
private val meter = meterProvider.get("org.opendc.faas.service")
/**
- * The [TimerScheduler] to use for scheduling the scheduler cycles.
+ * The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
+ private val pacer = Pacer(scope.coroutineContext, clock, quantum = 100) { doSchedule() }
/**
* The [Random] instance used to generate unique identifiers for the objects.
@@ -191,19 +191,12 @@ internal class FaaSServiceImpl(
* Indicate that a new scheduling cycle is needed due to a change to the service's state.
*/
private fun schedule() {
- // Bail out in case we have already requested a new cycle or the queue is empty.
- if (scheduler.isTimerActive(Unit) || queue.isEmpty()) {
+ // Bail out in case the queue is empty.
+ if (queue.isEmpty()) {
return
}
- val quantum = 100
-
- // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
- // This is important because the slices of the VMs need to be aligned.
- // We calculate here the delay until the next scheduling slot.
- val delay = quantum - (clock.millis() % quantum)
-
- scheduler.startSingleTimer(Unit, delay, ::doSchedule)
+ pacer.enqueue()
}
/**
diff --git a/opendc-faas/opendc-faas-simulator/build.gradle.kts b/opendc-faas/opendc-faas-simulator/build.gradle.kts
index fed1862d..50f75429 100644
--- a/opendc-faas/opendc-faas-simulator/build.gradle.kts
+++ b/opendc-faas/opendc-faas-simulator/build.gradle.kts
@@ -30,7 +30,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcFaas.opendcFaasService)
api(projects.opendcSimulator.opendcSimulatorCompute)
diff --git a/opendc-harness/opendc-harness-api/build.gradle.kts b/opendc-harness/opendc-harness-api/build.gradle.kts
index 5c464377..c37f14c2 100644
--- a/opendc-harness/opendc-harness-api/build.gradle.kts
+++ b/opendc-harness/opendc-harness-api/build.gradle.kts
@@ -28,7 +28,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(libs.junit.platform.commons)
implementation(libs.kotlin.logging)
diff --git a/opendc-harness/opendc-harness-cli/build.gradle.kts b/opendc-harness/opendc-harness-cli/build.gradle.kts
index 5d0c0460..4d0f469f 100644
--- a/opendc-harness/opendc-harness-cli/build.gradle.kts
+++ b/opendc-harness/opendc-harness-cli/build.gradle.kts
@@ -34,7 +34,6 @@ application {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcHarness.opendcHarnessEngine)
implementation(libs.kotlin.logging)
diff --git a/opendc-harness/opendc-harness-engine/build.gradle.kts b/opendc-harness/opendc-harness-engine/build.gradle.kts
index cbd10f6a..6bf08b7b 100644
--- a/opendc-harness/opendc-harness-engine/build.gradle.kts
+++ b/opendc-harness/opendc-harness-engine/build.gradle.kts
@@ -30,7 +30,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcHarness.opendcHarnessApi)
api(libs.kotlinx.coroutines)
diff --git a/opendc-harness/opendc-harness-junit5/build.gradle.kts b/opendc-harness/opendc-harness-junit5/build.gradle.kts
index e63367d3..f78a4c30 100644
--- a/opendc-harness/opendc-harness-junit5/build.gradle.kts
+++ b/opendc-harness/opendc-harness-junit5/build.gradle.kts
@@ -28,7 +28,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcHarness.opendcHarnessEngine)
implementation(libs.kotlin.logging)
diff --git a/opendc-platform/build.gradle.kts b/opendc-platform/build.gradle.kts
deleted file mode 100644
index c2cb84a8..00000000
--- a/opendc-platform/build.gradle.kts
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-plugins {
- `java-platform`
- `publishing-conventions`
-}
-
-description = "Java platform for the OpenDC project"
-
-publishing.publications.named<MavenPublication>("maven") {
- from(components["javaPlatform"])
-
- pom {
- description.set(
- "This Bill of Materials POM can be used to ease dependency management " +
- "when referencing multiple OpenDC artifacts using Gradle or Maven."
- )
- }
-}
diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
index ca8b912a..fb516ccf 100644
--- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
@@ -30,7 +30,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcSimulator.opendcSimulatorFlow)
api(projects.opendcSimulator.opendcSimulatorPower)
api(projects.opendcSimulator.opendcSimulatorNetwork)
diff --git a/opendc-simulator/opendc-simulator-core/build.gradle.kts b/opendc-simulator/opendc-simulator-core/build.gradle.kts
index 6dbf4199..0de96a8e 100644
--- a/opendc-simulator/opendc-simulator-core/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-core/build.gradle.kts
@@ -28,6 +28,5 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(libs.kotlinx.coroutines)
}
diff --git a/opendc-simulator/opendc-simulator-flow/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
index f5b67851..3f08071a 100644
--- a/opendc-simulator/opendc-simulator-flow/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
@@ -30,7 +30,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(libs.kotlinx.coroutines)
implementation(libs.kotlin.logging)
diff --git a/opendc-simulator/opendc-simulator-network/build.gradle.kts b/opendc-simulator/opendc-simulator-network/build.gradle.kts
index f8931053..0cc7763e 100644
--- a/opendc-simulator/opendc-simulator-network/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-network/build.gradle.kts
@@ -29,7 +29,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcSimulator.opendcSimulatorFlow)
implementation(projects.opendcSimulator.opendcSimulatorCore)
diff --git a/opendc-simulator/opendc-simulator-power/build.gradle.kts b/opendc-simulator/opendc-simulator-power/build.gradle.kts
index 5d8c8949..59859403 100644
--- a/opendc-simulator/opendc-simulator-power/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-power/build.gradle.kts
@@ -29,7 +29,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcSimulator.opendcSimulatorFlow)
implementation(projects.opendcSimulator.opendcSimulatorCore)
diff --git a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts b/opendc-telemetry/opendc-telemetry-api/build.gradle.kts
index 5492fc14..32a36d68 100644
--- a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts
+++ b/opendc-telemetry/opendc-telemetry-api/build.gradle.kts
@@ -28,6 +28,5 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(libs.opentelemetry.api)
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
index cd8cb57a..47e30a14 100644
--- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
+++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
@@ -28,7 +28,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcTelemetry.opendcTelemetrySdk)
implementation(libs.opentelemetry.semconv)
diff --git a/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts b/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts
index 3b918775..4b3241bc 100644
--- a/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts
+++ b/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts
@@ -28,7 +28,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcTelemetry.opendcTelemetryApi)
api(libs.kotlinx.coroutines)
api(libs.opentelemetry.sdk.main)
diff --git a/opendc-trace/opendc-trace-api/build.gradle.kts b/opendc-trace/opendc-trace-api/build.gradle.kts
index b2f91593..977eec0d 100644
--- a/opendc-trace/opendc-trace-api/build.gradle.kts
+++ b/opendc-trace/opendc-trace-api/build.gradle.kts
@@ -26,7 +26,3 @@ description = "Workload trace library for OpenDC"
plugins {
`kotlin-library-conventions`
}
-
-dependencies {
- api(platform(projects.opendcPlatform))
-}
diff --git a/opendc-trace/opendc-trace-azure/build.gradle.kts b/opendc-trace/opendc-trace-azure/build.gradle.kts
index 8bde56cb..89626737 100644
--- a/opendc-trace/opendc-trace-azure/build.gradle.kts
+++ b/opendc-trace/opendc-trace-azure/build.gradle.kts
@@ -30,7 +30,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcTrace.opendcTraceApi)
implementation(libs.jackson.dataformat.csv)
}
diff --git a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts
index d195cbbb..71c0e794 100644
--- a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts
+++ b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts
@@ -30,7 +30,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcTrace.opendcTraceApi)
implementation(libs.jackson.dataformat.csv)
}
diff --git a/opendc-trace/opendc-trace-gwf/build.gradle.kts b/opendc-trace/opendc-trace-gwf/build.gradle.kts
index f3dfd6ef..d02f7a19 100644
--- a/opendc-trace/opendc-trace-gwf/build.gradle.kts
+++ b/opendc-trace/opendc-trace-gwf/build.gradle.kts
@@ -30,7 +30,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcTrace.opendcTraceApi)
implementation(libs.jackson.dataformat.csv)
diff --git a/opendc-trace/opendc-trace-opendc/build.gradle.kts b/opendc-trace/opendc-trace-opendc/build.gradle.kts
index b9c242a1..d1b4735e 100644
--- a/opendc-trace/opendc-trace-opendc/build.gradle.kts
+++ b/opendc-trace/opendc-trace-opendc/build.gradle.kts
@@ -30,7 +30,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcTrace.opendcTraceApi)
implementation(projects.opendcTrace.opendcTraceParquet)
diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts
index 75378509..f943b6a6 100644
--- a/opendc-trace/opendc-trace-parquet/build.gradle.kts
+++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts
@@ -30,8 +30,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
-
/* This configuration is necessary for a slim dependency on Apache Parquet */
api(libs.parquet) {
exclude(group = "org.apache.hadoop")
diff --git a/opendc-trace/opendc-trace-swf/build.gradle.kts b/opendc-trace/opendc-trace-swf/build.gradle.kts
index c9eaa78d..25d59b49 100644
--- a/opendc-trace/opendc-trace-swf/build.gradle.kts
+++ b/opendc-trace/opendc-trace-swf/build.gradle.kts
@@ -30,6 +30,5 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcTrace.opendcTraceApi)
}
diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts
index 14a0fc7c..0c1e179e 100644
--- a/opendc-trace/opendc-trace-tools/build.gradle.kts
+++ b/opendc-trace/opendc-trace-tools/build.gradle.kts
@@ -33,8 +33,6 @@ application {
}
dependencies {
- api(platform(projects.opendcPlatform))
-
implementation(projects.opendcTrace.opendcTraceApi)
implementation(libs.kotlin.logging)
implementation(libs.clikt)
diff --git a/opendc-trace/opendc-trace-wfformat/build.gradle.kts b/opendc-trace/opendc-trace-wfformat/build.gradle.kts
index 2d336d03..cd78fd61 100644
--- a/opendc-trace/opendc-trace-wfformat/build.gradle.kts
+++ b/opendc-trace/opendc-trace-wfformat/build.gradle.kts
@@ -30,7 +30,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcTrace.opendcTraceApi)
implementation(libs.jackson.core)
diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts
index e4f0ab3a..8301e363 100644
--- a/opendc-trace/opendc-trace-wtf/build.gradle.kts
+++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts
@@ -30,7 +30,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcTrace.opendcTraceApi)
implementation(projects.opendcTrace.opendcTraceParquet)
diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts
index 810f512f..a73ca6b3 100644
--- a/opendc-web/opendc-web-runner/build.gradle.kts
+++ b/opendc-web/opendc-web-runner/build.gradle.kts
@@ -34,7 +34,6 @@ application {
}
dependencies {
- api(platform(projects.opendcPlatform))
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcCompute.opendcComputeWorkload)
implementation(projects.opendcSimulator.opendcSimulatorCore)
diff --git a/opendc-workflow/opendc-workflow-api/build.gradle.kts b/opendc-workflow/opendc-workflow-api/build.gradle.kts
index 36239d05..03569d8c 100644
--- a/opendc-workflow/opendc-workflow-api/build.gradle.kts
+++ b/opendc-workflow/opendc-workflow-api/build.gradle.kts
@@ -28,7 +28,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcCompute.opendcComputeApi)
implementation(libs.kotlin.logging)
}
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts
index 4d8b7d7f..17df33e3 100644
--- a/opendc-workflow/opendc-workflow-service/build.gradle.kts
+++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts
@@ -30,11 +30,10 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcWorkflow.opendcWorkflowApi)
api(projects.opendcCompute.opendcComputeApi)
api(projects.opendcTelemetry.opendcTelemetryApi)
- implementation(projects.opendcUtils)
+ implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
testImplementation(projects.opendcWorkflow.opendcWorkflowWorkload)
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index 7b6d8651..cdaec021 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -25,8 +25,8 @@ package org.opendc.workflow.service.internal
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
+import org.opendc.common.util.Pacer
import org.opendc.compute.api.*
-import org.opendc.utils.TimerScheduler
import org.opendc.workflow.api.Job
import org.opendc.workflow.api.WORKFLOW_TASK_CORES
import org.opendc.workflow.service.*
@@ -187,9 +187,9 @@ public class WorkflowServiceImpl(
.build()
/**
- * The [TimerScheduler] to use for scheduling the scheduler cycles.
+ * The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val timerScheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
+ private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
@@ -230,7 +230,7 @@ public class WorkflowServiceImpl(
rootListener.jobSubmitted(jobInstance)
submittedJobs.add(1)
- requestSchedulingCycle()
+ pacer.enqueue()
}
override fun close() {
@@ -252,31 +252,6 @@ public class WorkflowServiceImpl(
}
/**
- * Indicate that a new scheduling cycle is needed due to a change to the service's state.
- */
- private fun requestSchedulingCycle() {
- // Bail out in case we have already requested a new cycle or the queue is empty.
- if (timerScheduler.isTimerActive(Unit)) {
- return
- }
-
- val quantum = schedulingQuantum.toMillis()
- if (quantum == 0L) {
- doSchedule()
- return
- }
-
- // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
- // This is important because the slices of the VMs need to be aligned.
- // We calculate here the delay until the next scheduling slot.
- val delay = quantum - (clock.millis() % quantum)
-
- timerScheduler.startSingleTimer(Unit, delay) {
- doSchedule()
- }
- }
-
- /**
* Perform a scheduling cycle immediately.
*/
private fun doSchedule() {
@@ -409,10 +384,9 @@ public class WorkflowServiceImpl(
finishJob(job)
}
- requestSchedulingCycle()
- }
- ServerState.DELETED -> {
+ pacer.enqueue()
}
+ ServerState.DELETED -> {}
else -> throw IllegalStateException()
}
}
diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts
index dfb77a39..a9d497af 100644
--- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts
+++ b/opendc-workflow/opendc-workflow-workload/build.gradle.kts
@@ -29,7 +29,6 @@ plugins {
}
dependencies {
- api(platform(projects.opendcPlatform))
api(projects.opendcWorkflow.opendcWorkflowService)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 170a687d..f518a984 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -21,7 +21,7 @@
*/
rootProject.name = "opendc"
-include(":opendc-platform")
+include(":opendc-common")
include(":opendc-compute:opendc-compute-api")
include(":opendc-compute:opendc-compute-service")
include(":opendc-compute:opendc-compute-simulator")
@@ -60,7 +60,6 @@ include(":opendc-harness:opendc-harness-api")
include(":opendc-harness:opendc-harness-engine")
include(":opendc-harness:opendc-harness-cli")
include(":opendc-harness:opendc-harness-junit5")
-include(":opendc-utils")
enableFeaturePreview("VERSION_CATALOGS")
enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS")