summaryrefslogtreecommitdiff
path: root/opendc-faas
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-11-13 18:16:19 +0000
committerGitHub <noreply@github.com>2022-11-13 18:16:19 +0000
commit52eed48441693149993db79b63431b99e0973027 (patch)
treeba267db531bc3d81409ddfe9caeb6d3b5a65e8c8 /opendc-faas
parent183cfa96910ebb74c668dea7ef98071966f8fcb9 (diff)
parent33d91ef30ad7bcb73365934fe536461210d1082a (diff)
merge: Increase minimum Java version to 17 (#115)
This pull request increases the minimum version of Java required by OpenDC to 17. This new version of Java introduces several new features compared to our old minimum version (11), which we attempt to apply in this conversion. ## Implementation Notes :hammer_and_pick: * Increase minimum Java version to Java 17 * Use RandomGenerator as randomness source * Add common dispatcher interface * Add compatibility with Kotlin coroutines * Use InstantSource as time source * Re-implement SimulationScheduler as Dispatcher * Replace use of CoroutineContext by Dispatcher ## External Dependencies :four_leaf_clover: * Java 17 ## Breaking API Changes :warning: * The use of `CoroutineContext` and `Clock` as parameters of classes has been replaced by the `Dispatcher` interface. * The use of `Clock` has been replaced by `InstantSource` which does not carry time zone info. * The use of `Random` and `SplittableRandom` as parameter type has been replaced by `RandomGenerator`
Diffstat (limited to 'opendc-faas')
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt11
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt10
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt24
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt8
-rw-r--r--opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt20
-rw-r--r--opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt11
-rw-r--r--opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt4
-rw-r--r--opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt7
8 files changed, 42 insertions, 53 deletions
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
index 7b40d867..96619cdb 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
@@ -22,6 +22,7 @@
package org.opendc.faas.service
+import org.opendc.common.Dispatcher
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.api.FaaSFunction
import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy
@@ -30,9 +31,7 @@ import org.opendc.faas.service.internal.FaaSServiceImpl
import org.opendc.faas.service.router.RoutingPolicy
import org.opendc.faas.service.telemetry.FunctionStats
import org.opendc.faas.service.telemetry.SchedulerStats
-import java.time.Clock
import java.time.Duration
-import kotlin.coroutines.CoroutineContext
/**
* The [FaaSService] hosts the service implementation of the OpenDC FaaS platform.
@@ -62,22 +61,20 @@ public interface FaaSService : AutoCloseable {
/**
* Construct a new [FaaSService] implementation.
*
- * @param context The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
+ * @param dispatcher The [Dispatcher] used for scheduling events.
* @param deployer the [FunctionDeployer] to use for deploying function instances.
* @param routingPolicy The policy to route function invocations.
* @param terminationPolicy The policy for terminating function instances.
* @param quantum The scheduling quantum of the service (100 ms default)
*/
public operator fun invoke(
- context: CoroutineContext,
- clock: Clock,
+ dispatcher: Dispatcher,
deployer: FunctionDeployer,
routingPolicy: RoutingPolicy,
terminationPolicy: FunctionTerminationPolicy,
quantum: Duration = Duration.ofMillis(100)
): FaaSService {
- return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum)
+ return FaaSServiceImpl(dispatcher, deployer, routingPolicy, terminationPolicy, quantum)
}
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
index d579ad0c..a2c371e1 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
@@ -22,12 +22,11 @@
package org.opendc.faas.service.autoscaler
+import org.opendc.common.Dispatcher
import org.opendc.common.util.TimerScheduler
import org.opendc.faas.service.deployer.FunctionInstance
import org.opendc.faas.service.deployer.FunctionInstanceState
-import java.time.Clock
import java.time.Duration
-import kotlin.coroutines.CoroutineContext
/**
* A [FunctionTerminationPolicy] that terminates idle function instances after a fixed keep-alive time.
@@ -35,14 +34,13 @@ import kotlin.coroutines.CoroutineContext
* @param timeout The idle timeout after which the function instance is terminated.
*/
public class FunctionTerminationPolicyFixed(
- context: CoroutineContext,
- clock: Clock,
+ dispatcher: Dispatcher,
public val timeout: Duration
) : FunctionTerminationPolicy {
/**
* The [TimerScheduler] used to schedule the function terminations.
*/
- private val scheduler = TimerScheduler<FunctionInstance>(context, clock)
+ private val scheduler = TimerScheduler<FunctionInstance>(dispatcher)
override fun enqueue(instance: FunctionInstance) {
// Cancel the existing timeout timer
@@ -61,6 +59,6 @@ public class FunctionTerminationPolicyFixed(
* Schedule termination for the specified [instance].
*/
private fun schedule(instance: FunctionInstance) {
- scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() }
+ scheduler.startSingleTimer(instance, timeout.toMillis()) { instance.close() }
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index 901213af..b1e6b3f5 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -22,13 +22,11 @@
package org.opendc.faas.service.internal
-import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.suspendCancellableCoroutine
import mu.KotlinLogging
+import org.opendc.common.Dispatcher
import org.opendc.common.util.Pacer
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.api.FaaSFunction
@@ -43,13 +41,12 @@ import org.opendc.faas.service.router.RoutingPolicy
import org.opendc.faas.service.telemetry.FunctionStats
import org.opendc.faas.service.telemetry.SchedulerStats
import java.lang.IllegalStateException
-import java.time.Clock
import java.time.Duration
+import java.time.InstantSource
import java.util.ArrayDeque
import java.util.Random
import java.util.UUID
import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resumeWithException
/**
@@ -60,19 +57,13 @@ import kotlin.coroutines.resumeWithException
* this component queues the events to await the deployment of new instances.
*/
internal class FaaSServiceImpl(
- context: CoroutineContext,
- private val clock: Clock,
+ dispatcher: Dispatcher,
private val deployer: FunctionDeployer,
private val routingPolicy: RoutingPolicy,
private val terminationPolicy: FunctionTerminationPolicy,
quantum: Duration
) : FaaSService, FunctionInstanceListener {
/**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
- */
- private val scope = CoroutineScope(context + Job())
-
- /**
* The logger instance of this server.
*/
private val logger = KotlinLogging.logger {}
@@ -80,7 +71,12 @@ internal class FaaSServiceImpl(
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() }
+ private val pacer = Pacer(dispatcher, quantum.toMillis()) { doSchedule() }
+
+ /**
+ * The [InstantSource] instance representing the clock.
+ */
+ private val clock = dispatcher.timeSource
/**
* The [Random] instance used to generate unique identifiers for the objects.
@@ -266,8 +262,6 @@ internal class FaaSServiceImpl(
}
override fun close() {
- scope.cancel()
-
// Stop all function instances
for ((_, function) in functions) {
function.close()
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt
index 5bd9d4d3..22bf7266 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt
@@ -24,13 +24,15 @@ package org.opendc.faas.service.router
import org.opendc.faas.service.FunctionObject
import org.opendc.faas.service.deployer.FunctionInstance
-import kotlin.random.Random
+import java.util.SplittableRandom
+import java.util.random.RandomGenerator
/**
* A [RoutingPolicy] that selects a random function instance.
*/
-public class RandomRoutingPolicy(private val random: Random = Random(0)) : RoutingPolicy {
+public class RandomRoutingPolicy(private val random: RandomGenerator = SplittableRandom(0)) : RoutingPolicy {
override fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance {
- return instances.random(random)
+ val idx = random.nextInt(instances.size)
+ return instances.elementAt(idx)
}
}
diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
index 97ffc5a5..9676744b 100644
--- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
@@ -44,7 +44,7 @@ internal class FaaSServiceTest {
@Test
fun testClientState() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = assertDoesNotThrow { service.newClient() }
assertDoesNotThrow { client.close() }
@@ -58,7 +58,7 @@ internal class FaaSServiceTest {
@Test
fun testClientInvokeUnknown() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -67,7 +67,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCreation() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -78,7 +78,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionQuery() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -91,7 +91,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindById() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -104,7 +104,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindByName() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -117,7 +117,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDuplicateName() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -128,7 +128,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDelete() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -142,7 +142,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCannotInvokeDeleted() = runSimulation {
- val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -155,7 +155,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionInvoke() = runSimulation {
val deployer = mockk<FunctionDeployer>()
- val service = FaaSService(coroutineContext, clock, deployer, mockk(), mockk(relaxUnitFun = true))
+ val service = FaaSService(dispatcher, deployer, mockk(), mockk(relaxUnitFun = true))
every { deployer.deploy(any(), any()) } answers {
object : FunctionInstance {
diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
index 307ad5a5..47b4d4fa 100644
--- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
+++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
@@ -31,6 +31,8 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
import org.opendc.faas.service.FunctionObject
import org.opendc.faas.service.deployer.FunctionDeployer
import org.opendc.faas.service.deployer.FunctionInstance
@@ -44,10 +46,8 @@ import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.flow2.FlowEngine
-import java.time.Clock
import java.util.ArrayDeque
import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
@@ -55,8 +55,7 @@ import kotlin.coroutines.resumeWithException
* A [FunctionDeployer] that uses that simulates the [FunctionInstance]s.
*/
public class SimFunctionDeployer(
- context: CoroutineContext,
- private val clock: Clock,
+ private val dispatcher: Dispatcher,
private val model: MachineModel,
private val delayInjector: DelayInjector,
private val mapper: SimFaaSWorkloadMapper = SimMetaFaaSWorkloadMapper()
@@ -64,7 +63,7 @@ public class SimFunctionDeployer(
/**
* The [CoroutineScope] of this deployer.
*/
- private val scope = CoroutineScope(context + Job())
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + Job())
override fun deploy(function: FunctionObject, listener: FunctionInstanceListener): Instance {
val instance = Instance(function, listener)
@@ -86,7 +85,7 @@ public class SimFunctionDeployer(
* The machine that will execute the workloads.
*/
public val machine: SimMachine = SimBareMetalMachine.create(
- FlowEngine.create(scope.coroutineContext, clock).newGraph(),
+ FlowEngine.create(dispatcher).newGraph(),
model
)
diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt
index d3b31bb9..de7b4aa5 100644
--- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt
+++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt
@@ -23,13 +23,13 @@
package org.opendc.faas.simulator.delay
import org.opendc.faas.service.deployer.FunctionInstance
-import java.util.Random
+import java.util.random.RandomGenerator
import kotlin.math.abs
/*
* Interface for instance deployment delay estimation.
*/
-public class StochasticDelayInjector(private val model: ColdStartModel, private val random: Random) : DelayInjector {
+public class StochasticDelayInjector(private val model: ColdStartModel, private val random: RandomGenerator) : DelayInjector {
override fun getColdStartDelay(instance: FunctionInstance): Long {
val (mean, sd) = model.coldStartParam(instance.function.memorySize.toInt())
return abs(random.nextGaussian() * sd + mean).toLong()
diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
index 6baee7ea..be133ded 100644
--- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
@@ -73,13 +73,12 @@ internal class SimFaaSServiceTest {
})
val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random)
- val deployer = SimFunctionDeployer(coroutineContext, clock, machineModel, delayInjector) { workload }
+ val deployer = SimFunctionDeployer(dispatcher, machineModel, delayInjector) { workload }
val service = FaaSService(
- coroutineContext,
- clock,
+ dispatcher,
deployer,
RandomRoutingPolicy(),
- FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000))
+ FunctionTerminationPolicyFixed(dispatcher, timeout = Duration.ofMillis(10000))
)
val client = service.newClient()