diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-17 22:26:15 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-25 10:48:58 +0100 |
| commit | b1cf9b2bd9559328c3c9d26e73123e67d2bfea05 (patch) | |
| tree | 62de7a5a2b386e1467171578742dc983bd9f7c19 /opendc/opendc-workflows | |
| parent | 6b10881f123f5e6a8e7bce1045d02eba5e48c3a2 (diff) | |
refactor: Rework monitor interfaces
Diffstat (limited to 'opendc/opendc-workflows')
| -rw-r--r-- | opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt | 64 |
1 files changed, 33 insertions, 31 deletions
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index 008cd1ee..a055a3fe 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -294,42 +294,44 @@ class StageWorkflowService( } } - override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) { - when (server.state) { - ServerState.ACTIVE -> { - val task = taskByServer.getValue(server) - task.startedAt = simulationContext.clock.millis() - task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis()) - rootListener.taskStarted(task) - } - ServerState.SHUTOFF, ServerState.ERROR -> { - val task = taskByServer.remove(server) ?: throw IllegalStateException() - val job = task.job - task.state = TaskStatus.FINISHED - task.finishedAt = simulationContext.clock.millis() - job.tasks.remove(task) - available += task.host!! - activeTasks -= task - job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis()) - rootListener.taskFinished(task) - - // Add job roots to the scheduling queue - for (dependent in task.dependents) { - if (dependent.state != TaskStatus.READY) { - continue + override fun stateChanged(server: Server, previousState: ServerState) { + domain.launch { + when (server.state) { + ServerState.ACTIVE -> { + val task = taskByServer.getValue(server) + task.startedAt = simulationContext.clock.millis() + task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis()) + rootListener.taskStarted(task) + } + ServerState.SHUTOFF, ServerState.ERROR -> { + val task = taskByServer.remove(server) ?: throw IllegalStateException() + val job = task.job + task.state = TaskStatus.FINISHED + task.finishedAt = simulationContext.clock.millis() + job.tasks.remove(task) + available += task.host!! + activeTasks -= task + job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis()) + rootListener.taskFinished(task) + + // Add job roots to the scheduling queue + for (dependent in task.dependents) { + if (dependent.state != TaskStatus.READY) { + continue + } + + incomingTasks += dependent + rootListener.taskReady(dependent) } - incomingTasks += dependent - rootListener.taskReady(dependent) - } + if (job.isFinished) { + finishJob(job) + } - if (job.isFinished) { - finishJob(job) + requestCycle() } - - requestCycle() + else -> throw IllegalStateException() } - else -> throw IllegalStateException() } } |
