summaryrefslogtreecommitdiff
path: root/opendc/opendc-workflows
diff options
context:
space:
mode:
authorGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-02-28 14:34:45 +0100
committerGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-02-28 14:34:45 +0100
commit3a5eac673fb67a6cff7fc79f16312db78d706322 (patch)
tree6c0e37e994d0a1ada6cef8d42d7dfbd9cdde3ccc /opendc/opendc-workflows
parent0c19b32433e2086e72e0d22595f4daa6ef04b64b (diff)
parent2ed1e47b5d82229a873febebb2d8bd3d8f5832ea (diff)
Merge branch 'refactor/domains' into 'feat/2.x'
Change from logical processes to simulation domains See merge request opendc/opendc-simulator!28
Diffstat (limited to 'opendc/opendc-workflows')
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt26
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt10
2 files changed, 19 insertions, 17 deletions
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 48f06bcd..008cd1ee 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -24,7 +24,8 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
@@ -41,13 +42,14 @@ import com.atlarge.opendc.workflows.workload.Job
import java.util.PriorityQueue
import java.util.Queue
import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
* Datacenter Scheduling.
*/
class StageWorkflowService(
- private val ctx: ProcessContext,
+ private val domain: Domain,
private val provisioningService: ProvisioningService,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -167,7 +169,7 @@ class StageWorkflowService(
private val resourceSelectionPolicy: Comparator<Node>
init {
- ctx.launch {
+ domain.launch {
nodes = provisioningService.nodes().toList()
available.addAll(nodes)
}
@@ -181,9 +183,9 @@ class StageWorkflowService(
this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
- override suspend fun submit(job: Job, monitor: WorkflowMonitor) {
+ override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) {
// J1 Incoming Jobs
- val jobInstance = JobState(job, monitor, ctx.clock.millis())
+ val jobInstance = JobState(job, monitor, simulationContext.clock.millis())
val instances = job.tasks.associateWith {
TaskState(jobInstance, it)
}
@@ -230,7 +232,7 @@ class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis())
+ jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis())
rootListener.jobStarted(jobInstance)
}
@@ -292,23 +294,23 @@ class StageWorkflowService(
}
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
+ override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) {
when (server.state) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
- task.startedAt = ctx.clock.millis()
- task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis())
+ task.startedAt = simulationContext.clock.millis()
+ task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis())
rootListener.taskStarted(task)
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val task = taskByServer.remove(server) ?: throw IllegalStateException()
val job = task.job
task.state = TaskStatus.FINISHED
- task.finishedAt = ctx.clock.millis()
+ task.finishedAt = simulationContext.clock.millis()
job.tasks.remove(task)
available += task.host!!
activeTasks -= task
- job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis())
+ job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis())
rootListener.taskFinished(task)
// Add job roots to the scheduling queue
@@ -333,7 +335,7 @@ class StageWorkflowService(
private suspend fun finishJob(job: JobState) {
activeJobs -= job
- job.monitor.onJobFinish(job.job, ctx.clock.millis())
+ job.monitor.onJobFinish(job.job, simulationContext.clock.millis())
rootListener.jobFinished(job)
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
index cfec93b5..776f0b07 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.workflows.service.stage.StagePolicy
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -66,13 +66,13 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = processContext
+ val ctx = simulationContext
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
val delay = quantum - (ctx.clock.millis() % quantum)
- val job = ctx.launch {
+ val job = ctx.domain.launch {
delay(delay)
next = null
scheduler.schedule()
@@ -93,11 +93,11 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = processContext
+ val ctx = simulationContext
if (next == null) {
val delay = random.nextInt(200).toLong()
- val job = ctx.launch {
+ val job = ctx.domain.launch {
delay(delay)
next = null
scheduler.schedule()