diff options
Diffstat (limited to 'opendc-faas')
20 files changed, 274 insertions, 228 deletions
diff --git a/opendc-faas/opendc-faas-api/build.gradle.kts b/opendc-faas/opendc-faas-api/build.gradle.kts index 8a295acd..2e4b5776 100644 --- a/opendc-faas/opendc-faas-api/build.gradle.kts +++ b/opendc-faas/opendc-faas-api/build.gradle.kts @@ -22,7 +22,7 @@ description = "API for the OpenDC FaaS platform" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-faas/opendc-faas-api/src/main/kotlin/org/opendc/faas/api/FaaSClient.kt b/opendc-faas/opendc-faas-api/src/main/kotlin/org/opendc/faas/api/FaaSClient.kt index d3abb7f1..297d3065 100644 --- a/opendc-faas/opendc-faas-api/src/main/kotlin/org/opendc/faas/api/FaaSClient.kt +++ b/opendc-faas/opendc-faas-api/src/main/kotlin/org/opendc/faas/api/FaaSClient.kt @@ -59,7 +59,7 @@ public interface FaaSClient : AutoCloseable { name: String, memorySize: Long, labels: Map<String, String> = emptyMap(), - meta: Map<String, Any> = emptyMap() + meta: Map<String, Any> = emptyMap(), ): FaaSFunction /** diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index 8b371998..90cb8f56 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -22,7 +22,7 @@ description = "FaaS service for OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index 96619cdb..e9634ccc 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -72,7 +72,7 @@ public interface FaaSService : AutoCloseable { deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, - quantum: Duration = Duration.ofMillis(100) + quantum: Duration = Duration.ofMillis(100), ): FaaSService { return FaaSServiceImpl(dispatcher, deployer, routingPolicy, terminationPolicy, quantum) } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index 091e82a8..0ed96b96 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -35,21 +35,23 @@ public class FunctionObject( name: String, allocatedMemory: Long, labels: Map<String, String>, - meta: Map<String, Any> + meta: Map<String, Any>, ) : AutoCloseable { /** * Metrics tracked per function. */ - private var _invocations = 0L - private var _timelyInvocations = 0L - private var _delayedInvocations = 0L - private var _failedInvocations = 0L - private var _activeInstances = 0 - private var _idleInstances = 0 - private val _waitTime = DescriptiveStatistics() - .apply { windowSize = 100 } - private val _activeTime = DescriptiveStatistics() - .apply { windowSize = 100 } + private var localInvocations = 0L + private var localTimelyInvocations = 0L + private var localDelayedInvocations = 0L + private var localFailedInvocations = 0L + private var localActiveInstances = 0 + private var localIdleInstances = 0 + private val localWaitTime = + DescriptiveStatistics() + .apply { windowSize = 100 } + private val localActiveTime = + DescriptiveStatistics() + .apply { windowSize = 100 } /** * The instances associated with this function. @@ -70,7 +72,7 @@ public class FunctionObject( * Report a scheduled invocation. */ internal fun reportSubmission() { - _invocations++ + localInvocations++ } /** @@ -78,38 +80,41 @@ public class FunctionObject( */ internal fun reportDeployment(isDelayed: Boolean) { if (isDelayed) { - _delayedInvocations++ - _idleInstances++ + localDelayedInvocations++ + localIdleInstances++ } else { - _timelyInvocations++ + localTimelyInvocations++ } } /** * Report the start of a function invocation. */ - internal fun reportStart(start: Long, submitTime: Long) { + internal fun reportStart( + start: Long, + submitTime: Long, + ) { val wait = start - submitTime - _waitTime.addValue(wait.toDouble()) + localWaitTime.addValue(wait.toDouble()) - _idleInstances-- - _activeInstances++ + localIdleInstances-- + localActiveInstances++ } /** * Report the failure of a function invocation. */ internal fun reportFailure() { - _failedInvocations++ + localFailedInvocations++ } /** * Report the end of a function invocation. */ internal fun reportEnd(duration: Long) { - _activeTime.addValue(duration.toDouble()) - _idleInstances++ - _activeInstances-- + localActiveTime.addValue(duration.toDouble()) + localIdleInstances++ + localActiveInstances-- } /** @@ -117,14 +122,14 @@ public class FunctionObject( */ internal fun getStats(): FunctionStats { return FunctionStats( - _invocations, - _timelyInvocations, - _delayedInvocations, - _failedInvocations, - _activeInstances, - _idleInstances, - _waitTime.copy(), - _activeTime.copy() + localInvocations, + localTimelyInvocations, + localDelayedInvocations, + localFailedInvocations, + localActiveInstances, + localIdleInstances, + localWaitTime.copy(), + localActiveTime.copy(), ) } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt index a2c371e1..9edb8c1d 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt @@ -35,7 +35,7 @@ import java.time.Duration */ public class FunctionTerminationPolicyFixed( dispatcher: Dispatcher, - public val timeout: Duration + public val timeout: Duration, ) : FunctionTerminationPolicy { /** * The [TimerScheduler] used to schedule the function terminations. @@ -47,7 +47,10 @@ public class FunctionTerminationPolicyFixed( scheduler.cancel(instance) } - override fun onStateChanged(instance: FunctionInstance, newState: FunctionInstanceState) { + override fun onStateChanged( + instance: FunctionInstance, + newState: FunctionInstanceState, + ) { when (newState) { FunctionInstanceState.Active -> scheduler.cancel(instance) FunctionInstanceState.Idle -> schedule(instance) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt index 18d16d06..13d48fbf 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt @@ -39,5 +39,8 @@ public interface FunctionDeployer { /** * Deploy the specified [function]. */ - public fun deploy(function: FunctionObject, listener: FunctionInstanceListener): FunctionInstance + public fun deploy( + function: FunctionObject, + listener: FunctionInstanceListener, + ): FunctionInstance } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceListener.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceListener.kt index 20e280a2..e88b7104 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceListener.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceListener.kt @@ -29,5 +29,8 @@ public interface FunctionInstanceListener { /** * This method is invoked when the state of a [FunctionInstance] has changed. */ - public fun onStateChanged(instance: FunctionInstance, newState: FunctionInstanceState) {} + public fun onStateChanged( + instance: FunctionInstance, + newState: FunctionInstanceState, + ) {} } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceState.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceState.kt index 2b6b6eba..0c310e6b 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceState.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceState.kt @@ -44,5 +44,5 @@ public enum class FunctionInstanceState { /** * The function instance is released and cannot be used anymore. */ - Deleted + Deleted, } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSFunctionImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSFunctionImpl.kt index 36532aa8..7cc85e40 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSFunctionImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSFunctionImpl.kt @@ -31,7 +31,7 @@ import java.util.UUID */ internal class FaaSFunctionImpl( private val service: FaaSServiceImpl, - private val state: FunctionObject + private val state: FunctionObject, ) : FaaSFunction { override val uid: UUID = state.uid diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index b1e6b3f5..397b0e7d 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -61,7 +61,7 @@ internal class FaaSServiceImpl( private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, private val terminationPolicy: FunctionTerminationPolicy, - quantum: Duration + quantum: Duration, ) : FaaSService, FunctionInstanceListener { /** * The logger instance of this server. @@ -134,19 +134,20 @@ internal class FaaSServiceImpl( name: String, memorySize: Long, labels: Map<String, String>, - meta: Map<String, Any> + meta: Map<String, Any>, ): FaaSFunction { check(!isClosed) { "Client is already closed" } require(name !in functionsByName) { "Function with same name exists" } val uid = UUID(clock.millis(), random.nextLong()) - val function = FunctionObject( - uid, - name, - memorySize, - labels, - meta - ) + val function = + FunctionObject( + uid, + name, + memorySize, + labels, + meta, + ) functionsByName[name] = function functions[uid] = function @@ -200,27 +201,29 @@ internal class FaaSServiceImpl( val instances = function.instances // Check if there exists an instance of the function - val activeInstance = if (instances.isNotEmpty()) { - routingPolicy.select(instances, function) - } else { - null - } + val activeInstance = + if (instances.isNotEmpty()) { + routingPolicy.select(instances, function) + } else { + null + } - val instance = if (activeInstance != null) { - timelyInvocations++ - function.reportDeployment(isDelayed = false) + val instance = + if (activeInstance != null) { + timelyInvocations++ + function.reportDeployment(isDelayed = false) - activeInstance - } else { - val instance = deployer.deploy(function, this) - instances.add(instance) - terminationPolicy.enqueue(instance) + activeInstance + } else { + val instance = deployer.deploy(function, this) + instances.add(instance) + terminationPolicy.enqueue(instance) - delayedInvocations++ - function.reportDeployment(isDelayed = true) + delayedInvocations++ + function.reportDeployment(isDelayed = true) - instance - } + instance + } suspend { val start = clock.millis() @@ -268,7 +271,10 @@ internal class FaaSServiceImpl( } } - override fun onStateChanged(instance: FunctionInstance, newState: FunctionInstanceState) { + override fun onStateChanged( + instance: FunctionInstance, + newState: FunctionInstanceState, + ) { terminationPolicy.onStateChanged(instance, newState) if (newState == FunctionInstanceState.Deleted) { diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt index 22bf7266..1eb03e5a 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt @@ -31,7 +31,10 @@ import java.util.random.RandomGenerator * A [RoutingPolicy] that selects a random function instance. */ public class RandomRoutingPolicy(private val random: RandomGenerator = SplittableRandom(0)) : RoutingPolicy { - override fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance { + override fun select( + instances: List<FunctionInstance>, + function: FunctionObject, + ): FunctionInstance { val idx = random.nextInt(instances.size) return instances.elementAt(idx) } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RoutingPolicy.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RoutingPolicy.kt index e99e329a..c8ea37fc 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RoutingPolicy.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RoutingPolicy.kt @@ -32,5 +32,8 @@ public interface RoutingPolicy { /** * Select the instance to which the request should be routed to. */ - public fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance? + public fun select( + instances: List<FunctionInstance>, + function: FunctionObject, + ): FunctionInstance? } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt index 497ee423..db6db6c1 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt @@ -44,5 +44,5 @@ public data class FunctionStats( val activeInstances: Int, val idleInstances: Int, val waitTime: DescriptiveStatistics, - val activeTime: DescriptiveStatistics + val activeTime: DescriptiveStatistics, ) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt index cabb1d56..b65dfb03 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt @@ -32,5 +32,5 @@ package org.opendc.faas.service.telemetry public data class SchedulerStats( val totalInvocations: Long, val timelyInvocations: Long, - val delayedInvocations: Long + val delayedInvocations: Long, ) diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt index 9676744b..72a5f2c8 100644 --- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -41,136 +41,145 @@ import java.util.UUID * Test suite for the [FaaSService] implementation. */ internal class FaaSServiceTest { - @Test - fun testClientState() = runSimulation { - val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) - - val client = assertDoesNotThrow { service.newClient() } - assertDoesNotThrow { client.close() } - - assertThrows<IllegalStateException> { client.queryFunctions() } - assertThrows<IllegalStateException> { client.newFunction("test", 128) } - assertThrows<IllegalStateException> { client.invoke("test") } - assertThrows<IllegalStateException> { client.findFunction(UUID.randomUUID()) } - assertThrows<IllegalStateException> { client.findFunction("name") } - } + fun testClientState() = + runSimulation { + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) + + val client = assertDoesNotThrow { service.newClient() } + assertDoesNotThrow { client.close() } + + assertThrows<IllegalStateException> { client.queryFunctions() } + assertThrows<IllegalStateException> { client.newFunction("test", 128) } + assertThrows<IllegalStateException> { client.invoke("test") } + assertThrows<IllegalStateException> { client.findFunction(UUID.randomUUID()) } + assertThrows<IllegalStateException> { client.findFunction("name") } + } @Test - fun testClientInvokeUnknown() = runSimulation { - val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) + fun testClientInvokeUnknown() = + runSimulation { + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) - val client = service.newClient() + val client = service.newClient() - assertThrows<IllegalArgumentException> { client.invoke("test") } - } + assertThrows<IllegalArgumentException> { client.invoke("test") } + } @Test - fun testClientFunctionCreation() = runSimulation { - val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) + fun testClientFunctionCreation() = + runSimulation { + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) - val client = service.newClient() + val client = service.newClient() - val function = client.newFunction("test", 128) + val function = client.newFunction("test", 128) - assertEquals("test", function.name) - } + assertEquals("test", function.name) + } @Test - fun testClientFunctionQuery() = runSimulation { - val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) + fun testClientFunctionQuery() = + runSimulation { + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) - val client = service.newClient() + val client = service.newClient() - assertEquals(emptyList<FaaSFunction>(), client.queryFunctions()) + assertEquals(emptyList<FaaSFunction>(), client.queryFunctions()) - val function = client.newFunction("test", 128) + val function = client.newFunction("test", 128) - assertEquals(listOf(function), client.queryFunctions()) - } + assertEquals(listOf(function), client.queryFunctions()) + } @Test - fun testClientFunctionFindById() = runSimulation { - val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) + fun testClientFunctionFindById() = + runSimulation { + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) - val client = service.newClient() + val client = service.newClient() - assertEquals(emptyList<FaaSFunction>(), client.queryFunctions()) + assertEquals(emptyList<FaaSFunction>(), client.queryFunctions()) - val function = client.newFunction("test", 128) + val function = client.newFunction("test", 128) - assertNotNull(client.findFunction(function.uid)) - } + assertNotNull(client.findFunction(function.uid)) + } @Test - fun testClientFunctionFindByName() = runSimulation { - val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) + fun testClientFunctionFindByName() = + runSimulation { + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) - val client = service.newClient() + val client = service.newClient() - assertEquals(emptyList<FaaSFunction>(), client.queryFunctions()) + assertEquals(emptyList<FaaSFunction>(), client.queryFunctions()) - val function = client.newFunction("test", 128) + val function = client.newFunction("test", 128) - assertNotNull(client.findFunction(function.name)) - } + assertNotNull(client.findFunction(function.name)) + } @Test - fun testClientFunctionDuplicateName() = runSimulation { - val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) + fun testClientFunctionDuplicateName() = + runSimulation { + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) - val client = service.newClient() + val client = service.newClient() - client.newFunction("test", 128) + client.newFunction("test", 128) - assertThrows<IllegalArgumentException> { client.newFunction("test", 128) } - } + assertThrows<IllegalArgumentException> { client.newFunction("test", 128) } + } @Test - fun testClientFunctionDelete() = runSimulation { - val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) - - val client = service.newClient() - val function = client.newFunction("test", 128) - assertNotNull(client.findFunction(function.uid)) - function.delete() - assertNull(client.findFunction(function.uid)) - - // Delete should be idempotent - function.delete() - } + fun testClientFunctionDelete() = + runSimulation { + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) + + val client = service.newClient() + val function = client.newFunction("test", 128) + assertNotNull(client.findFunction(function.uid)) + function.delete() + assertNull(client.findFunction(function.uid)) + + // Delete should be idempotent + function.delete() + } @Test - fun testClientFunctionCannotInvokeDeleted() = runSimulation { - val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) + fun testClientFunctionCannotInvokeDeleted() = + runSimulation { + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) - val client = service.newClient() - val function = client.newFunction("test", 128) - assertNotNull(client.findFunction(function.uid)) - function.delete() + val client = service.newClient() + val function = client.newFunction("test", 128) + assertNotNull(client.findFunction(function.uid)) + function.delete() - assertThrows<IllegalStateException> { function.invoke() } - } + assertThrows<IllegalStateException> { function.invoke() } + } @Test - fun testClientFunctionInvoke() = runSimulation { - val deployer = mockk<FunctionDeployer>() - val service = FaaSService(dispatcher, deployer, mockk(), mockk(relaxUnitFun = true)) + fun testClientFunctionInvoke() = + runSimulation { + val deployer = mockk<FunctionDeployer>() + val service = FaaSService(dispatcher, deployer, mockk(), mockk(relaxUnitFun = true)) - every { deployer.deploy(any(), any()) } answers { - object : FunctionInstance { - override val state: FunctionInstanceState = FunctionInstanceState.Idle - override val function: FunctionObject = it.invocation.args[0] as FunctionObject + every { deployer.deploy(any(), any()) } answers { + object : FunctionInstance { + override val state: FunctionInstanceState = FunctionInstanceState.Idle + override val function: FunctionObject = it.invocation.args[0] as FunctionObject - override suspend fun invoke() {} + override suspend fun invoke() {} - override fun close() {} + override fun close() {} + } } - } - val client = service.newClient() - val function = client.newFunction("test", 128) + val client = service.newClient() + val function = client.newFunction("test", 128) - function.invoke() - } + function.invoke() + } } diff --git a/opendc-faas/opendc-faas-simulator/build.gradle.kts b/opendc-faas/opendc-faas-simulator/build.gradle.kts index 5f8c8667..20374324 100644 --- a/opendc-faas/opendc-faas-simulator/build.gradle.kts +++ b/opendc-faas/opendc-faas-simulator/build.gradle.kts @@ -22,7 +22,7 @@ description = "Simulator for the OpenDC FaaS platform" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt index 47b4d4fa..c81dc523 100644 --- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt +++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt @@ -58,14 +58,17 @@ public class SimFunctionDeployer( private val dispatcher: Dispatcher, private val model: MachineModel, private val delayInjector: DelayInjector, - private val mapper: SimFaaSWorkloadMapper = SimMetaFaaSWorkloadMapper() + private val mapper: SimFaaSWorkloadMapper = SimMetaFaaSWorkloadMapper(), ) : FunctionDeployer, AutoCloseable { /** * The [CoroutineScope] of this deployer. */ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + Job()) - override fun deploy(function: FunctionObject, listener: FunctionInstanceListener): Instance { + override fun deploy( + function: FunctionObject, + listener: FunctionInstanceListener, + ): Instance { val instance = Instance(function, listener) instance.start() return instance @@ -84,10 +87,11 @@ public class SimFunctionDeployer( /** * The machine that will execute the workloads. */ - public val machine: SimMachine = SimBareMetalMachine.create( - FlowEngine.create(dispatcher).newGraph(), - model - ) + public val machine: SimMachine = + SimBareMetalMachine.create( + FlowEngine.create(dispatcher).newGraph(), + model, + ) /** * The job associated with the lifecycle of the instance. @@ -134,38 +138,39 @@ public class SimFunctionDeployer( */ internal fun start() { check(state == FunctionInstanceState.Provisioning) { "Invalid state of function instance" } - job = scope.launch { - delay(delayInjector.getColdStartDelay(this@Instance)) - - launch { - try { - machine.runWorkload(workload) - } finally { - state = FunctionInstanceState.Deleted - } - } + job = + scope.launch { + delay(delayInjector.getColdStartDelay(this@Instance)) - while (isActive) { - if (queue.isEmpty()) { - chan.receive() + launch { + try { + machine.runWorkload(workload) + } finally { + state = FunctionInstanceState.Deleted + } } - state = FunctionInstanceState.Active - while (queue.isNotEmpty()) { - val request = queue.poll() - try { - workload.invoke() - request.cont.resume(Unit) - } catch (cause: CancellationException) { - request.cont.resumeWithException(cause) - throw cause - } catch (cause: Throwable) { - request.cont.resumeWithException(cause) + while (isActive) { + if (queue.isEmpty()) { + chan.receive() + } + + state = FunctionInstanceState.Active + while (queue.isNotEmpty()) { + val request = queue.poll() + try { + workload.invoke() + request.cont.resume(Unit) + } catch (cause: CancellationException) { + request.cont.resumeWithException(cause) + throw cause + } catch (cause: Throwable) { + request.cont.resumeWithException(cause) + } } + state = FunctionInstanceState.Idle } - state = FunctionInstanceState.Idle } - } } /** diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/ColdStartModel.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/ColdStartModel.kt index 624067be..f5035ca2 100644 --- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/ColdStartModel.kt +++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/delay/ColdStartModel.kt @@ -60,7 +60,7 @@ public enum class ColdStartModel { else -> Pair(0.0, 1.0) } } - }; + }, ; /** * Obtain the stochastic parameters for the cold start models. diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index ee9114b5..f68860e3 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -50,57 +50,63 @@ import java.util.Random * A test suite for the [FaaSService] implementation under simulated conditions. */ internal class SimFaaSServiceTest { - private lateinit var machineModel: MachineModel @BeforeEach fun setUp() { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) - machineModel = MachineModel( - /*cpus*/ List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, - /*memory*/ List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } - ) + machineModel = + MachineModel( + List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }, + ) } @Test - fun testSmoke() = runSimulation { - val random = Random(0) - val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimWorkloads.runtime(1000, 1.0) { - override suspend fun invoke() { - delay(random.nextInt(1000).toLong()) - } - }) + fun testSmoke() = + runSimulation { + val random = Random(0) + val workload = + spyk( + object : SimFaaSWorkload, SimWorkload by SimWorkloads.runtime(1000, 1.0) { + override suspend fun invoke() { + delay(random.nextInt(1000).toLong()) + } + }, + ) - val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random) - val deployer = SimFunctionDeployer(dispatcher, machineModel, delayInjector) { workload } - val service = FaaSService( - dispatcher, - deployer, - RandomRoutingPolicy(), - FunctionTerminationPolicyFixed(dispatcher, timeout = Duration.ofMillis(10000)) - ) + val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random) + val deployer = SimFunctionDeployer(dispatcher, machineModel, delayInjector) { workload } + val service = + FaaSService( + dispatcher, + deployer, + RandomRoutingPolicy(), + FunctionTerminationPolicyFixed(dispatcher, timeout = Duration.ofMillis(10000)), + ) - val client = service.newClient() + val client = service.newClient() - val function = client.newFunction("test", 128) - function.invoke() - delay(2000) + val function = client.newFunction("test", 128) + function.invoke() + delay(2000) - service.close() - deployer.close() + service.close() + deployer.close() - yield() + yield() - val funcStats = service.getFunctionStats(function) + val funcStats = service.getFunctionStats(function) - assertAll( - { coVerify { workload.invoke() } }, - { assertEquals(1, funcStats.totalInvocations) }, - { assertEquals(1, funcStats.delayedInvocations) }, - { assertEquals(0, funcStats.failedInvocations) }, - { assertEquals(0.0, funcStats.waitTime.mean) }, // fixme: this is probably wrong, and should be 100 - { assertEquals(1285.0, funcStats.activeTime.mean) } - ) - } + // fixme: waitTime is probably wrong, and should be 100 + assertAll( + { coVerify { workload.invoke() } }, + { assertEquals(1, funcStats.totalInvocations) }, + { assertEquals(1, funcStats.delayedInvocations) }, + { assertEquals(0, funcStats.failedInvocations) }, + { assertEquals(0.0, funcStats.waitTime.mean) }, + { assertEquals(1285.0, funcStats.activeTime.mean) }, + ) + } } |
