From 028960fbf584c903156c713447194df56ec5059e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 18 Feb 2022 12:35:11 +0100 Subject: feat(utils): Add Pacer to pace scheduling cycles This change adds a new Pacer class that can pace the incoming scheduling requests into scheduling cycles by allowing the user to specify a scheduling quantum. --- .../compute/service/internal/ComputeServiceImpl.kt | 26 ++--- .../faas/service/internal/FaaSServiceImpl.kt | 21 ++-- opendc-utils/build.gradle.kts | 1 + .../src/main/kotlin/org/opendc/utils/Pacer.kt | 92 +++++++++++++++ .../src/test/kotlin/org/opendc/utils/PacerTest.kt | 127 +++++++++++++++++++++ .../service/internal/WorkflowServiceImpl.kt | 38 +----- 6 files changed, 241 insertions(+), 64 deletions(-) create mode 100644 opendc-utils/src/main/kotlin/org/opendc/utils/Pacer.kt create mode 100644 opendc-utils/src/test/kotlin/org/opendc/utils/PacerTest.kt diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 27a6ecae..6df3c4f9 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -35,7 +35,7 @@ import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.utils.TimerScheduler +import org.opendc.utils.Pacer import java.time.Clock import java.time.Duration import java.util.* @@ -56,7 +56,7 @@ internal class ComputeServiceImpl( private val clock: Clock, meterProvider: MeterProvider, private val scheduler: ComputeScheduler, - private val schedulingQuantum: Duration + schedulingQuantum: Duration ) : ComputeService, HostListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -147,9 +147,9 @@ internal class ComputeServiceImpl( private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active") /** - * The [TimerScheduler] to use for scheduling the scheduler cycles. + * The [Pacer] to use for scheduling the scheduler cycles. */ - private var timerScheduler: TimerScheduler = TimerScheduler(scope.coroutineContext, clock) + private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis(), ::doSchedule) override val hosts: Set get() = hostToView.keys @@ -354,28 +354,18 @@ internal class ComputeServiceImpl( * Indicate that a new scheduling cycle is needed due to a change to the service's state. */ private fun requestSchedulingCycle() { - // Bail out in case we have already requested a new cycle or the queue is empty. - if (timerScheduler.isTimerActive(Unit) || queue.isEmpty()) { + // Bail out in case the queue is empty. + if (queue.isEmpty()) { return } - val quantum = schedulingQuantum.toMillis() - - // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). - // This is important because the slices of the VMs need to be aligned. - // We calculate here the delay until the next scheduling slot. - val delay = quantum - (clock.millis() % quantum) - - timerScheduler.startSingleTimer(Unit, delay) { - doSchedule() - } + pacer.enqueue() } /** * Run a single scheduling iteration. */ - private fun doSchedule() { - val now = clock.millis() + private fun doSchedule(now: Long) { while (queue.isNotEmpty()) { val request = queue.peek() diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index c285585a..82032718 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -37,7 +37,7 @@ import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceListener import org.opendc.faas.service.deployer.FunctionInstanceState import org.opendc.faas.service.router.RoutingPolicy -import org.opendc.utils.TimerScheduler +import org.opendc.utils.Pacer import java.lang.IllegalStateException import java.time.Clock import java.util.* @@ -55,7 +55,7 @@ import kotlin.coroutines.resumeWithException internal class FaaSServiceImpl( context: CoroutineContext, private val clock: Clock, - private val meterProvider: MeterProvider, + meterProvider: MeterProvider, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, private val terminationPolicy: FunctionTerminationPolicy @@ -76,9 +76,9 @@ internal class FaaSServiceImpl( private val meter = meterProvider.get("org.opendc.faas.service") /** - * The [TimerScheduler] to use for scheduling the scheduler cycles. + * The [Pacer] to use for scheduling the scheduler cycles. */ - private val scheduler: TimerScheduler = TimerScheduler(scope.coroutineContext, clock) + private val pacer = Pacer(scope.coroutineContext, clock, quantum = 100) { doSchedule() } /** * The [Random] instance used to generate unique identifiers for the objects. @@ -191,19 +191,12 @@ internal class FaaSServiceImpl( * Indicate that a new scheduling cycle is needed due to a change to the service's state. */ private fun schedule() { - // Bail out in case we have already requested a new cycle or the queue is empty. - if (scheduler.isTimerActive(Unit) || queue.isEmpty()) { + // Bail out in case the queue is empty. + if (queue.isEmpty()) { return } - val quantum = 100 - - // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). - // This is important because the slices of the VMs need to be aligned. - // We calculate here the delay until the next scheduling slot. - val delay = quantum - (clock.millis() % quantum) - - scheduler.startSingleTimer(Unit, delay, ::doSchedule) + pacer.enqueue() } /** diff --git a/opendc-utils/build.gradle.kts b/opendc-utils/build.gradle.kts index 800b374d..858aa64c 100644 --- a/opendc-utils/build.gradle.kts +++ b/opendc-utils/build.gradle.kts @@ -26,6 +26,7 @@ description = "Utilities used across OpenDC modules" plugins { `kotlin-library-conventions` `testing-conventions` + `jacoco-conventions` } dependencies { diff --git a/opendc-utils/src/main/kotlin/org/opendc/utils/Pacer.kt b/opendc-utils/src/main/kotlin/org/opendc/utils/Pacer.kt new file mode 100644 index 00000000..07d3cb87 --- /dev/null +++ b/opendc-utils/src/main/kotlin/org/opendc/utils/Pacer.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.utils + +import kotlinx.coroutines.* +import java.lang.Runnable +import java.time.Clock +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext + +/** + * Helper class to pace the incoming scheduling requests. + * + * @param context The [CoroutineContext] in which the pacer runs. + * @param clock The virtual simulation clock. + * @param quantum The scheduling quantum. + * @param process The process to invoke for the incoming requests. + */ +public class Pacer( + private val context: CoroutineContext, + private val clock: Clock, + private val quantum: Long, + private val process: (Long) -> Unit +) { + /** + * The [Delay] instance that provides scheduled execution of [Runnable]s. + */ + @OptIn(InternalCoroutinesApi::class) + private val delay = + requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } + + /** + * The current [DisposableHandle] representing the pending scheduling cycle. + */ + private var handle: DisposableHandle? = null + + /** + * Determine whether a scheduling cycle is pending. + */ + public val isPending: Boolean get() = handle != null + + /** + * Enqueue a new scheduling cycle. + */ + public fun enqueue() { + if (handle != null) { + return + } + + val quantum = quantum + val now = clock.millis() + + // We assume that the scheduler runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). + // We calculate here the delay until the next scheduling slot. + val timeUntilNextSlot = quantum - (now % quantum) + + @OptIn(InternalCoroutinesApi::class) + handle = delay.invokeOnTimeout(timeUntilNextSlot, { + process(now + timeUntilNextSlot) + handle = null + }, context) + } + + /** + * Cancel the currently pending scheduling cycle. + */ + public fun cancel() { + val handle = handle ?: return + this.handle = null + handle.dispose() + } +} diff --git a/opendc-utils/src/test/kotlin/org/opendc/utils/PacerTest.kt b/opendc-utils/src/test/kotlin/org/opendc/utils/PacerTest.kt new file mode 100644 index 00000000..b8419e80 --- /dev/null +++ b/opendc-utils/src/test/kotlin/org/opendc/utils/PacerTest.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.utils + +import kotlinx.coroutines.delay +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import java.time.Clock +import kotlin.coroutines.EmptyCoroutineContext + +/** + * Test suite for the [Pacer] class. + */ +class PacerTest { + @Test + fun testEmptyContext() { + assertThrows { Pacer(EmptyCoroutineContext, Clock.systemUTC(), 100) {} } + } + + @Test + fun testSingleEnqueue() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + } + + assertEquals(1, count) { "Process should execute once" } + } + + @Test + fun testCascade() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + pacer.enqueue() + + assertTrue(pacer.isPending) + } + + assertEquals(1, count) { "Process should execute once" } + } + + @Test + fun testCancel() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + pacer.cancel() + + assertFalse(pacer.isPending) + } + + assertEquals(0, count) { "Process should never execute " } + } + + @Test + fun testCancelWithoutPending() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + assertFalse(pacer.isPending) + assertDoesNotThrow { pacer.cancel() } + + pacer.enqueue() + } + + assertEquals(1, count) { "Process should execute once" } + } + + @Test + fun testSubsequent() { + var count = 0 + + runBlockingSimulation { + val pacer = Pacer(coroutineContext, clock, quantum = 100) { + count++ + } + + pacer.enqueue() + delay(100) + pacer.enqueue() + } + + assertEquals(2, count) { "Process should execute twice" } + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 7b6d8651..cb4bfd45 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -26,7 +26,7 @@ import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import org.opendc.compute.api.* -import org.opendc.utils.TimerScheduler +import org.opendc.utils.Pacer import org.opendc.workflow.api.Job import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.service.* @@ -187,9 +187,9 @@ public class WorkflowServiceImpl( .build() /** - * The [TimerScheduler] to use for scheduling the scheduler cycles. + * The [Pacer] to use for scheduling the scheduler cycles. */ - private val timerScheduler: TimerScheduler = TimerScheduler(scope.coroutineContext, clock) + private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic @@ -230,7 +230,7 @@ public class WorkflowServiceImpl( rootListener.jobSubmitted(jobInstance) submittedJobs.add(1) - requestSchedulingCycle() + pacer.enqueue() } override fun close() { @@ -251,31 +251,6 @@ public class WorkflowServiceImpl( rootListener.listeners -= listener } - /** - * Indicate that a new scheduling cycle is needed due to a change to the service's state. - */ - private fun requestSchedulingCycle() { - // Bail out in case we have already requested a new cycle or the queue is empty. - if (timerScheduler.isTimerActive(Unit)) { - return - } - - val quantum = schedulingQuantum.toMillis() - if (quantum == 0L) { - doSchedule() - return - } - - // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). - // This is important because the slices of the VMs need to be aligned. - // We calculate here the delay until the next scheduling slot. - val delay = quantum - (clock.millis() % quantum) - - timerScheduler.startSingleTimer(Unit, delay) { - doSchedule() - } - } - /** * Perform a scheduling cycle immediately. */ @@ -409,10 +384,9 @@ public class WorkflowServiceImpl( finishJob(job) } - requestSchedulingCycle() - } - ServerState.DELETED -> { + pacer.enqueue() } + ServerState.DELETED -> {} else -> throw IllegalStateException() } } -- cgit v1.2.3