summaryrefslogtreecommitdiff
path: root/simulator/opendc/opendc-workflows/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-30 21:14:20 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-30 23:40:57 +0200
commitc41d201343263346ac84855a0b2254051ed33c21 (patch)
tree9141a382f9e1b2d924e9a191e53cc6daa9107563 /simulator/opendc/opendc-workflows/src
parentc543f55e961f9f7468e19c1c0f5f20566d07dfb5 (diff)
Eliminate use of Domain and simulationContext in OpenDC
This change takes the first step in eliminating the explict use of Domain and simulationContext from OpenDC. In this way, we decouple the logic of various datacenter services from simulation logic, which should promote re-use.
Diffstat (limited to 'simulator/opendc/opendc-workflows/src')
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt33
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt9
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt2
-rw-r--r--simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt8
4 files changed, 25 insertions, 27 deletions
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 1193f7b2..aea27972 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -24,9 +24,7 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.flow.EventFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
@@ -39,22 +37,23 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli
import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
-import java.util.PriorityQueue
-import java.util.Queue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
-import kotlinx.coroutines.withContext
+import java.time.Clock
+import java.util.PriorityQueue
+import java.util.Queue
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
* Topology Scheduling.
*/
class StageWorkflowService(
- private val domain: Domain,
+ internal val coroutineScope: CoroutineScope,
+ internal val clock: Clock,
private val provisioningService: ProvisioningService,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -63,7 +62,7 @@ class StageWorkflowService(
taskOrderPolicy: TaskOrderPolicy,
resourceFilterPolicy: ResourceFilterPolicy,
resourceSelectionPolicy: ResourceSelectionPolicy
-) : WorkflowService, CoroutineScope by domain {
+) : WorkflowService {
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -175,7 +174,7 @@ class StageWorkflowService(
private val eventFlow = EventFlow<WorkflowEvent>()
init {
- domain.launch {
+ coroutineScope.launch {
nodes = provisioningService.nodes().toList()
available.addAll(nodes)
}
@@ -191,9 +190,9 @@ class StageWorkflowService(
override val events: Flow<WorkflowEvent> = eventFlow
- override suspend fun submit(job: Job) = withContext(domain.coroutineContext) {
+ override suspend fun submit(job: Job) {
// J1 Incoming Jobs
- val jobInstance = JobState(job, simulationContext.clock.millis())
+ val jobInstance = JobState(job, clock.millis())
val instances = job.tasks.associateWith {
TaskState(jobInstance, it)
}
@@ -241,7 +240,7 @@ class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis()))
+ eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis()))
rootListener.jobStarted(jobInstance)
}
@@ -295,7 +294,7 @@ class StageWorkflowService(
taskByServer[server] = instance
server.events
.onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
- .launchIn(this)
+ .launchIn(coroutineScope)
activeTasks += instance
taskQueue.poll()
@@ -310,19 +309,19 @@ class StageWorkflowService(
when (server.state) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
- task.startedAt = simulationContext.clock.millis()
- eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ task.startedAt = clock.millis()
+ eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, clock.millis()))
rootListener.taskStarted(task)
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val task = taskByServer.remove(server) ?: throw IllegalStateException()
val job = task.job
task.state = TaskStatus.FINISHED
- task.finishedAt = simulationContext.clock.millis()
+ task.finishedAt = clock.millis()
job.tasks.remove(task)
available += task.host!!
activeTasks -= task
- eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, clock.millis()))
rootListener.taskFinished(task)
// Add job roots to the scheduling queue
@@ -347,7 +346,7 @@ class StageWorkflowService(
private suspend fun finishJob(job: JobState) {
activeJobs -= job
- eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis()))
+ eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis()))
rootListener.jobFinished(job)
}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
index 776f0b07..cb075b18 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.workflows.service.stage.StagePolicy
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -66,13 +65,12 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = simulationContext
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
- val delay = quantum - (ctx.clock.millis() % quantum)
+ val delay = quantum - (scheduler.clock.millis() % quantum)
- val job = ctx.domain.launch {
+ val job = scheduler.coroutineScope.launch {
delay(delay)
next = null
scheduler.schedule()
@@ -93,11 +91,10 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = simulationContext
if (next == null) {
val delay = random.nextInt(200).toLong()
- val job = ctx.domain.launch {
+ val job = scheduler.coroutineScope.launch {
delay(delay)
next = null
scheduler.schedule()
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
index a60ba0e2..ad818dde 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
@@ -26,8 +26,8 @@ package com.atlarge.opendc.workflows.service
import com.atlarge.opendc.core.services.AbstractServiceKey
import com.atlarge.opendc.workflows.workload.Job
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A service for cloud workflow management.
diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 5c129e37..655d8e1d 100644
--- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -35,8 +35,6 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import java.util.ServiceLoader
-import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
@@ -46,6 +44,8 @@ import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
+import java.util.ServiceLoader
+import kotlin.math.max
/**
* Integration test suite for the [StageWorkflowService].
@@ -68,11 +68,13 @@ internal class StageWorkflowSchedulerIntegrationTest {
val schedulerDomain = system.newDomain(name = "scheduler")
val schedulerAsync = schedulerDomain.async {
+ val clock = simulationContext.clock
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
- .use { it.construct(system.newDomain("topology")) }
+ .use { it.construct(schedulerDomain) }
StageWorkflowService(
schedulerDomain,
+ clock,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,