summaryrefslogtreecommitdiff
path: root/opendc/opendc-workflows
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-17 22:26:15 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-25 10:48:58 +0100
commitb1cf9b2bd9559328c3c9d26e73123e67d2bfea05 (patch)
tree62de7a5a2b386e1467171578742dc983bd9f7c19 /opendc/opendc-workflows
parent6b10881f123f5e6a8e7bce1045d02eba5e48c3a2 (diff)
refactor: Rework monitor interfaces
Diffstat (limited to 'opendc/opendc-workflows')
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt64
1 files changed, 33 insertions, 31 deletions
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 008cd1ee..a055a3fe 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -294,42 +294,44 @@ class StageWorkflowService(
}
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) {
- when (server.state) {
- ServerState.ACTIVE -> {
- val task = taskByServer.getValue(server)
- task.startedAt = simulationContext.clock.millis()
- task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis())
- rootListener.taskStarted(task)
- }
- ServerState.SHUTOFF, ServerState.ERROR -> {
- val task = taskByServer.remove(server) ?: throw IllegalStateException()
- val job = task.job
- task.state = TaskStatus.FINISHED
- task.finishedAt = simulationContext.clock.millis()
- job.tasks.remove(task)
- available += task.host!!
- activeTasks -= task
- job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis())
- rootListener.taskFinished(task)
-
- // Add job roots to the scheduling queue
- for (dependent in task.dependents) {
- if (dependent.state != TaskStatus.READY) {
- continue
+ override fun stateChanged(server: Server, previousState: ServerState) {
+ domain.launch {
+ when (server.state) {
+ ServerState.ACTIVE -> {
+ val task = taskByServer.getValue(server)
+ task.startedAt = simulationContext.clock.millis()
+ task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis())
+ rootListener.taskStarted(task)
+ }
+ ServerState.SHUTOFF, ServerState.ERROR -> {
+ val task = taskByServer.remove(server) ?: throw IllegalStateException()
+ val job = task.job
+ task.state = TaskStatus.FINISHED
+ task.finishedAt = simulationContext.clock.millis()
+ job.tasks.remove(task)
+ available += task.host!!
+ activeTasks -= task
+ job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis())
+ rootListener.taskFinished(task)
+
+ // Add job roots to the scheduling queue
+ for (dependent in task.dependents) {
+ if (dependent.state != TaskStatus.READY) {
+ continue
+ }
+
+ incomingTasks += dependent
+ rootListener.taskReady(dependent)
}
- incomingTasks += dependent
- rootListener.taskReady(dependent)
- }
+ if (job.isFinished) {
+ finishJob(job)
+ }
- if (job.isFinished) {
- finishJob(job)
+ requestCycle()
}
-
- requestCycle()
+ else -> throw IllegalStateException()
}
- else -> throw IllegalStateException()
}
}