summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt4
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt5
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/autoscaler/FunctionTerminationPolicy.kt36
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/autoscaler/FunctionTerminationPolicyFixed.kt65
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/autoscaler/FunctionTerminationPolicyNull.kt34
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt2
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceListener.kt33
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceState.kt5
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt20
-rw-r--r--opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt22
-rw-r--r--opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt16
-rw-r--r--opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt6
12 files changed, 221 insertions, 27 deletions
diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
index 3ff01e39..516bcc3e 100644
--- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
+++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
@@ -34,6 +34,7 @@ import org.opendc.experiments.serverless.trace.ServerlessTraceReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.serverless.service.ServerlessService
+import org.opendc.serverless.service.autoscaler.FunctionTerminationPolicyFixed
import org.opendc.serverless.service.router.RandomRoutingPolicy
import org.opendc.serverless.simulator.SimFunctionDeployer
import org.opendc.serverless.simulator.delay.ColdStartModel
@@ -83,7 +84,8 @@ public class ServerlessExperiment : Experiment("Serverless") {
val traceById = trace.associateBy { it.id }
val delayInjector = StochasticDelayInjector(coldStartModel, Random())
val deployer = SimFunctionDeployer(clock, this, createMachineModel(), delayInjector) { FunctionTraceWorkload(traceById.getValue(it.name)) }
- val service = ServerlessService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy)
+ val service =
+ ServerlessService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10 * 60 * 1000))
val client = service.newClient()
coroutineScope {
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt
index a791c815..e2f135ae 100644
--- a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt
@@ -24,6 +24,7 @@ package org.opendc.serverless.service
import io.opentelemetry.api.metrics.Meter
import org.opendc.serverless.api.ServerlessClient
+import org.opendc.serverless.service.autoscaler.FunctionTerminationPolicy
import org.opendc.serverless.service.deployer.FunctionDeployer
import org.opendc.serverless.service.internal.ServerlessServiceImpl
import org.opendc.serverless.service.router.RoutingPolicy
@@ -53,6 +54,7 @@ public interface ServerlessService : AutoCloseable {
* @param meter The meter to report metrics to.
* @param deployer the [FunctionDeployer] to use for deploying function instances.
* @param routingPolicy The policy to route function invocations.
+ * @param terminationPolicy The policy for terminating function instances.
*/
public operator fun invoke(
context: CoroutineContext,
@@ -60,8 +62,9 @@ public interface ServerlessService : AutoCloseable {
meter: Meter,
deployer: FunctionDeployer,
routingPolicy: RoutingPolicy,
+ terminationPolicy: FunctionTerminationPolicy,
): ServerlessService {
- return ServerlessServiceImpl(context, clock, meter, deployer, routingPolicy)
+ return ServerlessServiceImpl(context, clock, meter, deployer, routingPolicy, terminationPolicy)
}
}
}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/autoscaler/FunctionTerminationPolicy.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/autoscaler/FunctionTerminationPolicy.kt
new file mode 100644
index 00000000..25df10a6
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/autoscaler/FunctionTerminationPolicy.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service.autoscaler
+
+import org.opendc.serverless.service.deployer.FunctionInstance
+import org.opendc.serverless.service.deployer.FunctionInstanceListener
+
+/**
+ * A management policy that is responsible for downscaling the active function instances for a function.
+ */
+public interface FunctionTerminationPolicy : FunctionInstanceListener {
+ /**
+ * Enqueue the specified [instance] to be scheduled for termination a
+ */
+ public fun enqueue(instance: FunctionInstance)
+}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/autoscaler/FunctionTerminationPolicyFixed.kt
new file mode 100644
index 00000000..26b99f52
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/autoscaler/FunctionTerminationPolicyFixed.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service.autoscaler
+
+import org.opendc.serverless.service.deployer.FunctionInstance
+import org.opendc.serverless.service.deployer.FunctionInstanceState
+import org.opendc.utils.TimerScheduler
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A [FunctionTerminationPolicy] that terminates idle function instances after a fixed keep-alive time.
+ *
+ * @param timeout The idle timeout after which the function instance is terminated.
+ */
+public class FunctionTerminationPolicyFixed(
+ context: CoroutineContext,
+ clock: Clock,
+ public val timeout: Long
+) : FunctionTerminationPolicy {
+ /**
+ * The [TimerScheduler] used to schedule the function terminations.
+ */
+ private val scheduler = TimerScheduler<FunctionInstance>(context, clock)
+
+ override fun enqueue(instance: FunctionInstance) {
+ // Cancel the existing timeout timer
+ scheduler.cancel(instance)
+ }
+
+ override fun onStateChanged(instance: FunctionInstance, newState: FunctionInstanceState) {
+ when (newState) {
+ FunctionInstanceState.Active -> scheduler.cancel(instance)
+ FunctionInstanceState.Idle -> schedule(instance)
+ else -> {}
+ }
+ }
+
+ /**
+ * Schedule termination for the specified [instance].
+ */
+ private fun schedule(instance: FunctionInstance) {
+ scheduler.startSingleTimer(instance, delay = timeout) { instance.close() }
+ }
+}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/autoscaler/FunctionTerminationPolicyNull.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/autoscaler/FunctionTerminationPolicyNull.kt
new file mode 100644
index 00000000..f2d8da59
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/autoscaler/FunctionTerminationPolicyNull.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service.autoscaler
+
+import org.opendc.serverless.service.deployer.FunctionInstance
+
+/**
+ * A [FunctionTerminationPolicy] that never terminates function instances.
+ */
+public class FunctionTerminationPolicyNull : FunctionTerminationPolicy {
+ override fun enqueue(instance: FunctionInstance) {
+ // No-op
+ }
+}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt
index 83592a68..5355b659 100644
--- a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt
@@ -39,5 +39,5 @@ public interface FunctionDeployer {
/**
* Deploy the specified [function].
*/
- public fun deploy(function: FunctionObject): FunctionInstance
+ public fun deploy(function: FunctionObject, listener: FunctionInstanceListener): FunctionInstance
}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceListener.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceListener.kt
new file mode 100644
index 00000000..27803a63
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceListener.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service.deployer
+
+/**
+ * Listener interface for events originating from a [FunctionInstance].
+ */
+public interface FunctionInstanceListener {
+ /**
+ * This method is invoked when the state of a [FunctionInstance] has changed.
+ */
+ public fun onStateChanged(instance: FunctionInstance, newState: FunctionInstanceState) {}
+}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceState.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceState.kt
index 44ad80ee..4fc4a83f 100644
--- a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceState.kt
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceState.kt
@@ -42,11 +42,6 @@ public enum class FunctionInstanceState {
Active,
/**
- * The function instance is stopped but can be started.
- */
- Terminated,
-
- /**
* The function instance is released and cannot be used anymore.
*/
Deleted
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt
index 515cb5fa..91a59279 100644
--- a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt
@@ -30,8 +30,11 @@ import org.opendc.serverless.api.ServerlessClient
import org.opendc.serverless.api.ServerlessFunction
import org.opendc.serverless.service.FunctionObject
import org.opendc.serverless.service.ServerlessService
+import org.opendc.serverless.service.autoscaler.FunctionTerminationPolicy
import org.opendc.serverless.service.deployer.FunctionDeployer
import org.opendc.serverless.service.deployer.FunctionInstance
+import org.opendc.serverless.service.deployer.FunctionInstanceListener
+import org.opendc.serverless.service.deployer.FunctionInstanceState
import org.opendc.serverless.service.router.RoutingPolicy
import org.opendc.utils.TimerScheduler
import java.lang.IllegalStateException
@@ -53,8 +56,9 @@ internal class ServerlessServiceImpl(
private val clock: Clock,
private val meter: Meter,
private val deployer: FunctionDeployer,
- private val routingPolicy: RoutingPolicy
-) : ServerlessService {
+ private val routingPolicy: RoutingPolicy,
+ private val terminationPolicy: FunctionTerminationPolicy
+) : ServerlessService, FunctionInstanceListener {
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
*/
@@ -220,8 +224,9 @@ internal class ServerlessServiceImpl(
activeInstance
} else {
- val instance = deployer.deploy(function)
+ val instance = deployer.deploy(function, this)
instances.add(instance)
+ terminationPolicy.enqueue(instance)
function.idleInstances.add(1)
@@ -283,6 +288,15 @@ internal class ServerlessServiceImpl(
}
}
+ override fun onStateChanged(instance: FunctionInstance, newState: FunctionInstanceState) {
+ terminationPolicy.onStateChanged(instance, newState)
+
+ if (newState == FunctionInstanceState.Deleted) {
+ val function = instance.function
+ function.instances.remove(instance)
+ }
+ }
+
/**
* A request to invoke a function.
*/
diff --git a/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt b/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt
index d9f5ee81..6b2e8223 100644
--- a/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt
+++ b/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt
@@ -45,7 +45,7 @@ internal class ServerlessServiceTest {
@Test
fun testClientState() = runBlockingSimulation {
val meter = MeterProvider.noop().get("opendc-serverless")
- val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
val client = assertDoesNotThrow { service.newClient() }
assertDoesNotThrow { client.close() }
@@ -60,7 +60,7 @@ internal class ServerlessServiceTest {
@Test
fun testClientInvokeUnknown() = runBlockingSimulation {
val meter = MeterProvider.noop().get("opendc-serverless")
- val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -70,7 +70,7 @@ internal class ServerlessServiceTest {
@Test
fun testClientFunctionCreation() = runBlockingSimulation {
val meter = MeterProvider.noop().get("opendc-serverless")
- val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -82,7 +82,7 @@ internal class ServerlessServiceTest {
@Test
fun testClientFunctionQuery() = runBlockingSimulation {
val meter = MeterProvider.noop().get("opendc-serverless")
- val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -96,7 +96,7 @@ internal class ServerlessServiceTest {
@Test
fun testClientFunctionFindById() = runBlockingSimulation {
val meter = MeterProvider.noop().get("opendc-serverless")
- val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -110,7 +110,7 @@ internal class ServerlessServiceTest {
@Test
fun testClientFunctionFindByName() = runBlockingSimulation {
val meter = MeterProvider.noop().get("opendc-serverless")
- val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -124,7 +124,7 @@ internal class ServerlessServiceTest {
@Test
fun testClientFunctionDuplicateName() = runBlockingSimulation {
val meter = MeterProvider.noop().get("opendc-serverless")
- val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -136,7 +136,7 @@ internal class ServerlessServiceTest {
@Test
fun testClientFunctionDelete() = runBlockingSimulation {
val meter = MeterProvider.noop().get("opendc-serverless")
- val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -151,7 +151,7 @@ internal class ServerlessServiceTest {
@Test
fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation {
val meter = MeterProvider.noop().get("opendc-serverless")
- val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -165,9 +165,9 @@ internal class ServerlessServiceTest {
fun testClientFunctionInvoke() = runBlockingSimulation {
val meter = MeterProvider.noop().get("opendc-serverless")
val deployer = mockk<FunctionDeployer>()
- val service = ServerlessService(coroutineContext, clock, meter, deployer, mockk())
+ val service = ServerlessService(coroutineContext, clock, meter, deployer, mockk(), mockk(relaxUnitFun = true))
- every { deployer.deploy(any()) } answers {
+ every { deployer.deploy(any(), any()) } answers {
object : FunctionInstance {
override val state: FunctionInstanceState = FunctionInstanceState.Idle
override val function: FunctionObject = it.invocation.args[0] as FunctionObject
diff --git a/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt b/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt
index 2945a279..0605eaac 100644
--- a/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt
+++ b/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt
@@ -27,6 +27,7 @@ import kotlinx.coroutines.channels.Channel
import org.opendc.serverless.service.FunctionObject
import org.opendc.serverless.service.deployer.FunctionDeployer
import org.opendc.serverless.service.deployer.FunctionInstance
+import org.opendc.serverless.service.deployer.FunctionInstanceListener
import org.opendc.serverless.service.deployer.FunctionInstanceState
import org.opendc.serverless.simulator.delay.DelayInjector
import org.opendc.serverless.simulator.workload.SimServerlessWorkloadMapper
@@ -53,8 +54,8 @@ public class SimFunctionDeployer(
private val mapper: SimServerlessWorkloadMapper
) : FunctionDeployer {
- override fun deploy(function: FunctionObject): Instance {
- val instance = Instance(function)
+ override fun deploy(function: FunctionObject, listener: FunctionInstanceListener): Instance {
+ val instance = Instance(function, listener)
instance.start()
return instance
}
@@ -62,7 +63,7 @@ public class SimFunctionDeployer(
/**
* A simulated [FunctionInstance].
*/
- public inner class Instance(override val function: FunctionObject) : FunctionInstance {
+ public inner class Instance(override val function: FunctionObject, private val listener: FunctionInstanceListener) : FunctionInstance {
/**
* The workload associated with this instance.
*/
@@ -89,6 +90,13 @@ public class SimFunctionDeployer(
private val chan = Channel<Unit>(Channel.RENDEZVOUS)
override var state: FunctionInstanceState = FunctionInstanceState.Provisioning
+ set(value) {
+ if (field != value) {
+ listener.onStateChanged(this, value)
+ }
+
+ field = value
+ }
override suspend fun invoke() {
check(state != FunctionInstanceState.Deleted) { "Function instance has been released" }
@@ -119,7 +127,7 @@ public class SimFunctionDeployer(
try {
machine.run(workload)
} finally {
- state = FunctionInstanceState.Terminated
+ state = FunctionInstanceState.Deleted
}
}
diff --git a/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt b/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt
index 9592d870..6afa1b65 100644
--- a/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt
+++ b/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.serverless.service.ServerlessService
+import org.opendc.serverless.service.autoscaler.FunctionTerminationPolicyFixed
import org.opendc.serverless.service.router.RandomRoutingPolicy
import org.opendc.serverless.simulator.delay.ZeroDelayInjector
import org.opendc.serverless.simulator.workload.SimServerlessWorkload
@@ -68,7 +69,10 @@ internal class SimServerlessServiceTest {
override suspend fun invoke() {}
})
val deployer = SimFunctionDeployer(clock, this, machineModel, ZeroDelayInjector) { workload }
- val service = ServerlessService(coroutineContext, clock, meter, deployer, RandomRoutingPolicy())
+ val service = ServerlessService(
+ coroutineContext, clock, meter, deployer, RandomRoutingPolicy(),
+ FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10000)
+ )
val client = service.newClient()