summaryrefslogtreecommitdiff
path: root/opendc-faas/opendc-faas-service
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-11-09 21:59:07 +0000
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-11-13 17:42:01 +0000
commitfb2672afb2d8236d5291cd028196c99d8e4d47f1 (patch)
tree508bbec117239b3d8490cd1bde8d12b6a8ab2155 /opendc-faas/opendc-faas-service
parent00ac59e8e9d6a41c2eac55aa25420dce8fa9c6e0 (diff)
refactor: Replace use of CoroutineContext by Dispatcher
This change replaces the use of `CoroutineContext` for passing the `SimulationDispatcher` across the different modules of OpenDC by the lightweight `Dispatcher` interface of the OpenDC common module.
Diffstat (limited to 'opendc-faas/opendc-faas-service')
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt11
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt10
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt22
-rw-r--r--opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt20
4 files changed, 26 insertions, 37 deletions
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
index 53706c57..96619cdb 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
@@ -22,6 +22,7 @@
package org.opendc.faas.service
+import org.opendc.common.Dispatcher
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.api.FaaSFunction
import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy
@@ -31,8 +32,6 @@ import org.opendc.faas.service.router.RoutingPolicy
import org.opendc.faas.service.telemetry.FunctionStats
import org.opendc.faas.service.telemetry.SchedulerStats
import java.time.Duration
-import java.time.InstantSource
-import kotlin.coroutines.CoroutineContext
/**
* The [FaaSService] hosts the service implementation of the OpenDC FaaS platform.
@@ -62,22 +61,20 @@ public interface FaaSService : AutoCloseable {
/**
* Construct a new [FaaSService] implementation.
*
- * @param context The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
+ * @param dispatcher The [Dispatcher] used for scheduling events.
* @param deployer the [FunctionDeployer] to use for deploying function instances.
* @param routingPolicy The policy to route function invocations.
* @param terminationPolicy The policy for terminating function instances.
* @param quantum The scheduling quantum of the service (100 ms default)
*/
public operator fun invoke(
- context: CoroutineContext,
- clock: InstantSource,
+ dispatcher: Dispatcher,
deployer: FunctionDeployer,
routingPolicy: RoutingPolicy,
terminationPolicy: FunctionTerminationPolicy,
quantum: Duration = Duration.ofMillis(100)
): FaaSService {
- return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum)
+ return FaaSServiceImpl(dispatcher, deployer, routingPolicy, terminationPolicy, quantum)
}
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
index f494adb1..a2c371e1 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
@@ -22,12 +22,11 @@
package org.opendc.faas.service.autoscaler
+import org.opendc.common.Dispatcher
import org.opendc.common.util.TimerScheduler
import org.opendc.faas.service.deployer.FunctionInstance
import org.opendc.faas.service.deployer.FunctionInstanceState
import java.time.Duration
-import java.time.InstantSource
-import kotlin.coroutines.CoroutineContext
/**
* A [FunctionTerminationPolicy] that terminates idle function instances after a fixed keep-alive time.
@@ -35,14 +34,13 @@ import kotlin.coroutines.CoroutineContext
* @param timeout The idle timeout after which the function instance is terminated.
*/
public class FunctionTerminationPolicyFixed(
- context: CoroutineContext,
- clock: InstantSource,
+ dispatcher: Dispatcher,
public val timeout: Duration
) : FunctionTerminationPolicy {
/**
* The [TimerScheduler] used to schedule the function terminations.
*/
- private val scheduler = TimerScheduler<FunctionInstance>(context, clock)
+ private val scheduler = TimerScheduler<FunctionInstance>(dispatcher)
override fun enqueue(instance: FunctionInstance) {
// Cancel the existing timeout timer
@@ -61,6 +59,6 @@ public class FunctionTerminationPolicyFixed(
* Schedule termination for the specified [instance].
*/
private fun schedule(instance: FunctionInstance) {
- scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() }
+ scheduler.startSingleTimer(instance, timeout.toMillis()) { instance.close() }
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index 3235ff1a..b1e6b3f5 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -22,13 +22,11 @@
package org.opendc.faas.service.internal
-import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.suspendCancellableCoroutine
import mu.KotlinLogging
+import org.opendc.common.Dispatcher
import org.opendc.common.util.Pacer
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.api.FaaSFunction
@@ -49,7 +47,6 @@ import java.util.ArrayDeque
import java.util.Random
import java.util.UUID
import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resumeWithException
/**
@@ -60,19 +57,13 @@ import kotlin.coroutines.resumeWithException
* this component queues the events to await the deployment of new instances.
*/
internal class FaaSServiceImpl(
- context: CoroutineContext,
- private val clock: InstantSource,
+ dispatcher: Dispatcher,
private val deployer: FunctionDeployer,
private val routingPolicy: RoutingPolicy,
private val terminationPolicy: FunctionTerminationPolicy,
quantum: Duration
) : FaaSService, FunctionInstanceListener {
/**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
- */
- private val scope = CoroutineScope(context + Job())
-
- /**
* The logger instance of this server.
*/
private val logger = KotlinLogging.logger {}
@@ -80,7 +71,12 @@ internal class FaaSServiceImpl(
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() }
+ private val pacer = Pacer(dispatcher, quantum.toMillis()) { doSchedule() }
+
+ /**
+ * The [InstantSource] instance representing the clock.
+ */
+ private val clock = dispatcher.timeSource
/**
* The [Random] instance used to generate unique identifiers for the objects.
@@ -266,8 +262,6 @@ internal class FaaSServiceImpl(
}
override fun close() {
- scope.cancel()
-
// Stop all function instances
for ((_, function) in functions) {
function.close()
diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
index e29864da..9676744b 100644
--- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
@@ -44,7 +44,7 @@ internal class FaaSServiceTest {
@Test
fun testClientState() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = assertDoesNotThrow { service.newClient() }
assertDoesNotThrow { client.close() }
@@ -58,7 +58,7 @@ internal class FaaSServiceTest {
@Test
fun testClientInvokeUnknown() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -67,7 +67,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCreation() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -78,7 +78,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionQuery() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -91,7 +91,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindById() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -104,7 +104,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindByName() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -117,7 +117,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDuplicateName() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -128,7 +128,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDelete() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -142,7 +142,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCannotInvokeDeleted() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -155,7 +155,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionInvoke() = runSimulation {
val deployer = mockk<FunctionDeployer>()
- val service = FaaSService(coroutineContext, timeSource, deployer, mockk(), mockk(relaxUnitFun = true))
+ val service = FaaSService(dispatcher, deployer, mockk(), mockk(relaxUnitFun = true))
every { deployer.deploy(any(), any()) } answers {
object : FunctionInstance {