diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-11-13 18:16:19 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-11-13 18:16:19 +0000 |
| commit | 52eed48441693149993db79b63431b99e0973027 (patch) | |
| tree | ba267db531bc3d81409ddfe9caeb6d3b5a65e8c8 /opendc-faas | |
| parent | 183cfa96910ebb74c668dea7ef98071966f8fcb9 (diff) | |
| parent | 33d91ef30ad7bcb73365934fe536461210d1082a (diff) | |
merge: Increase minimum Java version to 17 (#115)
This pull request increases the minimum version of Java required by OpenDC to 17.
This new version of Java introduces several new features compared to our old minimum
version (11), which we attempt to apply in this conversion.
## Implementation Notes :hammer_and_pick:
* Increase minimum Java version to Java 17
* Use RandomGenerator as randomness source
* Add common dispatcher interface
* Add compatibility with Kotlin coroutines
* Use InstantSource as time source
* Re-implement SimulationScheduler as Dispatcher
* Replace use of CoroutineContext by Dispatcher
## External Dependencies :four_leaf_clover:
* Java 17
## Breaking API Changes :warning:
* The use of `CoroutineContext` and `Clock` as parameters of classes has been replaced
by the `Dispatcher` interface.
* The use of `Clock` has been replaced by `InstantSource` which does not carry time
zone info.
* The use of `Random` and `SplittableRandom` as parameter type has been replaced
by `RandomGenerator`
Diffstat (limited to 'opendc-faas')
8 files changed, 42 insertions, 53 deletions
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index 7b40d867..96619cdb 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -22,6 +22,7 @@ package org.opendc.faas.service +import org.opendc.common.Dispatcher import org.opendc.faas.api.FaaSClient import org.opendc.faas.api.FaaSFunction import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy @@ -30,9 +31,7 @@ import org.opendc.faas.service.internal.FaaSServiceImpl import org.opendc.faas.service.router.RoutingPolicy import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats -import java.time.Clock import java.time.Duration -import kotlin.coroutines.CoroutineContext /** * The [FaaSService] hosts the service implementation of the OpenDC FaaS platform. @@ -62,22 +61,20 @@ public interface FaaSService : AutoCloseable { /** * Construct a new [FaaSService] implementation. * - * @param context The [CoroutineContext] to use in the service. - * @param clock The clock instance to use. + * @param dispatcher The [Dispatcher] used for scheduling events. * @param deployer the [FunctionDeployer] to use for deploying function instances. * @param routingPolicy The policy to route function invocations. * @param terminationPolicy The policy for terminating function instances. * @param quantum The scheduling quantum of the service (100 ms default) */ public operator fun invoke( - context: CoroutineContext, - clock: Clock, + dispatcher: Dispatcher, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, quantum: Duration = Duration.ofMillis(100) ): FaaSService { - return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum) + return FaaSServiceImpl(dispatcher, deployer, routingPolicy, terminationPolicy, quantum) } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt index d579ad0c..a2c371e1 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt @@ -22,12 +22,11 @@ package org.opendc.faas.service.autoscaler +import org.opendc.common.Dispatcher import org.opendc.common.util.TimerScheduler import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceState -import java.time.Clock import java.time.Duration -import kotlin.coroutines.CoroutineContext /** * A [FunctionTerminationPolicy] that terminates idle function instances after a fixed keep-alive time. @@ -35,14 +34,13 @@ import kotlin.coroutines.CoroutineContext * @param timeout The idle timeout after which the function instance is terminated. */ public class FunctionTerminationPolicyFixed( - context: CoroutineContext, - clock: Clock, + dispatcher: Dispatcher, public val timeout: Duration ) : FunctionTerminationPolicy { /** * The [TimerScheduler] used to schedule the function terminations. */ - private val scheduler = TimerScheduler<FunctionInstance>(context, clock) + private val scheduler = TimerScheduler<FunctionInstance>(dispatcher) override fun enqueue(instance: FunctionInstance) { // Cancel the existing timeout timer @@ -61,6 +59,6 @@ public class FunctionTerminationPolicyFixed( * Schedule termination for the specified [instance]. */ private fun schedule(instance: FunctionInstance) { - scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() } + scheduler.startSingleTimer(instance, timeout.toMillis()) { instance.close() } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index 901213af..b1e6b3f5 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -22,13 +22,11 @@ package org.opendc.faas.service.internal -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel import kotlinx.coroutines.intrinsics.startCoroutineCancellable import kotlinx.coroutines.suspendCancellableCoroutine import mu.KotlinLogging +import org.opendc.common.Dispatcher import org.opendc.common.util.Pacer import org.opendc.faas.api.FaaSClient import org.opendc.faas.api.FaaSFunction @@ -43,13 +41,12 @@ import org.opendc.faas.service.router.RoutingPolicy import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats import java.lang.IllegalStateException -import java.time.Clock import java.time.Duration +import java.time.InstantSource import java.util.ArrayDeque import java.util.Random import java.util.UUID import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resumeWithException /** @@ -60,19 +57,13 @@ import kotlin.coroutines.resumeWithException * this component queues the events to await the deployment of new instances. */ internal class FaaSServiceImpl( - context: CoroutineContext, - private val clock: Clock, + dispatcher: Dispatcher, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, private val terminationPolicy: FunctionTerminationPolicy, quantum: Duration ) : FaaSService, FunctionInstanceListener { /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. - */ - private val scope = CoroutineScope(context + Job()) - - /** * The logger instance of this server. */ private val logger = KotlinLogging.logger {} @@ -80,7 +71,12 @@ internal class FaaSServiceImpl( /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() } + private val pacer = Pacer(dispatcher, quantum.toMillis()) { doSchedule() } + + /** + * The [InstantSource] instance representing the clock. + */ + private val clock = dispatcher.timeSource /** * The [Random] instance used to generate unique identifiers for the objects. @@ -266,8 +262,6 @@ internal class FaaSServiceImpl( } override fun close() { - scope.cancel() - // Stop all function instances for ((_, function) in functions) { function.close() diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt index 5bd9d4d3..22bf7266 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt @@ -24,13 +24,15 @@ package org.opendc.faas.service.router import org.opendc.faas.service.FunctionObject import org.opendc.faas.service.deployer.FunctionInstance -import kotlin.random.Random +import java.util.SplittableRandom +import java.util.random.RandomGenerator /** * A [RoutingPolicy] that selects a random function instance. */ -public class RandomRoutingPolicy(private val random: Random = Random(0)) : RoutingPolicy { +public class RandomRoutingPolicy(private val random: RandomGenerator = SplittableRandom(0)) : RoutingPolicy { override fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance { - return instances.random(random) + val idx = random.nextInt(instances.size) + return instances.elementAt(idx) } } diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt index 97ffc5a5..9676744b 100644 --- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -44,7 +44,7 @@ internal class FaaSServiceTest { @Test fun testClientState() = runSimulation { - val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -58,7 +58,7 @@ internal class FaaSServiceTest { @Test fun testClientInvokeUnknown() = runSimulation { - val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -67,7 +67,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCreation() = runSimulation { - val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -78,7 +78,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionQuery() = runSimulation { - val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -91,7 +91,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindById() = runSimulation { - val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -104,7 +104,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindByName() = runSimulation { - val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -117,7 +117,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDuplicateName() = runSimulation { - val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -128,7 +128,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDelete() = runSimulation { - val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -142,7 +142,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runSimulation { - val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -155,7 +155,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionInvoke() = runSimulation { val deployer = mockk<FunctionDeployer>() - val service = FaaSService(coroutineContext, clock, deployer, mockk(), mockk(relaxUnitFun = true)) + val service = FaaSService(dispatcher, deployer, mockk(), mockk(relaxUnitFun = true)) every { deployer.deploy(any(), any()) } answers { object : FunctionInstance { diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt index 307ad5a5..47b4d4fa 100644 --- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt +++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt @@ -31,6 +31,8 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.common.Dispatcher +import org.opendc.common.asCoroutineDispatcher import org.opendc.faas.service.FunctionObject import org.opendc.faas.service.deployer.FunctionDeployer import org.opendc.faas.service.deployer.FunctionInstance @@ -44,10 +46,8 @@ import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.flow2.FlowEngine -import java.time.Clock import java.util.ArrayDeque import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException @@ -55,8 +55,7 @@ import kotlin.coroutines.resumeWithException * A [FunctionDeployer] that uses that simulates the [FunctionInstance]s. */ public class SimFunctionDeployer( - context: CoroutineContext, - private val clock: Clock, + private val dispatcher: Dispatcher, private val model: MachineModel, private val delayInjector: DelayInjector, private val mapper: SimFaaSWorkloadMapper = SimMetaFaaSWorkloadMapper() @@ -64,7 +63,7 @@ public class SimFunctionDeployer( /** * The [CoroutineScope] of this deployer. */ - private val scope = CoroutineScope(context + Job()) + private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + Job()) override fun deploy(function: FunctionObject, listener: FunctionInstanceListener): Instance { val instance = Instance(function, listener) @@ -86,7 +85,7 @@ public class SimFunctionDeployer( * The machine that will execute the workloads. */ public val machine: SimMachine = SimBareMetalMachine.create( - FlowEngine.create(scope.coroutineContext, clock).newGraph(), + FlowEngine.create(dispatcher).newGraph(), model ) diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt index d3b31bb9..de7b4aa5 100644 --- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt +++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/StochasticDelayInjector.kt @@ -23,13 +23,13 @@ package org.opendc.faas.simulator.delay import org.opendc.faas.service.deployer.FunctionInstance -import java.util.Random +import java.util.random.RandomGenerator import kotlin.math.abs /* * Interface for instance deployment delay estimation. */ -public class StochasticDelayInjector(private val model: ColdStartModel, private val random: Random) : DelayInjector { +public class StochasticDelayInjector(private val model: ColdStartModel, private val random: RandomGenerator) : DelayInjector { override fun getColdStartDelay(instance: FunctionInstance): Long { val (mean, sd) = model.coldStartParam(instance.function.memorySize.toInt()) return abs(random.nextGaussian() * sd + mean).toLong() diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index 6baee7ea..be133ded 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -73,13 +73,12 @@ internal class SimFaaSServiceTest { }) val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random) - val deployer = SimFunctionDeployer(coroutineContext, clock, machineModel, delayInjector) { workload } + val deployer = SimFunctionDeployer(dispatcher, machineModel, delayInjector) { workload } val service = FaaSService( - coroutineContext, - clock, + dispatcher, deployer, RandomRoutingPolicy(), - FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000)) + FunctionTerminationPolicyFixed(dispatcher, timeout = Duration.ofMillis(10000)) ) val client = service.newClient() |
