From 028960fbf584c903156c713447194df56ec5059e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 18 Feb 2022 12:35:11 +0100 Subject: feat(utils): Add Pacer to pace scheduling cycles This change adds a new Pacer class that can pace the incoming scheduling requests into scheduling cycles by allowing the user to specify a scheduling quantum. --- .../compute/service/internal/ComputeServiceImpl.kt | 26 ++--- .../faas/service/internal/FaaSServiceImpl.kt | 21 ++-- opendc-utils/build.gradle.kts | 1 + .../src/main/kotlin/org/opendc/utils/Pacer.kt | 92 +++++++++++++++ .../src/test/kotlin/org/opendc/utils/PacerTest.kt | 127 +++++++++++++++++++++ .../service/internal/WorkflowServiceImpl.kt | 38 +----- 6 files changed, 241 insertions(+), 64 deletions(-) create mode 100644 opendc-utils/src/main/kotlin/org/opendc/utils/Pacer.kt create mode 100644 opendc-utils/src/test/kotlin/org/opendc/utils/PacerTest.kt diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 27a6ecae..6df3c4f9 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -35,7 +35,7 @@ import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.utils.TimerScheduler +import org.opendc.utils.Pacer import java.time.Clock import java.time.Duration import java.util.* @@ -56,7 +56,7 @@ internal class ComputeServiceImpl( private val clock: Clock, meterProvider: MeterProvider, private val scheduler: ComputeScheduler, - private val schedulingQuantum: Duration + schedulingQuantum: Duration ) : ComputeService, HostListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -147,9 +147,9 @@ internal class ComputeServiceImpl( private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active") /** - * The [TimerScheduler] to use for scheduling the scheduler cycles. + * The [Pacer] to use for scheduling the scheduler cycles. */ - private var timerScheduler: TimerScheduler = TimerScheduler(scope.coroutineContext, clock) + private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis(), ::doSchedule) override val hosts: Set get() = hostToView.keys @@ -354,28 +354,18 @@ internal class ComputeServiceImpl( * Indicate that a new scheduling cycle is needed due to a change to the service's state. */ private fun requestSchedulingCycle() { - // Bail out in case we have already requested a new cycle or the queue is empty. - if (timerScheduler.isTimerActive(Unit) || queue.isEmpty()) { + // Bail out in case the queue is empty. + if (queue.isEmpty()) { return } - val quantum = schedulingQuantum.toMillis() - - // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). - // This is important because the slices of the VMs need to be aligned. - // We calculate here the delay until the next scheduling slot. - val delay = quantum - (clock.millis() % quantum) - - timerScheduler.startSingleTimer(Unit, delay) { - doSchedule() - } + pacer.enqueue() } /** * Run a single scheduling iteration. */ - private fun doSchedule() { - val now = clock.millis() + private fun doSchedule(now: Long) { while (queue.isNotEmpty()) { val request = queue.peek() diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index c285585a..82032718 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -37,7 +37,7 @@ import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceListener import org.opendc.faas.service.deployer.FunctionInstanceState import org.opendc.faas.service.router.RoutingPolicy -import org.opendc.utils.TimerScheduler +import org.opendc.utils.Pacer import java.lang.IllegalStateException import java.time.Clock import java.util.* @@ -55,7 +55,7 @@ import kotlin.coroutines.resumeWithException internal class FaaSServiceImpl( context: CoroutineContext, private val clock: Clock, - private val meterProvider: MeterProvider, + meterProvider: MeterProvider, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, private val terminationPolicy: FunctionTerminationPolicy @@ -76,9 +76,9 @@ internal class FaaSServiceImpl( private val meter = meterProvider.get("org.opendc.faas.service") /** - * The [TimerScheduler] to use for scheduling the scheduler cycles. + * The [Pacer] to use for scheduling the scheduler cycles. */ - private val scheduler: TimerScheduler = TimerScheduler(scope.coroutineContext, clock) + private val pacer = Pacer(scope.coroutineContext, clock, quantum = 100) { doSchedule() } /** * The [Random] instance used to generate unique identifiers for the objects. @@ -191,19 +191,12 @@ internal class FaaSServiceImpl( * Indicate that a new scheduling cycle is needed due to a change to the service's state. */ private fun schedule() { - // Bail out in case we have already requested a new cycle or the queue is empty. - if (scheduler.isTimerActive(Unit) || queue.isEmpty()) { + // Bail out in case the queue is empty. + if (queue.isEmpty()) { return } - val quantum = 100 - - // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). - // This is important because the slices of the VMs need to be aligned. - // We calculate here the delay until the next scheduling slot. - val delay = quantum - (clock.millis() % quantum) - - scheduler.startSingleTimer(Unit, delay, ::doSchedule) + pacer.enqueue() } /** diff --git a/opendc-utils/build.gradle.kts b/opendc-utils/build.gradle.kts index 800b374d..858aa64c 100644 --- a/opendc-utils/build.gradle.kts +++ b/opendc-utils/build.gradle.kts @@ -26,6 +26,7 @@ description = "Utilities used across OpenDC modules" plugins { `kotlin-library-conventions` `testing-conventions` + `jacoco-conventions` } dependencies { diff --git a/opendc-utils/src/main/kotlin/org/opendc/utils/Pacer.kt b/opendc-utils/src/main/kotlin/org/opendc/utils/Pacer.kt new file mode 100644 index 00000000..07d3cb87 --- /dev/null +++ b/opendc-utils/src/main/kotlin/org/opendc/utils/Pacer.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.utils + +import kotlinx.coroutines.* +import java.lang.Runnable +import java.time.Clock +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext + +/** + * Helper class to pace the incoming scheduling requests. + * + * @param context The [CoroutineContext] in which the pacer runs. + * @param clock The virtual simulation clock. + * @param quantum The scheduling quantum. + * @param process The process to invoke for the incoming requests. + */ +public class Pacer( + private val context: CoroutineContext, + private val clock: Clock, + private val quantum: Long, + private val process: (Long) -> Unit +) { + /** + * The [Delay] instance that provides scheduled execution of [Runnable]s. + */ + @OptIn(InternalCoroutinesApi::class) + private val delay = + requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } + + /** + * The current [DisposableHandle] representing the pending scheduling cycle. + */ + private var handle: DisposableHandle? = null + + /** + * Determine whether a scheduling cycle is pending. + */ + public val isPending: Boolean get() = handle != null + + /** + * Enqueue a new scheduling cycle. + */ + public fun enqueue() { + if (handle != null) { + return + } + + val quantum = quantum + val now = clock.millis() + + // We assume that the scheduler runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). + // We calculate here the delay until the next scheduling slot. + val timeUntilNextSlot = quantum - (now % quantum) + + @OptIn(InternalCoroutinesApi::class) + handle = delay.invokeOnTimeout(timeUntilNextSlot, { + process(now + timeUntilNextSlot) + handle = null + }, context) + } + + /** + * Cancel the currently pending scheduling cycle. + */ + public fun cancel() { + val handle = handle ?: return + this.handle = null + handle.dispose() + } +} diff --git a/opendc-utils/src/test/kotlin/org/opendc/utils/PacerTest.kt b/opendc-utils/src/test/kotlin/org/opendc/utils/PacerTest.kt new file mode 100644 index 00000000..b8419e80 --- /dev/null +++ b/opendc-utils/src/test/kotlin/org/opendc/utils/PacerTest.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.utils + +import kotlinx.coroutines.delay +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import java.time.Clock +import kotlin.coroutines.EmptyCoroutineContext + +/** + * Test suite for the [Pacer] class. + */ +class PacerTest { + @Test + fun testEmptyContext() { + assertThrows { Pacer(EmptyCoroutineContext, Clock.systemUTC(), 100) {} } + } + + @Test + fun testSingleEnqueue() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + } + + assertEquals(1, count) { "Process should execute once" } + } + + @Test + fun testCascade() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + pacer.enqueue() + + assertTrue(pacer.isPending) + } + + assertEquals(1, count) { "Process should execute once" } + } + + @Test + fun testCancel() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + pacer.cancel() + + assertFalse(pacer.isPending) + } + + assertEquals(0, count) { "Process should never execute " } + } + + @Test + fun testCancelWithoutPending() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + assertFalse(pacer.isPending) + assertDoesNotThrow { pacer.cancel() } + + pacer.enqueue() + } + + assertEquals(1, count) { "Process should execute once" } + } + + @Test + fun testSubsequent() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + delay(100) + pacer.enqueue() + } + + assertEquals(2, count) { "Process should execute twice" } + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 7b6d8651..cb4bfd45 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -26,7 +26,7 @@ import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import org.opendc.compute.api.* -import org.opendc.utils.TimerScheduler +import org.opendc.utils.Pacer import org.opendc.workflow.api.Job import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.service.* @@ -187,9 +187,9 @@ public class WorkflowServiceImpl( .build() /** - * The [TimerScheduler] to use for scheduling the scheduler cycles. + * The [Pacer] to use for scheduling the scheduler cycles. */ - private val timerScheduler: TimerScheduler = TimerScheduler(scope.coroutineContext, clock) + private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic @@ -230,7 +230,7 @@ public class WorkflowServiceImpl( rootListener.jobSubmitted(jobInstance) submittedJobs.add(1) - requestSchedulingCycle() + pacer.enqueue() } override fun close() { @@ -251,31 +251,6 @@ public class WorkflowServiceImpl( rootListener.listeners -= listener } - /** - * Indicate that a new scheduling cycle is needed due to a change to the service's state. - */ - private fun requestSchedulingCycle() { - // Bail out in case we have already requested a new cycle or the queue is empty. - if (timerScheduler.isTimerActive(Unit)) { - return - } - - val quantum = schedulingQuantum.toMillis() - if (quantum == 0L) { - doSchedule() - return - } - - // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). - // This is important because the slices of the VMs need to be aligned. - // We calculate here the delay until the next scheduling slot. - val delay = quantum - (clock.millis() % quantum) - - timerScheduler.startSingleTimer(Unit, delay) { - doSchedule() - } - } - /** * Perform a scheduling cycle immediately. */ @@ -409,10 +384,9 @@ public class WorkflowServiceImpl( finishJob(job) } - requestSchedulingCycle() - } - ServerState.DELETED -> { + pacer.enqueue() } + ServerState.DELETED -> {} else -> throw IllegalStateException() } } -- cgit v1.2.3 From 52d35cd82905612f0ef9f7b7d88611300fb48ebe Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 18 Feb 2022 14:08:18 +0100 Subject: refactor(utils): Rename utils module to common module This change adds a new module, opendc-common, that contains functionality that is shared across OpenDC's modules. We move the existing utils module into this new module. --- opendc-common/build.gradle.kts | 37 ++++ .../main/kotlin/org/opendc/common/util/Pacer.kt | 92 ++++++++ .../org/opendc/common/util/TimerScheduler.kt | 239 +++++++++++++++++++++ .../kotlin/org/opendc/common/util/PacerTest.kt | 127 +++++++++++ .../org/opendc/common/util/TimerSchedulerTest.kt | 139 ++++++++++++ .../opendc-compute-service/build.gradle.kts | 2 +- .../compute/service/internal/ComputeServiceImpl.kt | 2 +- .../opendc-compute-simulator/build.gradle.kts | 2 +- .../opendc-experiments-tf20/build.gradle.kts | 4 +- opendc-faas/opendc-faas-service/build.gradle.kts | 2 +- .../autoscaler/FunctionTerminationPolicyFixed.kt | 2 +- .../faas/service/internal/FaaSServiceImpl.kt | 2 +- opendc-utils/build.gradle.kts | 37 ---- .../src/main/kotlin/org/opendc/utils/Pacer.kt | 92 -------- .../main/kotlin/org/opendc/utils/TimerScheduler.kt | 239 --------------------- .../src/test/kotlin/org/opendc/utils/PacerTest.kt | 127 ----------- .../kotlin/org/opendc/utils/TimerSchedulerTest.kt | 139 ------------ .../opendc-workflow-service/build.gradle.kts | 2 +- .../service/internal/WorkflowServiceImpl.kt | 2 +- settings.gradle.kts | 2 +- 20 files changed, 645 insertions(+), 645 deletions(-) create mode 100644 opendc-common/build.gradle.kts create mode 100644 opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt create mode 100644 opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt create mode 100644 opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt create mode 100644 opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt delete mode 100644 opendc-utils/build.gradle.kts delete mode 100644 opendc-utils/src/main/kotlin/org/opendc/utils/Pacer.kt delete mode 100644 opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt delete mode 100644 opendc-utils/src/test/kotlin/org/opendc/utils/PacerTest.kt delete mode 100644 opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt diff --git a/opendc-common/build.gradle.kts b/opendc-common/build.gradle.kts new file mode 100644 index 00000000..67441e93 --- /dev/null +++ b/opendc-common/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Common functionality used across OpenDC modules" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(libs.kotlinx.coroutines) + + testImplementation(projects.opendcSimulator.opendcSimulatorCore) +} diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt b/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt new file mode 100644 index 00000000..8ccff6c3 --- /dev/null +++ b/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.common.util + +import kotlinx.coroutines.* +import java.lang.Runnable +import java.time.Clock +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext + +/** + * Helper class to pace the incoming scheduling requests. + * + * @param context The [CoroutineContext] in which the pacer runs. + * @param clock The virtual simulation clock. + * @param quantum The scheduling quantum. + * @param process The process to invoke for the incoming requests. + */ +public class Pacer( + private val context: CoroutineContext, + private val clock: Clock, + private val quantum: Long, + private val process: (Long) -> Unit +) { + /** + * The [Delay] instance that provides scheduled execution of [Runnable]s. + */ + @OptIn(InternalCoroutinesApi::class) + private val delay = + requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } + + /** + * The current [DisposableHandle] representing the pending scheduling cycle. + */ + private var handle: DisposableHandle? = null + + /** + * Determine whether a scheduling cycle is pending. + */ + public val isPending: Boolean get() = handle != null + + /** + * Enqueue a new scheduling cycle. + */ + public fun enqueue() { + if (handle != null) { + return + } + + val quantum = quantum + val now = clock.millis() + + // We assume that the scheduler runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). + // We calculate here the delay until the next scheduling slot. + val timeUntilNextSlot = quantum - (now % quantum) + + @OptIn(InternalCoroutinesApi::class) + handle = delay.invokeOnTimeout(timeUntilNextSlot, { + process(now + timeUntilNextSlot) + handle = null + }, context) + } + + /** + * Cancel the currently pending scheduling cycle. + */ + public fun cancel() { + val handle = handle ?: return + this.handle = null + handle.dispose() + } +} diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt new file mode 100644 index 00000000..86314411 --- /dev/null +++ b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt @@ -0,0 +1,239 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.common.util + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.selects.select +import java.time.Clock +import java.util.* +import kotlin.coroutines.CoroutineContext +import kotlin.math.max + +/** + * A TimerScheduler facilitates scheduled execution of future tasks. + * + * @param context The [CoroutineContext] to run the tasks with. + * @param clock The clock to keep track of the time. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public class TimerScheduler(context: CoroutineContext, private val clock: Clock) : AutoCloseable { + /** + * The scope in which the scheduler runs. + */ + private val scope = CoroutineScope(context + Job()) + + /** + * A priority queue containing the tasks to be scheduled in the future. + */ + private val queue = PriorityQueue() + + /** + * A map that keeps track of the timers. + */ + private val timers = mutableMapOf() + + /** + * The channel to communicate with the scheduling job. + */ + private val channel = Channel(Channel.CONFLATED) + + /** + * A flag to indicate that the scheduler is active. + */ + private var isActive = true + + init { + scope.launch { + val timers = timers + val queue = queue + val clock = clock + val job = requireNotNull(coroutineContext[Job]) + val exceptionHandler = coroutineContext[CoroutineExceptionHandler] + var next: Long? = channel.receive() + + while (true) { + next = select { + channel.onReceive { it } + + val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select + + onTimeout(delay) { + while (queue.isNotEmpty() && job.isActive) { + val timer = queue.peek() + val timestamp = clock.millis() + + assert(timer.timestamp >= timestamp) { "Found task in the past" } + + if (timer.timestamp > timestamp && !timer.isCancelled) { + // Schedule a task for the next event to occur. + return@onTimeout timer.timestamp + } + + queue.poll() + + if (!timer.isCancelled) { + timers.remove(timer.key) + try { + timer() + } catch (e: Throwable) { + exceptionHandler?.handleException(coroutineContext, e) + } + } + } + + null + } + } + } + } + } + + /** + * Stop the scheduler. + */ + override fun close() { + isActive = false + cancelAll() + scope.cancel() + } + + /** + * Cancel a timer with a given key. + * + * If canceling a timer that was already canceled, or key never was used to start + * a timer this operation will do nothing. + * + * @param key The key of the timer to cancel. + */ + public fun cancel(key: T) { + if (!isActive) { + return + } + + val timer = timers.remove(key) + + // Mark the timer as cancelled + timer?.isCancelled = true + + // Optimization: check whether we are the head of the queue + val queue = queue + val channel = channel + val peek = queue.peek() + if (peek == timer) { + queue.poll() + + if (queue.isNotEmpty()) { + channel.trySend(peek.timestamp) + } else { + channel.trySend(null) + } + } + } + + /** + * Cancel all timers. + */ + public fun cancelAll() { + queue.clear() + timers.clear() + } + + /** + * Check if a timer with a given key is active. + * + * @param key The key to check if active. + * @return `true` if the timer with the specified [key] is active, `false` otherwise. + */ + public fun isTimerActive(key: T): Boolean = key in timers + + /** + * Start a timer that will invoke the specified [block] after [delay]. + * + * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * + * @param key The key of the timer to start. + * @param delay The delay before invoking the block. + * @param block The block to invoke. + */ + public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) { + startSingleTimerTo(key, clock.millis() + delay, block) + } + + /** + * Start a timer that will invoke the specified [block] at [timestamp]. + * + * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * + * @param key The key of the timer to start. + * @param timestamp The timestamp at which to invoke the block. + * @param block The block to invoke. + */ + public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) { + val now = clock.millis() + val queue = queue + val channel = channel + + require(timestamp >= now) { "Timestamp must be in the future" } + check(isActive) { "Timer is stopped" } + + timers.compute(key) { _, old -> + if (old != null && old.timestamp == timestamp) { + // Fast-path: timer for the same timestamp already exists + old + } else { + // Slow-path: cancel old timer and replace it with new timer + val timer = Timer(key, timestamp, block) + + old?.isCancelled = true + queue.add(timer) + + // Check if we need to push the interruption forward + // Note that we check by timer reference + if (queue.peek() === timer) { + channel.trySend(timer.timestamp) + } + + timer + } + } + } + + /** + * A task that is scheduled to run in the future. + */ + private inner class Timer(val key: T, val timestamp: Long, val block: () -> Unit) : Comparable { + /** + * A flag to indicate that the task has been cancelled. + */ + @JvmField + var isCancelled: Boolean = false + + /** + * Run the task. + */ + operator fun invoke(): Unit = block() + + override fun compareTo(other: Timer): Int = timestamp.compareTo(other.timestamp) + } +} diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt new file mode 100644 index 00000000..1cd435f6 --- /dev/null +++ b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.common.util + +import kotlinx.coroutines.delay +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import java.time.Clock +import kotlin.coroutines.EmptyCoroutineContext + +/** + * Test suite for the [Pacer] class. + */ +class PacerTest { + @Test + fun testEmptyContext() { + assertThrows { Pacer(EmptyCoroutineContext, Clock.systemUTC(), 100) {} } + } + + @Test + fun testSingleEnqueue() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + } + + assertEquals(1, count) { "Process should execute once" } + } + + @Test + fun testCascade() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + pacer.enqueue() + + assertTrue(pacer.isPending) + } + + assertEquals(1, count) { "Process should execute once" } + } + + @Test + fun testCancel() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + pacer.cancel() + + assertFalse(pacer.isPending) + } + + assertEquals(0, count) { "Process should never execute " } + } + + @Test + fun testCancelWithoutPending() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + assertFalse(pacer.isPending) + assertDoesNotThrow { pacer.cancel() } + + pacer.enqueue() + } + + assertEquals(1, count) { "Process should execute once" } + } + + @Test + fun testSubsequent() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + delay(100) + pacer.enqueue() + } + + assertEquals(2, count) { "Process should execute twice" } + } +} diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt new file mode 100644 index 00000000..89b0cbe9 --- /dev/null +++ b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.common.util + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation + +/** + * A test suite for the [TimerScheduler] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class TimerSchedulerTest { + @Test + fun testBasicTimer() { + runBlockingSimulation { + val scheduler = TimerScheduler(coroutineContext, clock) + + scheduler.startSingleTimer(0, 1000) { + scheduler.close() + assertEquals(1000, clock.millis()) + } + } + } + + @Test + fun testCancelNonExisting() { + runBlockingSimulation { + val scheduler = TimerScheduler(coroutineContext, clock) + + scheduler.cancel(1) + scheduler.close() + } + } + + @Test + fun testCancelExisting() { + runBlockingSimulation { + val scheduler = TimerScheduler(coroutineContext, clock) + + scheduler.startSingleTimer(0, 1000) { + assertFalse(false) + } + + scheduler.startSingleTimer(1, 100) { + scheduler.cancel(0) + scheduler.close() + + assertEquals(100, clock.millis()) + } + } + } + + @Test + fun testCancelAll() { + runBlockingSimulation { + val scheduler = TimerScheduler(coroutineContext, clock) + + scheduler.startSingleTimer(0, 1000) { + assertFalse(false) + } + + scheduler.startSingleTimer(1, 100) { + assertFalse(false) + } + + scheduler.close() + } + } + + @Test + fun testOverride() { + runBlockingSimulation { + val scheduler = TimerScheduler(coroutineContext, clock) + + scheduler.startSingleTimer(0, 1000) { + assertFalse(false) + } + + scheduler.startSingleTimer(0, 200) { + scheduler.close() + + assertEquals(200, clock.millis()) + } + } + } + + @Test + fun testStopped() { + runBlockingSimulation { + val scheduler = TimerScheduler(coroutineContext, clock) + + scheduler.close() + + assertThrows { + scheduler.startSingleTimer(1, 100) { + assertFalse(false) + } + } + } + } + + @Test + fun testNegativeDelay() { + runBlockingSimulation { + val scheduler = TimerScheduler(coroutineContext, clock) + + assertThrows { + scheduler.startSingleTimer(1, -1) { + assertFalse(false) + } + } + + scheduler.close() + } + } +} diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts index 33cafc45..b609b147 100644 --- a/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/opendc-compute/opendc-compute-service/build.gradle.kts @@ -33,7 +33,7 @@ dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeApi) api(projects.opendcTelemetry.opendcTelemetryApi) - implementation(projects.opendcUtils) + implementation(projects.opendcCommon) implementation(libs.kotlin.logging) implementation(libs.opentelemetry.semconv) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 6df3c4f9..144b6573 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -29,13 +29,13 @@ import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging +import org.opendc.common.util.Pacer import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.utils.Pacer import java.time.Clock import java.time.Duration import java.util.* diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index aaf69f78..686d9ca1 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -34,7 +34,7 @@ dependencies { api(projects.opendcCompute.opendcComputeService) api(projects.opendcSimulator.opendcSimulatorCompute) api(libs.commons.math3) - implementation(projects.opendcUtils) + implementation(projects.opendcCommon) implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts index 882c4894..75381e1f 100644 --- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts @@ -34,11 +34,11 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcUtils) + implementation(projects.opendcCommon) implementation(libs.kotlin.logging) implementation(libs.jackson.module.kotlin) { exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") } - implementation("org.jetbrains.kotlin:kotlin-reflect:1.5.30") + implementation("org.jetbrains.kotlin:kotlin-reflect:1.6.10") } diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index 6f4fcc9b..95b171b9 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -33,7 +33,7 @@ dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcFaas.opendcFaasApi) api(projects.opendcTelemetry.opendcTelemetryApi) - implementation(projects.opendcUtils) + implementation(projects.opendcCommon) implementation(libs.kotlin.logging) implementation(libs.opentelemetry.semconv) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt index 63dbadc7..d579ad0c 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt @@ -22,9 +22,9 @@ package org.opendc.faas.service.autoscaler +import org.opendc.common.util.TimerScheduler import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceState -import org.opendc.utils.TimerScheduler import java.time.Clock import java.time.Duration import kotlin.coroutines.CoroutineContext diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index 82032718..1526be9d 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -27,6 +27,7 @@ import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging +import org.opendc.common.util.Pacer import org.opendc.faas.api.FaaSClient import org.opendc.faas.api.FaaSFunction import org.opendc.faas.service.FaaSService @@ -37,7 +38,6 @@ import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceListener import org.opendc.faas.service.deployer.FunctionInstanceState import org.opendc.faas.service.router.RoutingPolicy -import org.opendc.utils.Pacer import java.lang.IllegalStateException import java.time.Clock import java.util.* diff --git a/opendc-utils/build.gradle.kts b/opendc-utils/build.gradle.kts deleted file mode 100644 index 858aa64c..00000000 --- a/opendc-utils/build.gradle.kts +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Utilities used across OpenDC modules" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` - `testing-conventions` - `jacoco-conventions` -} - -dependencies { - api(platform(projects.opendcPlatform)) - api(libs.kotlinx.coroutines) - - testImplementation(projects.opendcSimulator.opendcSimulatorCore) -} diff --git a/opendc-utils/src/main/kotlin/org/opendc/utils/Pacer.kt b/opendc-utils/src/main/kotlin/org/opendc/utils/Pacer.kt deleted file mode 100644 index 07d3cb87..00000000 --- a/opendc-utils/src/main/kotlin/org/opendc/utils/Pacer.kt +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.utils - -import kotlinx.coroutines.* -import java.lang.Runnable -import java.time.Clock -import kotlin.coroutines.ContinuationInterceptor -import kotlin.coroutines.CoroutineContext - -/** - * Helper class to pace the incoming scheduling requests. - * - * @param context The [CoroutineContext] in which the pacer runs. - * @param clock The virtual simulation clock. - * @param quantum The scheduling quantum. - * @param process The process to invoke for the incoming requests. - */ -public class Pacer( - private val context: CoroutineContext, - private val clock: Clock, - private val quantum: Long, - private val process: (Long) -> Unit -) { - /** - * The [Delay] instance that provides scheduled execution of [Runnable]s. - */ - @OptIn(InternalCoroutinesApi::class) - private val delay = - requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } - - /** - * The current [DisposableHandle] representing the pending scheduling cycle. - */ - private var handle: DisposableHandle? = null - - /** - * Determine whether a scheduling cycle is pending. - */ - public val isPending: Boolean get() = handle != null - - /** - * Enqueue a new scheduling cycle. - */ - public fun enqueue() { - if (handle != null) { - return - } - - val quantum = quantum - val now = clock.millis() - - // We assume that the scheduler runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). - // We calculate here the delay until the next scheduling slot. - val timeUntilNextSlot = quantum - (now % quantum) - - @OptIn(InternalCoroutinesApi::class) - handle = delay.invokeOnTimeout(timeUntilNextSlot, { - process(now + timeUntilNextSlot) - handle = null - }, context) - } - - /** - * Cancel the currently pending scheduling cycle. - */ - public fun cancel() { - val handle = handle ?: return - this.handle = null - handle.dispose() - } -} diff --git a/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt deleted file mode 100644 index d7da7f99..00000000 --- a/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.utils - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.selects.select -import java.time.Clock -import java.util.* -import kotlin.coroutines.CoroutineContext -import kotlin.math.max - -/** - * A TimerScheduler facilitates scheduled execution of future tasks. - * - * @param context The [CoroutineContext] to run the tasks with. - * @param clock The clock to keep track of the time. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public class TimerScheduler(context: CoroutineContext, private val clock: Clock) : AutoCloseable { - /** - * The scope in which the scheduler runs. - */ - private val scope = CoroutineScope(context + Job()) - - /** - * A priority queue containing the tasks to be scheduled in the future. - */ - private val queue = PriorityQueue() - - /** - * A map that keeps track of the timers. - */ - private val timers = mutableMapOf() - - /** - * The channel to communicate with the scheduling job. - */ - private val channel = Channel(Channel.CONFLATED) - - /** - * A flag to indicate that the scheduler is active. - */ - private var isActive = true - - init { - scope.launch { - val timers = timers - val queue = queue - val clock = clock - val job = requireNotNull(coroutineContext[Job]) - val exceptionHandler = coroutineContext[CoroutineExceptionHandler] - var next: Long? = channel.receive() - - while (true) { - next = select { - channel.onReceive { it } - - val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select - - onTimeout(delay) { - while (queue.isNotEmpty() && job.isActive) { - val timer = queue.peek() - val timestamp = clock.millis() - - assert(timer.timestamp >= timestamp) { "Found task in the past" } - - if (timer.timestamp > timestamp && !timer.isCancelled) { - // Schedule a task for the next event to occur. - return@onTimeout timer.timestamp - } - - queue.poll() - - if (!timer.isCancelled) { - timers.remove(timer.key) - try { - timer() - } catch (e: Throwable) { - exceptionHandler?.handleException(coroutineContext, e) - } - } - } - - null - } - } - } - } - } - - /** - * Stop the scheduler. - */ - override fun close() { - isActive = false - cancelAll() - scope.cancel() - } - - /** - * Cancel a timer with a given key. - * - * If canceling a timer that was already canceled, or key never was used to start - * a timer this operation will do nothing. - * - * @param key The key of the timer to cancel. - */ - public fun cancel(key: T) { - if (!isActive) { - return - } - - val timer = timers.remove(key) - - // Mark the timer as cancelled - timer?.isCancelled = true - - // Optimization: check whether we are the head of the queue - val queue = queue - val channel = channel - val peek = queue.peek() - if (peek == timer) { - queue.poll() - - if (queue.isNotEmpty()) { - channel.trySend(peek.timestamp) - } else { - channel.trySend(null) - } - } - } - - /** - * Cancel all timers. - */ - public fun cancelAll() { - queue.clear() - timers.clear() - } - - /** - * Check if a timer with a given key is active. - * - * @param key The key to check if active. - * @return `true` if the timer with the specified [key] is active, `false` otherwise. - */ - public fun isTimerActive(key: T): Boolean = key in timers - - /** - * Start a timer that will invoke the specified [block] after [delay]. - * - * Each timer has a key and if a new timer with same key is started the previous is cancelled. - * - * @param key The key of the timer to start. - * @param delay The delay before invoking the block. - * @param block The block to invoke. - */ - public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) { - startSingleTimerTo(key, clock.millis() + delay, block) - } - - /** - * Start a timer that will invoke the specified [block] at [timestamp]. - * - * Each timer has a key and if a new timer with same key is started the previous is cancelled. - * - * @param key The key of the timer to start. - * @param timestamp The timestamp at which to invoke the block. - * @param block The block to invoke. - */ - public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) { - val now = clock.millis() - val queue = queue - val channel = channel - - require(timestamp >= now) { "Timestamp must be in the future" } - check(isActive) { "Timer is stopped" } - - timers.compute(key) { _, old -> - if (old != null && old.timestamp == timestamp) { - // Fast-path: timer for the same timestamp already exists - old - } else { - // Slow-path: cancel old timer and replace it with new timer - val timer = Timer(key, timestamp, block) - - old?.isCancelled = true - queue.add(timer) - - // Check if we need to push the interruption forward - // Note that we check by timer reference - if (queue.peek() === timer) { - channel.trySend(timer.timestamp) - } - - timer - } - } - } - - /** - * A task that is scheduled to run in the future. - */ - private inner class Timer(val key: T, val timestamp: Long, val block: () -> Unit) : Comparable { - /** - * A flag to indicate that the task has been cancelled. - */ - @JvmField - var isCancelled: Boolean = false - - /** - * Run the task. - */ - operator fun invoke(): Unit = block() - - override fun compareTo(other: Timer): Int = timestamp.compareTo(other.timestamp) - } -} diff --git a/opendc-utils/src/test/kotlin/org/opendc/utils/PacerTest.kt b/opendc-utils/src/test/kotlin/org/opendc/utils/PacerTest.kt deleted file mode 100644 index b8419e80..00000000 --- a/opendc-utils/src/test/kotlin/org/opendc/utils/PacerTest.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.utils - -import kotlinx.coroutines.delay -import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.core.runBlockingSimulation -import java.time.Clock -import kotlin.coroutines.EmptyCoroutineContext - -/** - * Test suite for the [Pacer] class. - */ -class PacerTest { - @Test - fun testEmptyContext() { - assertThrows { Pacer(EmptyCoroutineContext, Clock.systemUTC(), 100) {} } - } - - @Test - fun testSingleEnqueue() { - var count = 0 - - runBlockingSimulation { - val pacer = Pacer(coroutineContext, clock, quantum = 100) { - count++ - } - - pacer.enqueue() - } - - assertEquals(1, count) { "Process should execute once" } - } - - @Test - fun testCascade() { - var count = 0 - - runBlockingSimulation { - val pacer = Pacer(coroutineContext, clock, quantum = 100) { - count++ - } - - pacer.enqueue() - pacer.enqueue() - - assertTrue(pacer.isPending) - } - - assertEquals(1, count) { "Process should execute once" } - } - - @Test - fun testCancel() { - var count = 0 - - runBlockingSimulation { - val pacer = Pacer(coroutineContext, clock, quantum = 100) { - count++ - } - - pacer.enqueue() - pacer.cancel() - - assertFalse(pacer.isPending) - } - - assertEquals(0, count) { "Process should never execute " } - } - - @Test - fun testCancelWithoutPending() { - var count = 0 - - runBlockingSimulation { - val pacer = Pacer(coroutineContext, clock, quantum = 100) { - count++ - } - - assertFalse(pacer.isPending) - assertDoesNotThrow { pacer.cancel() } - - pacer.enqueue() - } - - assertEquals(1, count) { "Process should execute once" } - } - - @Test - fun testSubsequent() { - var count = 0 - - runBlockingSimulation { - val pacer = Pacer(coroutineContext, clock, quantum = 100) { - count++ - } - - pacer.enqueue() - delay(100) - pacer.enqueue() - } - - assertEquals(2, count) { "Process should execute twice" } - } -} diff --git a/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt b/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt deleted file mode 100644 index 101a6546..00000000 --- a/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.utils - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.core.runBlockingSimulation - -/** - * A test suite for the [TimerScheduler] class. - */ -@OptIn(ExperimentalCoroutinesApi::class) -internal class TimerSchedulerTest { - @Test - fun testBasicTimer() { - runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) - - scheduler.startSingleTimer(0, 1000) { - scheduler.close() - assertEquals(1000, clock.millis()) - } - } - } - - @Test - fun testCancelNonExisting() { - runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) - - scheduler.cancel(1) - scheduler.close() - } - } - - @Test - fun testCancelExisting() { - runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) - - scheduler.startSingleTimer(0, 1000) { - assertFalse(false) - } - - scheduler.startSingleTimer(1, 100) { - scheduler.cancel(0) - scheduler.close() - - assertEquals(100, clock.millis()) - } - } - } - - @Test - fun testCancelAll() { - runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) - - scheduler.startSingleTimer(0, 1000) { - assertFalse(false) - } - - scheduler.startSingleTimer(1, 100) { - assertFalse(false) - } - - scheduler.close() - } - } - - @Test - fun testOverride() { - runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) - - scheduler.startSingleTimer(0, 1000) { - assertFalse(false) - } - - scheduler.startSingleTimer(0, 200) { - scheduler.close() - - assertEquals(200, clock.millis()) - } - } - } - - @Test - fun testStopped() { - runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) - - scheduler.close() - - assertThrows { - scheduler.startSingleTimer(1, 100) { - assertFalse(false) - } - } - } - } - - @Test - fun testNegativeDelay() { - runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) - - assertThrows { - scheduler.startSingleTimer(1, -1) { - assertFalse(false) - } - } - - scheduler.close() - } - } -} diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 4d8b7d7f..8f082e2f 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -34,7 +34,7 @@ dependencies { api(projects.opendcWorkflow.opendcWorkflowApi) api(projects.opendcCompute.opendcComputeApi) api(projects.opendcTelemetry.opendcTelemetryApi) - implementation(projects.opendcUtils) + implementation(projects.opendcCommon) implementation(libs.kotlin.logging) testImplementation(projects.opendcWorkflow.opendcWorkflowWorkload) diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index cb4bfd45..cdaec021 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -25,8 +25,8 @@ package org.opendc.workflow.service.internal import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* +import org.opendc.common.util.Pacer import org.opendc.compute.api.* -import org.opendc.utils.Pacer import org.opendc.workflow.api.Job import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.service.* diff --git a/settings.gradle.kts b/settings.gradle.kts index 170a687d..f388a385 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -22,6 +22,7 @@ rootProject.name = "opendc" include(":opendc-platform") +include(":opendc-common") include(":opendc-compute:opendc-compute-api") include(":opendc-compute:opendc-compute-service") include(":opendc-compute:opendc-compute-simulator") @@ -60,7 +61,6 @@ include(":opendc-harness:opendc-harness-api") include(":opendc-harness:opendc-harness-engine") include(":opendc-harness:opendc-harness-cli") include(":opendc-harness:opendc-harness-junit5") -include(":opendc-utils") enableFeaturePreview("VERSION_CATALOGS") enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS") -- cgit v1.2.3 From 0711d4c1d6afb41ca31afbf8bd6253921d57eeb4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 18 Feb 2022 14:45:05 +0100 Subject: perf(common): Optimize TimerScheduler This change updates the TimerScheduler implementation to directly use the Delay object instead of running the timers inside a coroutine. Constructing the coroutine is more expensive, so we prefer running in a Runnable. --- .../org/opendc/common/util/TimerScheduler.kt | 239 ++++++++++----------- .../org/opendc/common/util/TimerSchedulerTest.kt | 46 ++-- .../experiments/tf20/network/NetworkController.kt | 4 +- 3 files changed, 136 insertions(+), 153 deletions(-) diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt index 86314411..bec2c9f1 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt @@ -23,12 +23,10 @@ package org.opendc.common.util import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.selects.select import java.time.Clock import java.util.* +import kotlin.coroutines.ContinuationInterceptor import kotlin.coroutines.CoroutineContext -import kotlin.math.max /** * A TimerScheduler facilitates scheduled execution of future tasks. @@ -37,86 +35,83 @@ import kotlin.math.max * @param clock The clock to keep track of the time. */ @OptIn(ExperimentalCoroutinesApi::class) -public class TimerScheduler(context: CoroutineContext, private val clock: Clock) : AutoCloseable { +public class TimerScheduler(private val context: CoroutineContext, private val clock: Clock) { /** - * The scope in which the scheduler runs. + * The [Delay] instance that provides scheduled execution of [Runnable]s. */ - private val scope = CoroutineScope(context + Job()) + @OptIn(InternalCoroutinesApi::class) + private val delay = + requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } + + /** + * The stack of the invocations to occur in the future. + */ + private val invocations = ArrayDeque() /** * A priority queue containing the tasks to be scheduled in the future. */ - private val queue = PriorityQueue() + private val queue = PriorityQueue>() /** * A map that keeps track of the timers. */ - private val timers = mutableMapOf() + private val timers = mutableMapOf>() /** - * The channel to communicate with the scheduling job. + * Start a timer that will invoke the specified [block] after [delay]. + * + * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * + * @param key The key of the timer to start. + * @param delay The delay before invoking the block. + * @param block The block to invoke. */ - private val channel = Channel(Channel.CONFLATED) + public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) { + startSingleTimerTo(key, clock.millis() + delay, block) + } /** - * A flag to indicate that the scheduler is active. + * Start a timer that will invoke the specified [block] at [timestamp]. + * + * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * + * @param key The key of the timer to start. + * @param timestamp The timestamp at which to invoke the block. + * @param block The block to invoke. */ - private var isActive = true - - init { - scope.launch { - val timers = timers - val queue = queue - val clock = clock - val job = requireNotNull(coroutineContext[Job]) - val exceptionHandler = coroutineContext[CoroutineExceptionHandler] - var next: Long? = channel.receive() - - while (true) { - next = select { - channel.onReceive { it } - - val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select - - onTimeout(delay) { - while (queue.isNotEmpty() && job.isActive) { - val timer = queue.peek() - val timestamp = clock.millis() - - assert(timer.timestamp >= timestamp) { "Found task in the past" } - - if (timer.timestamp > timestamp && !timer.isCancelled) { - // Schedule a task for the next event to occur. - return@onTimeout timer.timestamp - } - - queue.poll() - - if (!timer.isCancelled) { - timers.remove(timer.key) - try { - timer() - } catch (e: Throwable) { - exceptionHandler?.handleException(coroutineContext, e) - } - } - } - - null - } - } + public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) { + val now = clock.millis() + val queue = queue + val invocations = invocations + + require(timestamp >= now) { "Timestamp must be in the future" } + + timers.compute(key) { _, old -> + if (old != null && old.timestamp == timestamp) { + // Fast-path: timer for the same timestamp already exists + old.block = block + old + } else { + // Slow-path: cancel old timer and replace it with new timer + val timer = Timer(key, timestamp, block) + + old?.isCancelled = true + queue.add(timer) + trySchedule(now, invocations, timestamp) + + timer } } } /** - * Stop the scheduler. + * Check if a timer with a given key is active. + * + * @param key The key to check if active. + * @return `true` if the timer with the specified [key] is active, `false` otherwise. */ - override fun close() { - isActive = false - cancelAll() - scope.cancel() - } + public fun isTimerActive(key: T): Boolean = key in timers /** * Cancel a timer with a given key. @@ -127,28 +122,10 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo * @param key The key of the timer to cancel. */ public fun cancel(key: T) { - if (!isActive) { - return - } - val timer = timers.remove(key) // Mark the timer as cancelled timer?.isCancelled = true - - // Optimization: check whether we are the head of the queue - val queue = queue - val channel = channel - val peek = queue.peek() - if (peek == timer) { - queue.poll() - - if (queue.isNotEmpty()) { - channel.trySend(peek.timestamp) - } else { - channel.trySend(null) - } - } } /** @@ -157,64 +134,60 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo public fun cancelAll() { queue.clear() timers.clear() - } - /** - * Check if a timer with a given key is active. - * - * @param key The key to check if active. - * @return `true` if the timer with the specified [key] is active, `false` otherwise. - */ - public fun isTimerActive(key: T): Boolean = key in timers + // Cancel all pending invocations + for (invocation in invocations) { + invocation.cancel() + } + invocations.clear() + } /** - * Start a timer that will invoke the specified [block] after [delay]. - * - * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * Try to schedule an engine invocation at the specified [target]. * - * @param key The key of the timer to start. - * @param delay The delay before invoking the block. - * @param block The block to invoke. + * @param now The current virtual timestamp. + * @param target The virtual timestamp at which the engine invocation should happen. + * @param scheduled The queue of scheduled invocations. */ - public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) { - startSingleTimerTo(key, clock.millis() + delay, block) + private fun trySchedule(now: Long, scheduled: ArrayDeque, target: Long) { + val head = scheduled.peek() + + // Only schedule a new scheduler invocation in case the target is earlier than all other pending + // scheduler invocations + if (head == null || target < head.timestamp) { + @OptIn(InternalCoroutinesApi::class) + val handle = delay.invokeOnTimeout(target - now, ::doRunTimers, context) + scheduled.addFirst(Invocation(target, handle)) + } } /** - * Start a timer that will invoke the specified [block] at [timestamp]. - * - * Each timer has a key and if a new timer with same key is started the previous is cancelled. - * - * @param key The key of the timer to start. - * @param timestamp The timestamp at which to invoke the block. - * @param block The block to invoke. + * This method is invoked when the earliest timer expires. */ - public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) { - val now = clock.millis() - val queue = queue - val channel = channel + private fun doRunTimers() { + val invocations = invocations + val invocation = checkNotNull(invocations.poll()) // Clear invocation from future invocation queue + val now = invocation.timestamp - require(timestamp >= now) { "Timestamp must be in the future" } - check(isActive) { "Timer is stopped" } + while (queue.isNotEmpty()) { + val timer = queue.peek() - timers.compute(key) { _, old -> - if (old != null && old.timestamp == timestamp) { - // Fast-path: timer for the same timestamp already exists - old - } else { - // Slow-path: cancel old timer and replace it with new timer - val timer = Timer(key, timestamp, block) + val timestamp = timer.timestamp + val isCancelled = timer.isCancelled - old?.isCancelled = true - queue.add(timer) + assert(timestamp >= now) { "Found task in the past" } - // Check if we need to push the interruption forward - // Note that we check by timer reference - if (queue.peek() === timer) { - channel.trySend(timer.timestamp) - } + if (timestamp > now && !isCancelled) { + // Schedule a task for the next event to occur. + trySchedule(now, invocations, timestamp) + break + } - timer + queue.poll() + + if (!isCancelled) { + timers.remove(timer.key) + timer() } } } @@ -222,7 +195,7 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo /** * A task that is scheduled to run in the future. */ - private inner class Timer(val key: T, val timestamp: Long, val block: () -> Unit) : Comparable { + private class Timer(val key: T, val timestamp: Long, var block: () -> Unit) : Comparable> { /** * A flag to indicate that the task has been cancelled. */ @@ -234,6 +207,22 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo */ operator fun invoke(): Unit = block() - override fun compareTo(other: Timer): Int = timestamp.compareTo(other.timestamp) + override fun compareTo(other: Timer): Int = timestamp.compareTo(other.timestamp) + } + + /** + * A future engine invocation. + * + * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case + * the invocation is not needed anymore, it can be cancelled via [cancel]. + */ + private class Invocation( + @JvmField val timestamp: Long, + @JvmField val handle: DisposableHandle + ) { + /** + * Cancel the engine invocation. + */ + fun cancel() = handle.dispose() } } diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt index 89b0cbe9..01f61f92 100644 --- a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt +++ b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt @@ -27,21 +27,30 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation +import java.time.Clock +import kotlin.coroutines.EmptyCoroutineContext /** * A test suite for the [TimerScheduler] class. */ @OptIn(ExperimentalCoroutinesApi::class) internal class TimerSchedulerTest { + @Test + fun testEmptyContext() { + assertThrows { TimerScheduler(EmptyCoroutineContext, Clock.systemUTC()) } + } + @Test fun testBasicTimer() { runBlockingSimulation { val scheduler = TimerScheduler(coroutineContext, clock) scheduler.startSingleTimer(0, 1000) { - scheduler.close() assertEquals(1000, clock.millis()) } + + assertTrue(scheduler.isTimerActive(0)) + assertFalse(scheduler.isTimerActive(1)) } } @@ -51,7 +60,6 @@ internal class TimerSchedulerTest { val scheduler = TimerScheduler(coroutineContext, clock) scheduler.cancel(1) - scheduler.close() } } @@ -61,12 +69,11 @@ internal class TimerSchedulerTest { val scheduler = TimerScheduler(coroutineContext, clock) scheduler.startSingleTimer(0, 1000) { - assertFalse(false) + fail() } scheduler.startSingleTimer(1, 100) { scheduler.cancel(0) - scheduler.close() assertEquals(100, clock.millis()) } @@ -78,15 +85,9 @@ internal class TimerSchedulerTest { runBlockingSimulation { val scheduler = TimerScheduler(coroutineContext, clock) - scheduler.startSingleTimer(0, 1000) { - assertFalse(false) - } - - scheduler.startSingleTimer(1, 100) { - assertFalse(false) - } - - scheduler.close() + scheduler.startSingleTimer(0, 1000) { fail() } + scheduler.startSingleTimer(1, 100) { fail() } + scheduler.cancelAll() } } @@ -95,12 +96,9 @@ internal class TimerSchedulerTest { runBlockingSimulation { val scheduler = TimerScheduler(coroutineContext, clock) - scheduler.startSingleTimer(0, 1000) { - assertFalse(false) - } + scheduler.startSingleTimer(0, 1000) { fail() } scheduler.startSingleTimer(0, 200) { - scheduler.close() assertEquals(200, clock.millis()) } @@ -108,16 +106,14 @@ internal class TimerSchedulerTest { } @Test - fun testStopped() { + fun testOverrideBlock() { runBlockingSimulation { val scheduler = TimerScheduler(coroutineContext, clock) - scheduler.close() + scheduler.startSingleTimer(0, 1000) { fail() } - assertThrows { - scheduler.startSingleTimer(1, 100) { - assertFalse(false) - } + scheduler.startSingleTimer(0, 1000) { + assertEquals(1000, clock.millis()) } } } @@ -129,11 +125,9 @@ internal class TimerSchedulerTest { assertThrows { scheduler.startSingleTimer(1, -1) { - assertFalse(false) + fail() } } - - scheduler.close() } } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt index 9771cc20..7d65a674 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt @@ -23,7 +23,7 @@ package org.opendc.experiments.tf20.network import kotlinx.coroutines.channels.Channel -import org.opendc.utils.TimerScheduler +import org.opendc.common.util.TimerScheduler import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -89,6 +89,6 @@ public class NetworkController(context: CoroutineContext, clock: Clock) : AutoCl * Stop the network controller. */ override fun close() { - scheduler.close() + scheduler.cancelAll() } } -- cgit v1.2.3 From 77aaf078650c054ccbaf5f46a71ab218390a571e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 18 Feb 2022 14:53:12 +0100 Subject: build: Remove opendc-platform module This change removes the opendc-platform module from the project. This module represented a Java platform which was previously used for sharing a set of dependency versions between subprojects. However, with the version catalogue that was added by Gradle, we currently do not use the platform anymore. --- opendc-common/build.gradle.kts | 1 - opendc-compute/opendc-compute-api/build.gradle.kts | 4 --- .../opendc-compute-service/build.gradle.kts | 1 - .../opendc-compute-simulator/build.gradle.kts | 1 - .../opendc-compute-workload/build.gradle.kts | 1 - .../opendc-experiments-capelin/build.gradle.kts | 1 - .../build.gradle.kts | 1 - .../opendc-experiments-tf20/build.gradle.kts | 1 - opendc-faas/opendc-faas-api/build.gradle.kts | 4 --- opendc-faas/opendc-faas-service/build.gradle.kts | 1 - opendc-faas/opendc-faas-simulator/build.gradle.kts | 1 - opendc-harness/opendc-harness-api/build.gradle.kts | 1 - opendc-harness/opendc-harness-cli/build.gradle.kts | 1 - .../opendc-harness-engine/build.gradle.kts | 1 - .../opendc-harness-junit5/build.gradle.kts | 1 - opendc-platform/build.gradle.kts | 39 ---------------------- .../opendc-simulator-compute/build.gradle.kts | 1 - .../opendc-simulator-core/build.gradle.kts | 1 - .../opendc-simulator-flow/build.gradle.kts | 1 - .../opendc-simulator-network/build.gradle.kts | 1 - .../opendc-simulator-power/build.gradle.kts | 1 - .../opendc-telemetry-api/build.gradle.kts | 1 - .../opendc-telemetry-compute/build.gradle.kts | 1 - .../opendc-telemetry-sdk/build.gradle.kts | 1 - opendc-trace/opendc-trace-api/build.gradle.kts | 4 --- opendc-trace/opendc-trace-azure/build.gradle.kts | 1 - .../opendc-trace-bitbrains/build.gradle.kts | 1 - opendc-trace/opendc-trace-gwf/build.gradle.kts | 1 - opendc-trace/opendc-trace-opendc/build.gradle.kts | 1 - opendc-trace/opendc-trace-parquet/build.gradle.kts | 2 -- opendc-trace/opendc-trace-swf/build.gradle.kts | 1 - opendc-trace/opendc-trace-tools/build.gradle.kts | 2 -- .../opendc-trace-wfformat/build.gradle.kts | 1 - opendc-trace/opendc-trace-wtf/build.gradle.kts | 1 - opendc-web/opendc-web-runner/build.gradle.kts | 1 - .../opendc-workflow-api/build.gradle.kts | 1 - .../opendc-workflow-service/build.gradle.kts | 1 - .../opendc-workflow-workload/build.gradle.kts | 1 - settings.gradle.kts | 1 - 39 files changed, 88 deletions(-) delete mode 100644 opendc-platform/build.gradle.kts diff --git a/opendc-common/build.gradle.kts b/opendc-common/build.gradle.kts index 67441e93..fda4ffd5 100644 --- a/opendc-common/build.gradle.kts +++ b/opendc-common/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(libs.kotlinx.coroutines) testImplementation(projects.opendcSimulator.opendcSimulatorCore) diff --git a/opendc-compute/opendc-compute-api/build.gradle.kts b/opendc-compute/opendc-compute-api/build.gradle.kts index 880ee03d..2ac7e64c 100644 --- a/opendc-compute/opendc-compute-api/build.gradle.kts +++ b/opendc-compute/opendc-compute-api/build.gradle.kts @@ -26,7 +26,3 @@ description = "API interface for the OpenDC Compute service" plugins { `kotlin-library-conventions` } - -dependencies { - api(platform(projects.opendcPlatform)) -} diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts index b609b147..b9437a73 100644 --- a/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/opendc-compute/opendc-compute-service/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeApi) api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcCommon) diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index 686d9ca1..9a8cbfcc 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeService) api(projects.opendcSimulator.opendcSimulatorCompute) api(libs.commons.math3) diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index 28a5e1da..93e09b99 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -29,7 +29,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTrace.opendcTraceApi) diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index c20556b5..2def3dc5 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) api(projects.opendcCompute.opendcComputeWorkload) diff --git a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts index 65c31c4f..b96647a6 100644 --- a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts @@ -29,7 +29,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcFaas.opendcFaasService) diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts index 75381e1f..43093abf 100644 --- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts @@ -29,7 +29,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) diff --git a/opendc-faas/opendc-faas-api/build.gradle.kts b/opendc-faas/opendc-faas-api/build.gradle.kts index 7362d949..8a295acd 100644 --- a/opendc-faas/opendc-faas-api/build.gradle.kts +++ b/opendc-faas/opendc-faas-api/build.gradle.kts @@ -26,7 +26,3 @@ description = "API for the OpenDC FaaS platform" plugins { `kotlin-library-conventions` } - -dependencies { - api(platform(projects.opendcPlatform)) -} diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index 95b171b9..7a561014 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcFaas.opendcFaasApi) api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcCommon) diff --git a/opendc-faas/opendc-faas-simulator/build.gradle.kts b/opendc-faas/opendc-faas-simulator/build.gradle.kts index fed1862d..50f75429 100644 --- a/opendc-faas/opendc-faas-simulator/build.gradle.kts +++ b/opendc-faas/opendc-faas-simulator/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcFaas.opendcFaasService) api(projects.opendcSimulator.opendcSimulatorCompute) diff --git a/opendc-harness/opendc-harness-api/build.gradle.kts b/opendc-harness/opendc-harness-api/build.gradle.kts index 5c464377..c37f14c2 100644 --- a/opendc-harness/opendc-harness-api/build.gradle.kts +++ b/opendc-harness/opendc-harness-api/build.gradle.kts @@ -28,7 +28,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(libs.junit.platform.commons) implementation(libs.kotlin.logging) diff --git a/opendc-harness/opendc-harness-cli/build.gradle.kts b/opendc-harness/opendc-harness-cli/build.gradle.kts index 5d0c0460..4d0f469f 100644 --- a/opendc-harness/opendc-harness-cli/build.gradle.kts +++ b/opendc-harness/opendc-harness-cli/build.gradle.kts @@ -34,7 +34,6 @@ application { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessEngine) implementation(libs.kotlin.logging) diff --git a/opendc-harness/opendc-harness-engine/build.gradle.kts b/opendc-harness/opendc-harness-engine/build.gradle.kts index cbd10f6a..6bf08b7b 100644 --- a/opendc-harness/opendc-harness-engine/build.gradle.kts +++ b/opendc-harness/opendc-harness-engine/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) api(libs.kotlinx.coroutines) diff --git a/opendc-harness/opendc-harness-junit5/build.gradle.kts b/opendc-harness/opendc-harness-junit5/build.gradle.kts index e63367d3..f78a4c30 100644 --- a/opendc-harness/opendc-harness-junit5/build.gradle.kts +++ b/opendc-harness/opendc-harness-junit5/build.gradle.kts @@ -28,7 +28,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessEngine) implementation(libs.kotlin.logging) diff --git a/opendc-platform/build.gradle.kts b/opendc-platform/build.gradle.kts deleted file mode 100644 index c2cb84a8..00000000 --- a/opendc-platform/build.gradle.kts +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -plugins { - `java-platform` - `publishing-conventions` -} - -description = "Java platform for the OpenDC project" - -publishing.publications.named("maven") { - from(components["javaPlatform"]) - - pom { - description.set( - "This Bill of Materials POM can be used to ease dependency management " + - "when referencing multiple OpenDC artifacts using Gradle or Maven." - ) - } -} diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts index ca8b912a..fb516ccf 100644 --- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcSimulator.opendcSimulatorFlow) api(projects.opendcSimulator.opendcSimulatorPower) api(projects.opendcSimulator.opendcSimulatorNetwork) diff --git a/opendc-simulator/opendc-simulator-core/build.gradle.kts b/opendc-simulator/opendc-simulator-core/build.gradle.kts index 6dbf4199..0de96a8e 100644 --- a/opendc-simulator/opendc-simulator-core/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-core/build.gradle.kts @@ -28,6 +28,5 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(libs.kotlinx.coroutines) } diff --git a/opendc-simulator/opendc-simulator-flow/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts index f5b67851..3f08071a 100644 --- a/opendc-simulator/opendc-simulator-flow/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(libs.kotlinx.coroutines) implementation(libs.kotlin.logging) diff --git a/opendc-simulator/opendc-simulator-network/build.gradle.kts b/opendc-simulator/opendc-simulator-network/build.gradle.kts index f8931053..0cc7763e 100644 --- a/opendc-simulator/opendc-simulator-network/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-network/build.gradle.kts @@ -29,7 +29,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcSimulator.opendcSimulatorFlow) implementation(projects.opendcSimulator.opendcSimulatorCore) diff --git a/opendc-simulator/opendc-simulator-power/build.gradle.kts b/opendc-simulator/opendc-simulator-power/build.gradle.kts index 5d8c8949..59859403 100644 --- a/opendc-simulator/opendc-simulator-power/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-power/build.gradle.kts @@ -29,7 +29,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcSimulator.opendcSimulatorFlow) implementation(projects.opendcSimulator.opendcSimulatorCore) diff --git a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts b/opendc-telemetry/opendc-telemetry-api/build.gradle.kts index 5492fc14..32a36d68 100644 --- a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts +++ b/opendc-telemetry/opendc-telemetry-api/build.gradle.kts @@ -28,6 +28,5 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(libs.opentelemetry.api) } diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts index cd8cb57a..47e30a14 100644 --- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts +++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts @@ -28,7 +28,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTelemetry.opendcTelemetrySdk) implementation(libs.opentelemetry.semconv) diff --git a/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts b/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts index 3b918775..4b3241bc 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts +++ b/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts @@ -28,7 +28,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTelemetry.opendcTelemetryApi) api(libs.kotlinx.coroutines) api(libs.opentelemetry.sdk.main) diff --git a/opendc-trace/opendc-trace-api/build.gradle.kts b/opendc-trace/opendc-trace-api/build.gradle.kts index b2f91593..977eec0d 100644 --- a/opendc-trace/opendc-trace-api/build.gradle.kts +++ b/opendc-trace/opendc-trace-api/build.gradle.kts @@ -26,7 +26,3 @@ description = "Workload trace library for OpenDC" plugins { `kotlin-library-conventions` } - -dependencies { - api(platform(projects.opendcPlatform)) -} diff --git a/opendc-trace/opendc-trace-azure/build.gradle.kts b/opendc-trace/opendc-trace-azure/build.gradle.kts index 8bde56cb..89626737 100644 --- a/opendc-trace/opendc-trace-azure/build.gradle.kts +++ b/opendc-trace/opendc-trace-azure/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) implementation(libs.jackson.dataformat.csv) } diff --git a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts index d195cbbb..71c0e794 100644 --- a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts +++ b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) implementation(libs.jackson.dataformat.csv) } diff --git a/opendc-trace/opendc-trace-gwf/build.gradle.kts b/opendc-trace/opendc-trace-gwf/build.gradle.kts index f3dfd6ef..d02f7a19 100644 --- a/opendc-trace/opendc-trace-gwf/build.gradle.kts +++ b/opendc-trace/opendc-trace-gwf/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) implementation(libs.jackson.dataformat.csv) diff --git a/opendc-trace/opendc-trace-opendc/build.gradle.kts b/opendc-trace/opendc-trace-opendc/build.gradle.kts index b9c242a1..d1b4735e 100644 --- a/opendc-trace/opendc-trace-opendc/build.gradle.kts +++ b/opendc-trace/opendc-trace-opendc/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) implementation(projects.opendcTrace.opendcTraceParquet) diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts index 75378509..f943b6a6 100644 --- a/opendc-trace/opendc-trace-parquet/build.gradle.kts +++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts @@ -30,8 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) - /* This configuration is necessary for a slim dependency on Apache Parquet */ api(libs.parquet) { exclude(group = "org.apache.hadoop") diff --git a/opendc-trace/opendc-trace-swf/build.gradle.kts b/opendc-trace/opendc-trace-swf/build.gradle.kts index c9eaa78d..25d59b49 100644 --- a/opendc-trace/opendc-trace-swf/build.gradle.kts +++ b/opendc-trace/opendc-trace-swf/build.gradle.kts @@ -30,6 +30,5 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) } diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts index 14a0fc7c..0c1e179e 100644 --- a/opendc-trace/opendc-trace-tools/build.gradle.kts +++ b/opendc-trace/opendc-trace-tools/build.gradle.kts @@ -33,8 +33,6 @@ application { } dependencies { - api(platform(projects.opendcPlatform)) - implementation(projects.opendcTrace.opendcTraceApi) implementation(libs.kotlin.logging) implementation(libs.clikt) diff --git a/opendc-trace/opendc-trace-wfformat/build.gradle.kts b/opendc-trace/opendc-trace-wfformat/build.gradle.kts index 2d336d03..cd78fd61 100644 --- a/opendc-trace/opendc-trace-wfformat/build.gradle.kts +++ b/opendc-trace/opendc-trace-wfformat/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) implementation(libs.jackson.core) diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts index e4f0ab3a..8301e363 100644 --- a/opendc-trace/opendc-trace-wtf/build.gradle.kts +++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcTrace.opendcTraceApi) implementation(projects.opendcTrace.opendcTraceParquet) diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts index 810f512f..a73ca6b3 100644 --- a/opendc-web/opendc-web-runner/build.gradle.kts +++ b/opendc-web/opendc-web-runner/build.gradle.kts @@ -34,7 +34,6 @@ application { } dependencies { - api(platform(projects.opendcPlatform)) implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcCompute.opendcComputeWorkload) implementation(projects.opendcSimulator.opendcSimulatorCore) diff --git a/opendc-workflow/opendc-workflow-api/build.gradle.kts b/opendc-workflow/opendc-workflow-api/build.gradle.kts index 36239d05..03569d8c 100644 --- a/opendc-workflow/opendc-workflow-api/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-api/build.gradle.kts @@ -28,7 +28,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeApi) implementation(libs.kotlin.logging) } diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 8f082e2f..17df33e3 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -30,7 +30,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcWorkflow.opendcWorkflowApi) api(projects.opendcCompute.opendcComputeApi) api(projects.opendcTelemetry.opendcTelemetryApi) diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts index dfb77a39..a9d497af 100644 --- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-workload/build.gradle.kts @@ -29,7 +29,6 @@ plugins { } dependencies { - api(platform(projects.opendcPlatform)) api(projects.opendcWorkflow.opendcWorkflowService) implementation(projects.opendcSimulator.opendcSimulatorCompute) diff --git a/settings.gradle.kts b/settings.gradle.kts index f388a385..f518a984 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -21,7 +21,6 @@ */ rootProject.name = "opendc" -include(":opendc-platform") include(":opendc-common") include(":opendc-compute:opendc-compute-api") include(":opendc-compute:opendc-compute-service") -- cgit v1.2.3 From c82b1725cc606769084155d6c4fba982cd320c41 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 18 Feb 2022 14:59:54 +0100 Subject: fix(compute): Disallow duplicate UIDs for SimHost This change fixes an issue with the ComputeServiceHelper where it allowed users to register multiple SimHost objects with the same UID. See this issue for more information: https://github.com/atlarge-research/opendc/issues/51 --- .../src/main/kotlin/org/opendc/compute/simulator/SimHost.kt | 6 ++++++ .../main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 95921e8b..43f33f27 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -249,6 +249,12 @@ public class SimHost( machine.cancel() } + override fun hashCode(): Int = uid.hashCode() + + override fun equals(other: Any?): Boolean { + return other is SimHost && uid == other.uid + } + override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" public suspend fun fail() { diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index a1a65da3..4b0b343f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -169,7 +169,7 @@ public class ComputeServiceHelper( optimize = optimize ) - _hosts.add(host) + require(_hosts.add(host)) { "Host with uid ${spec.uid} already exists" } service.addHost(host) return host -- cgit v1.2.3