diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-16 20:30:17 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-28 14:28:29 +0100 |
| commit | b82e573c67f0004945aa18c575268100fb279b56 (patch) | |
| tree | 7af8e303ea9ab3821e8d8c7c1c7e9b1de058a9e7 /opendc/opendc-workflows | |
| parent | 0c19b32433e2086e72e0d22595f4daa6ef04b64b (diff) | |
refactor: Change from logical processes to simulation domains
This change moves the simulator terminology from logical processes to
simulation domains. This prevents the clash with "processes" that we are
trying to simulate.
In addition, simulation domains allows us to reduce the amount of
boilerplate and instead allows for simulation modelled using standard
techniques.
Diffstat (limited to 'opendc/opendc-workflows')
2 files changed, 19 insertions, 17 deletions
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index 48f06bcd..008cd1ee 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -24,7 +24,8 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.monitor.ServerMonitor @@ -41,13 +42,14 @@ import com.atlarge.opendc.workflows.workload.Job import java.util.PriorityQueue import java.util.Queue import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for * Datacenter Scheduling. */ class StageWorkflowService( - private val ctx: ProcessContext, + private val domain: Domain, private val provisioningService: ProvisioningService, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -167,7 +169,7 @@ class StageWorkflowService( private val resourceSelectionPolicy: Comparator<Node> init { - ctx.launch { + domain.launch { nodes = provisioningService.nodes().toList() available.addAll(nodes) } @@ -181,9 +183,9 @@ class StageWorkflowService( this.resourceSelectionPolicy = resourceSelectionPolicy(this) } - override suspend fun submit(job: Job, monitor: WorkflowMonitor) { + override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) { // J1 Incoming Jobs - val jobInstance = JobState(job, monitor, ctx.clock.millis()) + val jobInstance = JobState(job, monitor, simulationContext.clock.millis()) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -230,7 +232,7 @@ class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis()) + jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis()) rootListener.jobStarted(jobInstance) } @@ -292,23 +294,23 @@ class StageWorkflowService( } } - override suspend fun onUpdate(server: Server, previousState: ServerState) { + override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) { when (server.state) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) - task.startedAt = ctx.clock.millis() - task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis()) + task.startedAt = simulationContext.clock.millis() + task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis()) rootListener.taskStarted(task) } ServerState.SHUTOFF, ServerState.ERROR -> { val task = taskByServer.remove(server) ?: throw IllegalStateException() val job = task.job task.state = TaskStatus.FINISHED - task.finishedAt = ctx.clock.millis() + task.finishedAt = simulationContext.clock.millis() job.tasks.remove(task) available += task.host!! activeTasks -= task - job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis()) + job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis()) rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -333,7 +335,7 @@ class StageWorkflowService( private suspend fun finishJob(job: JobState) { activeJobs -= job - job.monitor.onJobFinish(job.job, ctx.clock.millis()) + job.monitor.onJobFinish(job.job, simulationContext.clock.millis()) rootListener.jobFinished(job) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt index cfec93b5..776f0b07 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.workflows.service.stage.StagePolicy import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -66,13 +66,13 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = processContext + val ctx = simulationContext if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. val delay = quantum - (ctx.clock.millis() % quantum) - val job = ctx.launch { + val job = ctx.domain.launch { delay(delay) next = null scheduler.schedule() @@ -93,11 +93,11 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = processContext + val ctx = simulationContext if (next == null) { val delay = random.nextInt(200).toLong() - val job = ctx.launch { + val job = ctx.domain.launch { delay(delay) next = null scheduler.schedule() |
