diff options
Diffstat (limited to 'opendc/opendc-workflows')
2 files changed, 19 insertions, 17 deletions
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index 48f06bcd..008cd1ee 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -24,7 +24,8 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.monitor.ServerMonitor @@ -41,13 +42,14 @@ import com.atlarge.opendc.workflows.workload.Job import java.util.PriorityQueue import java.util.Queue import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for * Datacenter Scheduling. */ class StageWorkflowService( - private val ctx: ProcessContext, + private val domain: Domain, private val provisioningService: ProvisioningService, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -167,7 +169,7 @@ class StageWorkflowService( private val resourceSelectionPolicy: Comparator<Node> init { - ctx.launch { + domain.launch { nodes = provisioningService.nodes().toList() available.addAll(nodes) } @@ -181,9 +183,9 @@ class StageWorkflowService( this.resourceSelectionPolicy = resourceSelectionPolicy(this) } - override suspend fun submit(job: Job, monitor: WorkflowMonitor) { + override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) { // J1 Incoming Jobs - val jobInstance = JobState(job, monitor, ctx.clock.millis()) + val jobInstance = JobState(job, monitor, simulationContext.clock.millis()) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -230,7 +232,7 @@ class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis()) + jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis()) rootListener.jobStarted(jobInstance) } @@ -292,23 +294,23 @@ class StageWorkflowService( } } - override suspend fun onUpdate(server: Server, previousState: ServerState) { + override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) { when (server.state) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) - task.startedAt = ctx.clock.millis() - task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis()) + task.startedAt = simulationContext.clock.millis() + task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis()) rootListener.taskStarted(task) } ServerState.SHUTOFF, ServerState.ERROR -> { val task = taskByServer.remove(server) ?: throw IllegalStateException() val job = task.job task.state = TaskStatus.FINISHED - task.finishedAt = ctx.clock.millis() + task.finishedAt = simulationContext.clock.millis() job.tasks.remove(task) available += task.host!! activeTasks -= task - job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis()) + job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis()) rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -333,7 +335,7 @@ class StageWorkflowService( private suspend fun finishJob(job: JobState) { activeJobs -= job - job.monitor.onJobFinish(job.job, ctx.clock.millis()) + job.monitor.onJobFinish(job.job, simulationContext.clock.millis()) rootListener.jobFinished(job) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt index cfec93b5..776f0b07 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.workflows.service.stage.StagePolicy import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -66,13 +66,13 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = processContext + val ctx = simulationContext if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. val delay = quantum - (ctx.clock.millis() % quantum) - val job = ctx.launch { + val job = ctx.domain.launch { delay(delay) next = null scheduler.schedule() @@ -93,11 +93,11 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = processContext + val ctx = simulationContext if (next == null) { val delay = random.nextInt(200).toLong() - val job = ctx.launch { + val job = ctx.domain.launch { delay(delay) next = null scheduler.schedule() |
