diff options
50 files changed, 397 insertions, 319 deletions
diff --git a/opendc-utils/build.gradle.kts b/opendc-common/build.gradle.kts index 800b374d..fda4ffd5 100644 --- a/opendc-utils/build.gradle.kts +++ b/opendc-common/build.gradle.kts @@ -20,16 +20,16 @@ * SOFTWARE. */ -description = "Utilities used across OpenDC modules" +description = "Common functionality used across OpenDC modules" /* Build configuration */ plugins { `kotlin-library-conventions` `testing-conventions` + `jacoco-conventions` } dependencies { - api(platform(projects.opendcPlatform)) api(libs.kotlinx.coroutines) testImplementation(projects.opendcSimulator.opendcSimulatorCore) diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt b/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt new file mode 100644 index 00000000..8ccff6c3 --- /dev/null +++ b/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.common.util + +import kotlinx.coroutines.* +import java.lang.Runnable +import java.time.Clock +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext + +/** + * Helper class to pace the incoming scheduling requests. + * + * @param context The [CoroutineContext] in which the pacer runs. + * @param clock The virtual simulation clock. + * @param quantum The scheduling quantum. + * @param process The process to invoke for the incoming requests. + */ +public class Pacer( + private val context: CoroutineContext, + private val clock: Clock, + private val quantum: Long, + private val process: (Long) -> Unit +) { + /** + * The [Delay] instance that provides scheduled execution of [Runnable]s. + */ + @OptIn(InternalCoroutinesApi::class) + private val delay = + requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } + + /** + * The current [DisposableHandle] representing the pending scheduling cycle. + */ + private var handle: DisposableHandle? = null + + /** + * Determine whether a scheduling cycle is pending. + */ + public val isPending: Boolean get() = handle != null + + /** + * Enqueue a new scheduling cycle. + */ + public fun enqueue() { + if (handle != null) { + return + } + + val quantum = quantum + val now = clock.millis() + + // We assume that the scheduler runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). + // We calculate here the delay until the next scheduling slot. + val timeUntilNextSlot = quantum - (now % quantum) + + @OptIn(InternalCoroutinesApi::class) + handle = delay.invokeOnTimeout(timeUntilNextSlot, { + process(now + timeUntilNextSlot) + handle = null + }, context) + } + + /** + * Cancel the currently pending scheduling cycle. + */ + public fun cancel() { + val handle = handle ?: return + this.handle = null + handle.dispose() + } +} diff --git a/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt index d7da7f99..bec2c9f1 100644 --- a/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,15 +20,13 @@ * SOFTWARE. */ -package org.opendc.utils +package org.opendc.common.util import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.selects.select import java.time.Clock import java.util.* +import kotlin.coroutines.ContinuationInterceptor import kotlin.coroutines.CoroutineContext -import kotlin.math.max /** * A TimerScheduler facilitates scheduled execution of future tasks. @@ -37,86 +35,83 @@ import kotlin.math.max * @param clock The clock to keep track of the time. */ @OptIn(ExperimentalCoroutinesApi::class) -public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clock) : AutoCloseable { +public class TimerScheduler<T>(private val context: CoroutineContext, private val clock: Clock) { /** - * The scope in which the scheduler runs. + * The [Delay] instance that provides scheduled execution of [Runnable]s. */ - private val scope = CoroutineScope(context + Job()) + @OptIn(InternalCoroutinesApi::class) + private val delay = + requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } + + /** + * The stack of the invocations to occur in the future. + */ + private val invocations = ArrayDeque<Invocation>() /** * A priority queue containing the tasks to be scheduled in the future. */ - private val queue = PriorityQueue<Timer>() + private val queue = PriorityQueue<Timer<T>>() /** * A map that keeps track of the timers. */ - private val timers = mutableMapOf<T, Timer>() + private val timers = mutableMapOf<T, Timer<T>>() /** - * The channel to communicate with the scheduling job. + * Start a timer that will invoke the specified [block] after [delay]. + * + * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * + * @param key The key of the timer to start. + * @param delay The delay before invoking the block. + * @param block The block to invoke. */ - private val channel = Channel<Long?>(Channel.CONFLATED) + public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) { + startSingleTimerTo(key, clock.millis() + delay, block) + } /** - * A flag to indicate that the scheduler is active. + * Start a timer that will invoke the specified [block] at [timestamp]. + * + * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * + * @param key The key of the timer to start. + * @param timestamp The timestamp at which to invoke the block. + * @param block The block to invoke. */ - private var isActive = true - - init { - scope.launch { - val timers = timers - val queue = queue - val clock = clock - val job = requireNotNull(coroutineContext[Job]) - val exceptionHandler = coroutineContext[CoroutineExceptionHandler] - var next: Long? = channel.receive() - - while (true) { - next = select { - channel.onReceive { it } - - val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select - - onTimeout(delay) { - while (queue.isNotEmpty() && job.isActive) { - val timer = queue.peek() - val timestamp = clock.millis() - - assert(timer.timestamp >= timestamp) { "Found task in the past" } - - if (timer.timestamp > timestamp && !timer.isCancelled) { - // Schedule a task for the next event to occur. - return@onTimeout timer.timestamp - } - - queue.poll() - - if (!timer.isCancelled) { - timers.remove(timer.key) - try { - timer() - } catch (e: Throwable) { - exceptionHandler?.handleException(coroutineContext, e) - } - } - } - - null - } - } + public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) { + val now = clock.millis() + val queue = queue + val invocations = invocations + + require(timestamp >= now) { "Timestamp must be in the future" } + + timers.compute(key) { _, old -> + if (old != null && old.timestamp == timestamp) { + // Fast-path: timer for the same timestamp already exists + old.block = block + old + } else { + // Slow-path: cancel old timer and replace it with new timer + val timer = Timer(key, timestamp, block) + + old?.isCancelled = true + queue.add(timer) + trySchedule(now, invocations, timestamp) + + timer } } } /** - * Stop the scheduler. + * Check if a timer with a given key is active. + * + * @param key The key to check if active. + * @return `true` if the timer with the specified [key] is active, `false` otherwise. */ - override fun close() { - isActive = false - cancelAll() - scope.cancel() - } + public fun isTimerActive(key: T): Boolean = key in timers /** * Cancel a timer with a given key. @@ -127,28 +122,10 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo * @param key The key of the timer to cancel. */ public fun cancel(key: T) { - if (!isActive) { - return - } - val timer = timers.remove(key) // Mark the timer as cancelled timer?.isCancelled = true - - // Optimization: check whether we are the head of the queue - val queue = queue - val channel = channel - val peek = queue.peek() - if (peek == timer) { - queue.poll() - - if (queue.isNotEmpty()) { - channel.trySend(peek.timestamp) - } else { - channel.trySend(null) - } - } } /** @@ -157,64 +134,60 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo public fun cancelAll() { queue.clear() timers.clear() - } - /** - * Check if a timer with a given key is active. - * - * @param key The key to check if active. - * @return `true` if the timer with the specified [key] is active, `false` otherwise. - */ - public fun isTimerActive(key: T): Boolean = key in timers + // Cancel all pending invocations + for (invocation in invocations) { + invocation.cancel() + } + invocations.clear() + } /** - * Start a timer that will invoke the specified [block] after [delay]. - * - * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * Try to schedule an engine invocation at the specified [target]. * - * @param key The key of the timer to start. - * @param delay The delay before invoking the block. - * @param block The block to invoke. + * @param now The current virtual timestamp. + * @param target The virtual timestamp at which the engine invocation should happen. + * @param scheduled The queue of scheduled invocations. */ - public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) { - startSingleTimerTo(key, clock.millis() + delay, block) + private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) { + val head = scheduled.peek() + + // Only schedule a new scheduler invocation in case the target is earlier than all other pending + // scheduler invocations + if (head == null || target < head.timestamp) { + @OptIn(InternalCoroutinesApi::class) + val handle = delay.invokeOnTimeout(target - now, ::doRunTimers, context) + scheduled.addFirst(Invocation(target, handle)) + } } /** - * Start a timer that will invoke the specified [block] at [timestamp]. - * - * Each timer has a key and if a new timer with same key is started the previous is cancelled. - * - * @param key The key of the timer to start. - * @param timestamp The timestamp at which to invoke the block. - * @param block The block to invoke. + * This method is invoked when the earliest timer expires. */ - public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) { - val now = clock.millis() - val queue = queue - val channel = channel + private fun doRunTimers() { + val invocations = invocations + val invocation = checkNotNull(invocations.poll()) // Clear invocation from future invocation queue + val now = invocation.timestamp - require(timestamp >= now) { "Timestamp must be in the future" } - check(isActive) { "Timer is stopped" } + while (queue.isNotEmpty()) { + val timer = queue.peek() - timers.compute(key) { _, old -> - if (old != null && old.timestamp == timestamp) { - // Fast-path: timer for the same timestamp already exists - old - } else { - // Slow-path: cancel old timer and replace it with new timer - val timer = Timer(key, timestamp, block) + val timestamp = timer.timestamp + val isCancelled = timer.isCancelled - old?.isCancelled = true - queue.add(timer) + assert(timestamp >= now) { "Found task in the past" } - // Check if we need to push the interruption forward - // Note that we check by timer reference - if (queue.peek() === timer) { - channel.trySend(timer.timestamp) - } + if (timestamp > now && !isCancelled) { + // Schedule a task for the next event to occur. + trySchedule(now, invocations, timestamp) + break + } - timer + queue.poll() + + if (!isCancelled) { + timers.remove(timer.key) + timer() } } } @@ -222,7 +195,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo /** * A task that is scheduled to run in the future. */ - private inner class Timer(val key: T, val timestamp: Long, val block: () -> Unit) : Comparable<Timer> { + private class Timer<T>(val key: T, val timestamp: Long, var block: () -> Unit) : Comparable<Timer<T>> { /** * A flag to indicate that the task has been cancelled. */ @@ -234,6 +207,22 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo */ operator fun invoke(): Unit = block() - override fun compareTo(other: Timer): Int = timestamp.compareTo(other.timestamp) + override fun compareTo(other: Timer<T>): Int = timestamp.compareTo(other.timestamp) + } + + /** + * A future engine invocation. + * + * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case + * the invocation is not needed anymore, it can be cancelled via [cancel]. + */ + private class Invocation( + @JvmField val timestamp: Long, + @JvmField val handle: DisposableHandle + ) { + /** + * Cancel the engine invocation. + */ + fun cancel() = handle.dispose() } } diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt new file mode 100644 index 00000000..1cd435f6 --- /dev/null +++ b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.common.util + +import kotlinx.coroutines.delay +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import java.time.Clock +import kotlin.coroutines.EmptyCoroutineContext + +/** + * Test suite for the [Pacer] class. + */ +class PacerTest { + @Test + fun testEmptyContext() { + assertThrows<IllegalArgumentException> { Pacer(EmptyCoroutineContext, Clock.systemUTC(), 100) {} } + } + + @Test + fun testSingleEnqueue() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + } + + assertEquals(1, count) { "Process should execute once" } + } + + @Test + fun testCascade() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + pacer.enqueue() + + assertTrue(pacer.isPending) + } + + assertEquals(1, count) { "Process should execute once" } + } + + @Test + fun testCancel() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + pacer.cancel() + + assertFalse(pacer.isPending) + } + + assertEquals(0, count) { "Process should never execute " } + } + + @Test + fun testCancelWithoutPending() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + assertFalse(pacer.isPending) + assertDoesNotThrow { pacer.cancel() } + + pacer.enqueue() + } + + assertEquals(1, count) { "Process should execute once" } + } + + @Test + fun testSubsequent() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + delay(100) + pacer.enqueue() + } + + assertEquals(2, count) { "Process should execute twice" } + } +} diff --git a/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt index 101a6546..01f61f92 100644 --- a/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt +++ b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,13 +20,15 @@ * SOFTWARE. */ -package org.opendc.utils +package org.opendc.common.util import kotlinx.coroutines.ExperimentalCoroutinesApi import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation +import java.time.Clock +import kotlin.coroutines.EmptyCoroutineContext /** * A test suite for the [TimerScheduler] class. @@ -34,14 +36,21 @@ import org.opendc.simulator.core.runBlockingSimulation @OptIn(ExperimentalCoroutinesApi::class) internal class TimerSchedulerTest { @Test + fun testEmptyContext() { + assertThrows<IllegalArgumentException> { TimerScheduler<Unit>(EmptyCoroutineContext, Clock.systemUTC()) } + } + + @Test fun testBasicTimer() { runBlockingSimulation { val scheduler = TimerScheduler<Int>(coroutineContext, clock) scheduler.startSingleTimer(0, 1000) { - scheduler.close() assertEquals(1000, clock.millis()) } + + assertTrue(scheduler.isTimerActive(0)) + assertFalse(scheduler.isTimerActive(1)) } } @@ -51,7 +60,6 @@ internal class TimerSchedulerTest { val scheduler = TimerScheduler<Int>(coroutineContext, clock) scheduler.cancel(1) - scheduler.close() } } @@ -61,12 +69,11 @@ internal class TimerSchedulerTest { val scheduler = TimerScheduler<Int>(coroutineContext, clock) scheduler.startSingleTimer(0, 1000) { - assertFalse(false) + fail() } scheduler.startSingleTimer(1, 100) { scheduler.cancel(0) - scheduler.close() assertEquals(100, clock.millis()) } @@ -78,15 +85,9 @@ internal class TimerSchedulerTest { runBlockingSimulation { val scheduler = TimerScheduler<Int>(coroutineContext, clock) - scheduler.startSingleTimer(0, 1000) { - assertFalse(false) - } - - scheduler.startSingleTimer(1, 100) { - assertFalse(false) - } - - scheduler.close() + scheduler.startSingleTimer(0, 1000) { fail() } + scheduler.startSingleTimer(1, 100) { fail() } + scheduler.cancelAll() } } @@ -95,12 +96,9 @@ internal class TimerSchedulerTest { runBlockingSimulation { val scheduler = TimerScheduler<Int>(coroutineContext, clock) - scheduler.startSingleTimer(0, 1000) { - assertFalse(false) - } + scheduler.startSingleTimer(0, 1000) { fail() } scheduler.startSingleTimer(0, 200) { - scheduler.close() assertEquals(200, clock.millis()) } @@ -108,16 +106,14 @@ internal class TimerSchedulerTest { } @Test - fun testStopped() { + fun testOverrideBlock() { runBlockingSimulation { val scheduler = TimerScheduler<Int>(coroutineContext, clock) - scheduler.close() + scheduler.startSingleTimer(0, 1000) { fail() } - assertThrows<IllegalStateException> { - scheduler.startSingleTimer(1, 100) { - assertFalse(false) - } + scheduler.startSingleTimer(0, 1000) { + assertEquals(1000, clock.millis()) } } } @@ -129,11 +125,9 @@ internal class TimerSchedulerTest { assertThrows<IllegalArgumentException> { scheduler.startSingleTimer(1, -1) { - assertFalse(false) + fail() } } - - scheduler.close() } } } diff --git a/opendc-compute/opendc-compute-api/build.gradle.kts b/opendc-compute/opendc-compute-api/build.gradle.kts index 880ee03d..2ac7e64c 100644 --- a/opendc-compute/opendc-compute-api/build.gradle.kts +++ b/opendc-compute/opendc-compute-api/build.gradle.kts @@ -26,7 +26,3 @@ description = "API interface for the OpenDC Compute service" plugins { `kotlin-library-conventions` } - -dependencies { - api(platform(projects.opendcPlatform)) -} diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts index 33cafc45..b9437a73 100644 --- a/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/opendc-compute/opendc-compute-service/build.gradle.kts @@ -30,10 +30,9 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeApi) api(projects.opendcTelemetry.opendcTelemetryApi) - implementation(projects.opendcUtils) + implementation(projects.opendcCommon) implementation(libs.kotlin.logging) implementation(libs.opentelemetry.semconv) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 27a6ecae..144b6573 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -29,13 +29,13 @@ import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging +import org.opendc.common.util.Pacer import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.utils.TimerScheduler import java.time.Clock import java.time.Duration import java.util.* @@ -56,7 +56,7 @@ internal class ComputeServiceImpl( private val clock: Clock, meterProvider: MeterProvider, private val scheduler: ComputeScheduler, - private val schedulingQuantum: Duration + schedulingQuantum: Duration ) : ComputeService, HostListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -147,9 +147,9 @@ internal class ComputeServiceImpl( private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active") /** - * The [TimerScheduler] to use for scheduling the scheduler cycles. + * The [Pacer] to use for scheduling the scheduler cycles. */ - private var timerScheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock) + private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis(), ::doSchedule) override val hosts: Set<Host> get() = hostToView.keys @@ -354,28 +354,18 @@ internal class ComputeServiceImpl( * Indicate that a new scheduling cycle is needed due to a change to the service's state. */ private fun requestSchedulingCycle() { - // Bail out in case we have already requested a new cycle or the queue is empty. - if (timerScheduler.isTimerActive(Unit) || queue.isEmpty()) { + // Bail out in case the queue is empty. + if (queue.isEmpty()) { return } - val quantum = schedulingQuantum.toMillis() - - // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). - // This is important because the slices of the VMs need to be aligned. - // We calculate here the delay until the next scheduling slot. - val delay = quantum - (clock.millis() % quantum) - - timerScheduler.startSingleTimer(Unit, delay) { - doSchedule() - } + pacer.enqueue() } /** * Run a single scheduling iteration. */ - private fun doSchedule() { - val now = clock.millis() + private fun doSchedule(now: Long) { while (queue.isNotEmpty()) { val request = queue.peek() diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index aaf69f78..9a8cbfcc 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -30,11 +30,10 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeService) api(projects.opendcSimulator.opendcSimulatorCompute) api(libs.commons.math3) - implementation(projects.opendcUtils) + implementation(projects.opendcCommon) implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 95921e8b..43f33f27 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -249,6 +249,12 @@ public class SimHost( machine.cancel() } + override fun hashCode(): Int = uid.hashCode() + + override fun equals(other: Any?): Boolean { + return other is SimHost && uid == other.uid + } + override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" public suspend fun fail() { diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index 28a5e1da..93e09b99 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -29,7 +29,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTrace.opendcTraceApi) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index a1a65da3..4b0b343f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -169,7 +169,7 @@ public class ComputeServiceHelper( optimize = optimize ) - _hosts.add(host) + require(_hosts.add(host)) { "Host with uid ${spec.uid} already exists" } service.addHost(host) return host diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index c20556b5..2def3dc5 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) api(projects.opendcCompute.opendcComputeWorkload) diff --git a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts index 65c31c4f..b96647a6 100644 --- a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts @@ -29,7 +29,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcFaas.opendcFaasService) diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts index 882c4894..43093abf 100644 --- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts @@ -29,16 +29,15 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcUtils) + implementation(projects.opendcCommon) implementation(libs.kotlin.logging) implementation(libs.jackson.module.kotlin) { exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") } - implementation("org.jetbrains.kotlin:kotlin-reflect:1.5.30") + implementation("org.jetbrains.kotlin:kotlin-reflect:1.6.10") } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt index 9771cc20..7d65a674 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt @@ -23,7 +23,7 @@ package org.opendc.experiments.tf20.network import kotlinx.coroutines.channels.Channel -import org.opendc.utils.TimerScheduler +import org.opendc.common.util.TimerScheduler import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -89,6 +89,6 @@ public class NetworkController(context: CoroutineContext, clock: Clock) : AutoCl * Stop the network controller. */ override fun close() { - scheduler.close() + scheduler.cancelAll() } } diff --git a/opendc-faas/opendc-faas-api/build.gradle.kts b/opendc-faas/opendc-faas-api/build.gradle.kts index 7362d949..8a295acd 100644 --- a/opendc-faas/opendc-faas-api/build.gradle.kts +++ b/opendc-faas/opendc-faas-api/build.gradle.kts @@ -26,7 +26,3 @@ description = "API for the OpenDC FaaS platform" plugins { `kotlin-library-conventions` } - -dependencies { - api(platform(projects.opendcPlatform)) -} diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index 6f4fcc9b..7a561014 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -30,10 +30,9 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcFaas.opendcFaasApi) api(projects.opendcTelemetry.opendcTelemetryApi) - implementation(projects.opendcUtils) + implementation(projects.opendcCommon) implementation(libs.kotlin.logging) implementation(libs.opentelemetry.semconv) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt index 63dbadc7..d579ad0c 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt @@ -22,9 +22,9 @@ package org.opendc.faas.service.autoscaler +import org.opendc.common.util.TimerScheduler import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceState -import org.opendc.utils.TimerScheduler import java.time.Clock import java.time.Duration import kotlin.coroutines.CoroutineContext diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index c285585a..1526be9d 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -27,6 +27,7 @@ import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging +import org.opendc.common.util.Pacer import org.opendc.faas.api.FaaSClient import org.opendc.faas.api.FaaSFunction import org.opendc.faas.service.FaaSService @@ -37,7 +38,6 @@ import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceListener import org.opendc.faas.service.deployer.FunctionInstanceState import org.opendc.faas.service.router.RoutingPolicy -import org.opendc.utils.TimerScheduler import java.lang.IllegalStateException import java.time.Clock import java.util.* @@ -55,7 +55,7 @@ import kotlin.coroutines.resumeWithException internal class FaaSServiceImpl( context: CoroutineContext, private val clock: Clock, - private val meterProvider: MeterProvider, + meterProvider: MeterProvider, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, private val terminationPolicy: FunctionTerminationPolicy @@ -76,9 +76,9 @@ internal class FaaSServiceImpl( private val meter = meterProvider.get("org.opendc.faas.service") /** - * The [TimerScheduler] to use for scheduling the scheduler cycles. + * The [Pacer] to use for scheduling the scheduler cycles. */ - private val scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock) + private val pacer = Pacer(scope.coroutineContext, clock, quantum = 100) { doSchedule() } /** * The [Random] instance used to generate unique identifiers for the objects. @@ -191,19 +191,12 @@ internal class FaaSServiceImpl( * Indicate that a new scheduling cycle is needed due to a change to the service's state. */ private fun schedule() { - // Bail out in case we have already requested a new cycle or the queue is empty. - if (scheduler.isTimerActive(Unit) || queue.isEmpty()) { + // Bail out in case the queue is empty. + if (queue.isEmpty()) { return } - val quantum = 100 - - // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). - // This is important because the slices of the VMs need to be aligned. - // We calculate here the delay until the next scheduling slot. - val delay = quantum - (clock.millis() % quantum) - - scheduler.startSingleTimer(Unit, delay, ::doSchedule) + pacer.enqueue() } /** diff --git a/opendc-faas/opendc-faas-simulator/build.gradle.kts b/opendc-faas/opendc-faas-simulator/build.gradle.kts index fed1862d..50f75429 100644 --- a/opendc-faas/opendc-faas-simulator/build.gradle.kts +++ b/opendc-faas/opendc-faas-simulator/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcFaas.opendcFaasService) api(projects.opendcSimulator.opendcSimulatorCompute) diff --git a/opendc-harness/opendc-harness-api/build.gradle.kts b/opendc-harness/opendc-harness-api/build.gradle.kts index 5c464377..c37f14c2 100644 --- a/opendc-harness/opendc-harness-api/build.gradle.kts +++ b/opendc-harness/opendc-harness-api/build.gradle.kts @@ -28,7 +28,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(libs.junit.platform.commons) implementation(libs.kotlin.logging) diff --git a/opendc-harness/opendc-harness-cli/build.gradle.kts b/opendc-harness/opendc-harness-cli/build.gradle.kts index 5d0c0460..4d0f469f 100644 --- a/opendc-harness/opendc-harness-cli/build.gradle.kts +++ b/opendc-harness/opendc-harness-cli/build.gradle.kts @@ -34,7 +34,6 @@ application { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessEngine) implementation(libs.kotlin.logging) diff --git a/opendc-harness/opendc-harness-engine/build.gradle.kts b/opendc-harness/opendc-harness-engine/build.gradle.kts index cbd10f6a..6bf08b7b 100644 --- a/opendc-harness/opendc-harness-engine/build.gradle.kts +++ b/opendc-harness/opendc-harness-engine/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) api(libs.kotlinx.coroutines) diff --git a/opendc-harness/opendc-harness-junit5/build.gradle.kts b/opendc-harness/opendc-harness-junit5/build.gradle.kts index e63367d3..f78a4c30 100644 --- a/opendc-harness/opendc-harness-junit5/build.gradle.kts +++ b/opendc-harness/opendc-harness-junit5/build.gradle.kts @@ -28,7 +28,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessEngine) implementation(libs.kotlin.logging) diff --git a/opendc-platform/build.gradle.kts b/opendc-platform/build.gradle.kts deleted file mode 100644 index c2cb84a8..00000000 --- a/opendc-platform/build.gradle.kts +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -plugins { - `java-platform` - `publishing-conventions` -} - -description = "Java platform for the OpenDC project" - -publishing.publications.named<MavenPublication>("maven") { - from(components["javaPlatform"]) - - pom { - description.set( - "This Bill of Materials POM can be used to ease dependency management " + - "when referencing multiple OpenDC artifacts using Gradle or Maven." - ) - } -} diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts index ca8b912a..fb516ccf 100644 --- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcSimulator.opendcSimulatorFlow) api(projects.opendcSimulator.opendcSimulatorPower) api(projects.opendcSimulator.opendcSimulatorNetwork) diff --git a/opendc-simulator/opendc-simulator-core/build.gradle.kts b/opendc-simulator/opendc-simulator-core/build.gradle.kts index 6dbf4199..0de96a8e 100644 --- a/opendc-simulator/opendc-simulator-core/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-core/build.gradle.kts @@ -28,6 +28,5 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(libs.kotlinx.coroutines) } diff --git a/opendc-simulator/opendc-simulator-flow/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts index f5b67851..3f08071a 100644 --- a/opendc-simulator/opendc-simulator-flow/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(libs.kotlinx.coroutines) implementation(libs.kotlin.logging) diff --git a/opendc-simulator/opendc-simulator-network/build.gradle.kts b/opendc-simulator/opendc-simulator-network/build.gradle.kts index f8931053..0cc7763e 100644 --- a/opendc-simulator/opendc-simulator-network/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-network/build.gradle.kts @@ -29,7 +29,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcSimulator.opendcSimulatorFlow) implementation(projects.opendcSimulator.opendcSimulatorCore) diff --git a/opendc-simulator/opendc-simulator-power/build.gradle.kts b/opendc-simulator/opendc-simulator-power/build.gradle.kts index 5d8c8949..59859403 100644 --- a/opendc-simulator/opendc-simulator-power/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-power/build.gradle.kts @@ -29,7 +29,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcSimulator.opendcSimulatorFlow) implementation(projects.opendcSimulator.opendcSimulatorCore) diff --git a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts b/opendc-telemetry/opendc-telemetry-api/build.gradle.kts index 5492fc14..32a36d68 100644 --- a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts +++ b/opendc-telemetry/opendc-telemetry-api/build.gradle.kts @@ -28,6 +28,5 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(libs.opentelemetry.api) } diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts index cd8cb57a..47e30a14 100644 --- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts +++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts @@ -28,7 +28,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTelemetry.opendcTelemetrySdk) implementation(libs.opentelemetry.semconv) diff --git a/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts b/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts index 3b918775..4b3241bc 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts +++ b/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts @@ -28,7 +28,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTelemetry.opendcTelemetryApi) api(libs.kotlinx.coroutines) api(libs.opentelemetry.sdk.main) diff --git a/opendc-trace/opendc-trace-api/build.gradle.kts b/opendc-trace/opendc-trace-api/build.gradle.kts index b2f91593..977eec0d 100644 --- a/opendc-trace/opendc-trace-api/build.gradle.kts +++ b/opendc-trace/opendc-trace-api/build.gradle.kts @@ -26,7 +26,3 @@ description = "Workload trace library for OpenDC" plugins { `kotlin-library-conventions` } - -dependencies { - api(platform(projects.opendcPlatform)) -} diff --git a/opendc-trace/opendc-trace-azure/build.gradle.kts b/opendc-trace/opendc-trace-azure/build.gradle.kts index 8bde56cb..89626737 100644 --- a/opendc-trace/opendc-trace-azure/build.gradle.kts +++ b/opendc-trace/opendc-trace-azure/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) implementation(libs.jackson.dataformat.csv) } diff --git a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts index d195cbbb..71c0e794 100644 --- a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts +++ b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) implementation(libs.jackson.dataformat.csv) } diff --git a/opendc-trace/opendc-trace-gwf/build.gradle.kts b/opendc-trace/opendc-trace-gwf/build.gradle.kts index f3dfd6ef..d02f7a19 100644 --- a/opendc-trace/opendc-trace-gwf/build.gradle.kts +++ b/opendc-trace/opendc-trace-gwf/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) implementation(libs.jackson.dataformat.csv) diff --git a/opendc-trace/opendc-trace-opendc/build.gradle.kts b/opendc-trace/opendc-trace-opendc/build.gradle.kts index b9c242a1..d1b4735e 100644 --- a/opendc-trace/opendc-trace-opendc/build.gradle.kts +++ b/opendc-trace/opendc-trace-opendc/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) implementation(projects.opendcTrace.opendcTraceParquet) diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts index 75378509..f943b6a6 100644 --- a/opendc-trace/opendc-trace-parquet/build.gradle.kts +++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts @@ -30,8 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) - /* This configuration is necessary for a slim dependency on Apache Parquet */ api(libs.parquet) { exclude(group = "org.apache.hadoop") diff --git a/opendc-trace/opendc-trace-swf/build.gradle.kts b/opendc-trace/opendc-trace-swf/build.gradle.kts index c9eaa78d..25d59b49 100644 --- a/opendc-trace/opendc-trace-swf/build.gradle.kts +++ b/opendc-trace/opendc-trace-swf/build.gradle.kts @@ -30,6 +30,5 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) } diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts index 14a0fc7c..0c1e179e 100644 --- a/opendc-trace/opendc-trace-tools/build.gradle.kts +++ b/opendc-trace/opendc-trace-tools/build.gradle.kts @@ -33,8 +33,6 @@ application { } dependencies { - api(platform(projects.opendcPlatform)) - implementation(projects.opendcTrace.opendcTraceApi) implementation(libs.kotlin.logging) implementation(libs.clikt) diff --git a/opendc-trace/opendc-trace-wfformat/build.gradle.kts b/opendc-trace/opendc-trace-wfformat/build.gradle.kts index 2d336d03..cd78fd61 100644 --- a/opendc-trace/opendc-trace-wfformat/build.gradle.kts +++ b/opendc-trace/opendc-trace-wfformat/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) implementation(libs.jackson.core) diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts index e4f0ab3a..8301e363 100644 --- a/opendc-trace/opendc-trace-wtf/build.gradle.kts +++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) implementation(projects.opendcTrace.opendcTraceParquet) diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts index 810f512f..a73ca6b3 100644 --- a/opendc-web/opendc-web-runner/build.gradle.kts +++ b/opendc-web/opendc-web-runner/build.gradle.kts @@ -34,7 +34,6 @@ application { } dependencies { - api(platform(projects.opendcPlatform)) implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcCompute.opendcComputeWorkload) implementation(projects.opendcSimulator.opendcSimulatorCore) diff --git a/opendc-workflow/opendc-workflow-api/build.gradle.kts b/opendc-workflow/opendc-workflow-api/build.gradle.kts index 36239d05..03569d8c 100644 --- a/opendc-workflow/opendc-workflow-api/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-api/build.gradle.kts @@ -28,7 +28,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeApi) implementation(libs.kotlin.logging) } diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 4d8b7d7f..17df33e3 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -30,11 +30,10 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcWorkflow.opendcWorkflowApi) api(projects.opendcCompute.opendcComputeApi) api(projects.opendcTelemetry.opendcTelemetryApi) - implementation(projects.opendcUtils) + implementation(projects.opendcCommon) implementation(libs.kotlin.logging) testImplementation(projects.opendcWorkflow.opendcWorkflowWorkload) diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 7b6d8651..cdaec021 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -25,8 +25,8 @@ package org.opendc.workflow.service.internal import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* +import org.opendc.common.util.Pacer import org.opendc.compute.api.* -import org.opendc.utils.TimerScheduler import org.opendc.workflow.api.Job import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.service.* @@ -187,9 +187,9 @@ public class WorkflowServiceImpl( .build() /** - * The [TimerScheduler] to use for scheduling the scheduler cycles. + * The [Pacer] to use for scheduling the scheduler cycles. */ - private val timerScheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock) + private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic @@ -230,7 +230,7 @@ public class WorkflowServiceImpl( rootListener.jobSubmitted(jobInstance) submittedJobs.add(1) - requestSchedulingCycle() + pacer.enqueue() } override fun close() { @@ -252,31 +252,6 @@ public class WorkflowServiceImpl( } /** - * Indicate that a new scheduling cycle is needed due to a change to the service's state. - */ - private fun requestSchedulingCycle() { - // Bail out in case we have already requested a new cycle or the queue is empty. - if (timerScheduler.isTimerActive(Unit)) { - return - } - - val quantum = schedulingQuantum.toMillis() - if (quantum == 0L) { - doSchedule() - return - } - - // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). - // This is important because the slices of the VMs need to be aligned. - // We calculate here the delay until the next scheduling slot. - val delay = quantum - (clock.millis() % quantum) - - timerScheduler.startSingleTimer(Unit, delay) { - doSchedule() - } - } - - /** * Perform a scheduling cycle immediately. */ private fun doSchedule() { @@ -409,10 +384,9 @@ public class WorkflowServiceImpl( finishJob(job) } - requestSchedulingCycle() - } - ServerState.DELETED -> { + pacer.enqueue() } + ServerState.DELETED -> {} else -> throw IllegalStateException() } } diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts index dfb77a39..a9d497af 100644 --- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-workload/build.gradle.kts @@ -29,7 +29,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcWorkflow.opendcWorkflowService) implementation(projects.opendcSimulator.opendcSimulatorCompute) diff --git a/settings.gradle.kts b/settings.gradle.kts index 170a687d..f518a984 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -21,7 +21,7 @@ */ rootProject.name = "opendc" -include(":opendc-platform") +include(":opendc-common") include(":opendc-compute:opendc-compute-api") include(":opendc-compute:opendc-compute-service") include(":opendc-compute:opendc-compute-simulator") @@ -60,7 +60,6 @@ include(":opendc-harness:opendc-harness-api") include(":opendc-harness:opendc-harness-engine") include(":opendc-harness:opendc-harness-cli") include(":opendc-harness:opendc-harness-junit5") -include(":opendc-utils") enableFeaturePreview("VERSION_CATALOGS") enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS") |
