summaryrefslogtreecommitdiff
path: root/opendc/opendc-workflows
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-02-16 20:30:17 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-02-28 14:28:29 +0100
commitb82e573c67f0004945aa18c575268100fb279b56 (patch)
tree7af8e303ea9ab3821e8d8c7c1c7e9b1de058a9e7 /opendc/opendc-workflows
parent0c19b32433e2086e72e0d22595f4daa6ef04b64b (diff)
refactor: Change from logical processes to simulation domains
This change moves the simulator terminology from logical processes to simulation domains. This prevents the clash with "processes" that we are trying to simulate. In addition, simulation domains allows us to reduce the amount of boilerplate and instead allows for simulation modelled using standard techniques.
Diffstat (limited to 'opendc/opendc-workflows')
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt26
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt10
2 files changed, 19 insertions, 17 deletions
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 48f06bcd..008cd1ee 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -24,7 +24,8 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
@@ -41,13 +42,14 @@ import com.atlarge.opendc.workflows.workload.Job
import java.util.PriorityQueue
import java.util.Queue
import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
* Datacenter Scheduling.
*/
class StageWorkflowService(
- private val ctx: ProcessContext,
+ private val domain: Domain,
private val provisioningService: ProvisioningService,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -167,7 +169,7 @@ class StageWorkflowService(
private val resourceSelectionPolicy: Comparator<Node>
init {
- ctx.launch {
+ domain.launch {
nodes = provisioningService.nodes().toList()
available.addAll(nodes)
}
@@ -181,9 +183,9 @@ class StageWorkflowService(
this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
- override suspend fun submit(job: Job, monitor: WorkflowMonitor) {
+ override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) {
// J1 Incoming Jobs
- val jobInstance = JobState(job, monitor, ctx.clock.millis())
+ val jobInstance = JobState(job, monitor, simulationContext.clock.millis())
val instances = job.tasks.associateWith {
TaskState(jobInstance, it)
}
@@ -230,7 +232,7 @@ class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis())
+ jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis())
rootListener.jobStarted(jobInstance)
}
@@ -292,23 +294,23 @@ class StageWorkflowService(
}
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
+ override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) {
when (server.state) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
- task.startedAt = ctx.clock.millis()
- task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis())
+ task.startedAt = simulationContext.clock.millis()
+ task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis())
rootListener.taskStarted(task)
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val task = taskByServer.remove(server) ?: throw IllegalStateException()
val job = task.job
task.state = TaskStatus.FINISHED
- task.finishedAt = ctx.clock.millis()
+ task.finishedAt = simulationContext.clock.millis()
job.tasks.remove(task)
available += task.host!!
activeTasks -= task
- job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis())
+ job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis())
rootListener.taskFinished(task)
// Add job roots to the scheduling queue
@@ -333,7 +335,7 @@ class StageWorkflowService(
private suspend fun finishJob(job: JobState) {
activeJobs -= job
- job.monitor.onJobFinish(job.job, ctx.clock.millis())
+ job.monitor.onJobFinish(job.job, simulationContext.clock.millis())
rootListener.jobFinished(job)
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
index cfec93b5..776f0b07 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.workflows.service.stage.StagePolicy
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -66,13 +66,13 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = processContext
+ val ctx = simulationContext
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
val delay = quantum - (ctx.clock.millis() % quantum)
- val job = ctx.launch {
+ val job = ctx.domain.launch {
delay(delay)
next = null
scheduler.schedule()
@@ -93,11 +93,11 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = processContext
+ val ctx = simulationContext
if (next == null) {
val delay = random.nextInt(200).toLong()
- val job = ctx.launch {
+ val job = ctx.domain.launch {
delay(delay)
next = null
scheduler.schedule()