summaryrefslogtreecommitdiff
path: root/simulator/opendc-workflows/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-05 16:26:06 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-07 16:13:16 +0100
commit10f71541cd2c72e12f1b2325ee4f25e38a10e0ef (patch)
tree5cd19515be73755911cbfdff0d477532e0dee02d /simulator/opendc-workflows/src
parent249a272702bb79a901848ed4957d0992e82b3f92 (diff)
compute: Convert Server to stateful interface
This change converts the Server data class which can be used as a stateful object to control an instance running in the cloud.
Diffstat (limited to 'simulator/opendc-workflows/src')
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt19
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt12
2 files changed, 11 insertions, 20 deletions
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
index e04c8a4c..7761a793 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
@@ -25,15 +25,10 @@ package org.opendc.workflows.service
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
-import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import mu.KotlinLogging
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.ServerState
+import org.opendc.compute.core.*
import org.opendc.compute.core.virt.service.VirtProvisioningService
import org.opendc.trace.core.EventTracer
import org.opendc.trace.core.consumeAsFlow
@@ -61,7 +56,7 @@ public class StageWorkflowService(
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
taskOrderPolicy: TaskOrderPolicy
-) : WorkflowService {
+) : WorkflowService, ServerWatcher {
/**
* The logger instance to use.
*/
@@ -205,7 +200,7 @@ public class StageWorkflowService(
/**
* Indicate to the scheduler that a scheduling cycle is needed.
*/
- private suspend fun requestCycle() = mode.requestCycle()
+ private fun requestCycle() = mode.requestCycle()
/**
* Perform a scheduling cycle immediately.
@@ -279,9 +274,7 @@ public class StageWorkflowService(
instance.server = server
taskByServer[server] = instance
- server.events
- .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
- .launchIn(coroutineScope)
+ server.watch(this@StageWorkflowService)
}
activeTasks += instance
@@ -290,8 +283,8 @@ public class StageWorkflowService(
}
}
- private suspend fun stateChanged(server: Server) {
- when (server.state) {
+ public override fun onStateChanged(server: Server, newState: ServerState) {
+ when (newState) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
index d03adc61..cf8f92e0 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,7 +24,6 @@ package org.opendc.workflows.service
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
import org.opendc.workflows.service.stage.StagePolicy
/**
@@ -38,7 +37,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
/**
* Request a new scheduling cycle to be performed.
*/
- public suspend fun requestCycle()
+ public fun requestCycle()
}
/**
@@ -46,9 +45,8 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
*/
public object Interactive : WorkflowSchedulerMode() {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
- yield()
- scheduler.schedule()
+ override fun requestCycle() {
+ scheduler.coroutineScope.launch { scheduler.schedule() }
}
}
@@ -62,7 +60,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
private var next: kotlinx.coroutines.Job? = null
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
+ override fun requestCycle() {
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
@@ -88,7 +86,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
private var next: kotlinx.coroutines.Job? = null
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
+ override fun requestCycle() {
if (next == null) {
val delay = random.nextInt(200).toLong()