diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-11-09 21:59:07 +0000 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-11-13 17:42:01 +0000 |
| commit | fb2672afb2d8236d5291cd028196c99d8e4d47f1 (patch) | |
| tree | 508bbec117239b3d8490cd1bde8d12b6a8ab2155 /opendc-faas/opendc-faas-service | |
| parent | 00ac59e8e9d6a41c2eac55aa25420dce8fa9c6e0 (diff) | |
refactor: Replace use of CoroutineContext by Dispatcher
This change replaces the use of `CoroutineContext` for passing the
`SimulationDispatcher` across the different modules of OpenDC by the
lightweight `Dispatcher` interface of the OpenDC common module.
Diffstat (limited to 'opendc-faas/opendc-faas-service')
4 files changed, 26 insertions, 37 deletions
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index 53706c57..96619cdb 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -22,6 +22,7 @@ package org.opendc.faas.service +import org.opendc.common.Dispatcher import org.opendc.faas.api.FaaSClient import org.opendc.faas.api.FaaSFunction import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy @@ -31,8 +32,6 @@ import org.opendc.faas.service.router.RoutingPolicy import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats import java.time.Duration -import java.time.InstantSource -import kotlin.coroutines.CoroutineContext /** * The [FaaSService] hosts the service implementation of the OpenDC FaaS platform. @@ -62,22 +61,20 @@ public interface FaaSService : AutoCloseable { /** * Construct a new [FaaSService] implementation. * - * @param context The [CoroutineContext] to use in the service. - * @param clock The clock instance to use. + * @param dispatcher The [Dispatcher] used for scheduling events. * @param deployer the [FunctionDeployer] to use for deploying function instances. * @param routingPolicy The policy to route function invocations. * @param terminationPolicy The policy for terminating function instances. * @param quantum The scheduling quantum of the service (100 ms default) */ public operator fun invoke( - context: CoroutineContext, - clock: InstantSource, + dispatcher: Dispatcher, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, quantum: Duration = Duration.ofMillis(100) ): FaaSService { - return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum) + return FaaSServiceImpl(dispatcher, deployer, routingPolicy, terminationPolicy, quantum) } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt index f494adb1..a2c371e1 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt @@ -22,12 +22,11 @@ package org.opendc.faas.service.autoscaler +import org.opendc.common.Dispatcher import org.opendc.common.util.TimerScheduler import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceState import java.time.Duration -import java.time.InstantSource -import kotlin.coroutines.CoroutineContext /** * A [FunctionTerminationPolicy] that terminates idle function instances after a fixed keep-alive time. @@ -35,14 +34,13 @@ import kotlin.coroutines.CoroutineContext * @param timeout The idle timeout after which the function instance is terminated. */ public class FunctionTerminationPolicyFixed( - context: CoroutineContext, - clock: InstantSource, + dispatcher: Dispatcher, public val timeout: Duration ) : FunctionTerminationPolicy { /** * The [TimerScheduler] used to schedule the function terminations. */ - private val scheduler = TimerScheduler<FunctionInstance>(context, clock) + private val scheduler = TimerScheduler<FunctionInstance>(dispatcher) override fun enqueue(instance: FunctionInstance) { // Cancel the existing timeout timer @@ -61,6 +59,6 @@ public class FunctionTerminationPolicyFixed( * Schedule termination for the specified [instance]. */ private fun schedule(instance: FunctionInstance) { - scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() } + scheduler.startSingleTimer(instance, timeout.toMillis()) { instance.close() } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index 3235ff1a..b1e6b3f5 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -22,13 +22,11 @@ package org.opendc.faas.service.internal -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel import kotlinx.coroutines.intrinsics.startCoroutineCancellable import kotlinx.coroutines.suspendCancellableCoroutine import mu.KotlinLogging +import org.opendc.common.Dispatcher import org.opendc.common.util.Pacer import org.opendc.faas.api.FaaSClient import org.opendc.faas.api.FaaSFunction @@ -49,7 +47,6 @@ import java.util.ArrayDeque import java.util.Random import java.util.UUID import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resumeWithException /** @@ -60,19 +57,13 @@ import kotlin.coroutines.resumeWithException * this component queues the events to await the deployment of new instances. */ internal class FaaSServiceImpl( - context: CoroutineContext, - private val clock: InstantSource, + dispatcher: Dispatcher, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, private val terminationPolicy: FunctionTerminationPolicy, quantum: Duration ) : FaaSService, FunctionInstanceListener { /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. - */ - private val scope = CoroutineScope(context + Job()) - - /** * The logger instance of this server. */ private val logger = KotlinLogging.logger {} @@ -80,7 +71,12 @@ internal class FaaSServiceImpl( /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() } + private val pacer = Pacer(dispatcher, quantum.toMillis()) { doSchedule() } + + /** + * The [InstantSource] instance representing the clock. + */ + private val clock = dispatcher.timeSource /** * The [Random] instance used to generate unique identifiers for the objects. @@ -266,8 +262,6 @@ internal class FaaSServiceImpl( } override fun close() { - scope.cancel() - // Stop all function instances for ((_, function) in functions) { function.close() diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt index e29864da..9676744b 100644 --- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -44,7 +44,7 @@ internal class FaaSServiceTest { @Test fun testClientState() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -58,7 +58,7 @@ internal class FaaSServiceTest { @Test fun testClientInvokeUnknown() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -67,7 +67,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCreation() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -78,7 +78,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionQuery() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -91,7 +91,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindById() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -104,7 +104,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindByName() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -117,7 +117,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDuplicateName() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -128,7 +128,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDelete() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -142,7 +142,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -155,7 +155,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionInvoke() = runSimulation { val deployer = mockk<FunctionDeployer>() - val service = FaaSService(coroutineContext, timeSource, deployer, mockk(), mockk(relaxUnitFun = true)) + val service = FaaSService(dispatcher, deployer, mockk(), mockk(relaxUnitFun = true)) every { deployer.deploy(any(), any()) } answers { object : FunctionInstance { |
