diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-05 16:26:06 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-07 16:13:16 +0100 |
| commit | 10f71541cd2c72e12f1b2325ee4f25e38a10e0ef (patch) | |
| tree | 5cd19515be73755911cbfdff0d477532e0dee02d /simulator/opendc-workflows | |
| parent | 249a272702bb79a901848ed4957d0992e82b3f92 (diff) | |
compute: Convert Server to stateful interface
This change converts the Server data class which can be used as a
stateful object to control an instance running in the cloud.
Diffstat (limited to 'simulator/opendc-workflows')
2 files changed, 11 insertions, 20 deletions
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt index e04c8a4c..7761a793 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt @@ -25,15 +25,10 @@ package org.opendc.workflows.service import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState +import org.opendc.compute.core.* import org.opendc.compute.core.virt.service.VirtProvisioningService import org.opendc.trace.core.EventTracer import org.opendc.trace.core.consumeAsFlow @@ -61,7 +56,7 @@ public class StageWorkflowService( jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, taskOrderPolicy: TaskOrderPolicy -) : WorkflowService { +) : WorkflowService, ServerWatcher { /** * The logger instance to use. */ @@ -205,7 +200,7 @@ public class StageWorkflowService( /** * Indicate to the scheduler that a scheduling cycle is needed. */ - private suspend fun requestCycle() = mode.requestCycle() + private fun requestCycle() = mode.requestCycle() /** * Perform a scheduling cycle immediately. @@ -279,9 +274,7 @@ public class StageWorkflowService( instance.server = server taskByServer[server] = instance - server.events - .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } - .launchIn(coroutineScope) + server.watch(this@StageWorkflowService) } activeTasks += instance @@ -290,8 +283,8 @@ public class StageWorkflowService( } } - private suspend fun stateChanged(server: Server) { - when (server.state) { + public override fun onStateChanged(server: Server, newState: ServerState) { + when (newState) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt index d03adc61..cf8f92e0 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,7 +24,6 @@ package org.opendc.workflows.service import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import kotlinx.coroutines.yield import org.opendc.workflows.service.stage.StagePolicy /** @@ -38,7 +37,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo /** * Request a new scheduling cycle to be performed. */ - public suspend fun requestCycle() + public fun requestCycle() } /** @@ -46,9 +45,8 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo */ public object Interactive : WorkflowSchedulerMode() { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { - override suspend fun requestCycle() { - yield() - scheduler.schedule() + override fun requestCycle() { + scheduler.coroutineScope.launch { scheduler.schedule() } } } @@ -62,7 +60,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo private var next: kotlinx.coroutines.Job? = null override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { - override suspend fun requestCycle() { + override fun requestCycle() { if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. @@ -88,7 +86,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo private var next: kotlinx.coroutines.Job? = null override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { - override suspend fun requestCycle() { + override fun requestCycle() { if (next == null) { val delay = random.nextInt(200).toLong() |
