summaryrefslogtreecommitdiff
path: root/opendc/opendc-workflows
diff options
context:
space:
mode:
Diffstat (limited to 'opendc/opendc-workflows')
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt64
1 files changed, 33 insertions, 31 deletions
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 008cd1ee..a055a3fe 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -294,42 +294,44 @@ class StageWorkflowService(
}
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) {
- when (server.state) {
- ServerState.ACTIVE -> {
- val task = taskByServer.getValue(server)
- task.startedAt = simulationContext.clock.millis()
- task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis())
- rootListener.taskStarted(task)
- }
- ServerState.SHUTOFF, ServerState.ERROR -> {
- val task = taskByServer.remove(server) ?: throw IllegalStateException()
- val job = task.job
- task.state = TaskStatus.FINISHED
- task.finishedAt = simulationContext.clock.millis()
- job.tasks.remove(task)
- available += task.host!!
- activeTasks -= task
- job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis())
- rootListener.taskFinished(task)
-
- // Add job roots to the scheduling queue
- for (dependent in task.dependents) {
- if (dependent.state != TaskStatus.READY) {
- continue
+ override fun stateChanged(server: Server, previousState: ServerState) {
+ domain.launch {
+ when (server.state) {
+ ServerState.ACTIVE -> {
+ val task = taskByServer.getValue(server)
+ task.startedAt = simulationContext.clock.millis()
+ task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis())
+ rootListener.taskStarted(task)
+ }
+ ServerState.SHUTOFF, ServerState.ERROR -> {
+ val task = taskByServer.remove(server) ?: throw IllegalStateException()
+ val job = task.job
+ task.state = TaskStatus.FINISHED
+ task.finishedAt = simulationContext.clock.millis()
+ job.tasks.remove(task)
+ available += task.host!!
+ activeTasks -= task
+ job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis())
+ rootListener.taskFinished(task)
+
+ // Add job roots to the scheduling queue
+ for (dependent in task.dependents) {
+ if (dependent.state != TaskStatus.READY) {
+ continue
+ }
+
+ incomingTasks += dependent
+ rootListener.taskReady(dependent)
}
- incomingTasks += dependent
- rootListener.taskReady(dependent)
- }
+ if (job.isFinished) {
+ finishJob(job)
+ }
- if (job.isFinished) {
- finishJob(job)
+ requestCycle()
}
-
- requestCycle()
+ else -> throw IllegalStateException()
}
- else -> throw IllegalStateException()
}
}