summaryrefslogtreecommitdiff
path: root/simulator/opendc/opendc-workflows/src
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc/opendc-workflows/src')
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt33
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt9
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt2
-rw-r--r--simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt8
4 files changed, 25 insertions, 27 deletions
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 1193f7b2..aea27972 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -24,9 +24,7 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.flow.EventFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
@@ -39,22 +37,23 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli
import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
-import java.util.PriorityQueue
-import java.util.Queue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
-import kotlinx.coroutines.withContext
+import java.time.Clock
+import java.util.PriorityQueue
+import java.util.Queue
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
* Topology Scheduling.
*/
class StageWorkflowService(
- private val domain: Domain,
+ internal val coroutineScope: CoroutineScope,
+ internal val clock: Clock,
private val provisioningService: ProvisioningService,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -63,7 +62,7 @@ class StageWorkflowService(
taskOrderPolicy: TaskOrderPolicy,
resourceFilterPolicy: ResourceFilterPolicy,
resourceSelectionPolicy: ResourceSelectionPolicy
-) : WorkflowService, CoroutineScope by domain {
+) : WorkflowService {
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -175,7 +174,7 @@ class StageWorkflowService(
private val eventFlow = EventFlow<WorkflowEvent>()
init {
- domain.launch {
+ coroutineScope.launch {
nodes = provisioningService.nodes().toList()
available.addAll(nodes)
}
@@ -191,9 +190,9 @@ class StageWorkflowService(
override val events: Flow<WorkflowEvent> = eventFlow
- override suspend fun submit(job: Job) = withContext(domain.coroutineContext) {
+ override suspend fun submit(job: Job) {
// J1 Incoming Jobs
- val jobInstance = JobState(job, simulationContext.clock.millis())
+ val jobInstance = JobState(job, clock.millis())
val instances = job.tasks.associateWith {
TaskState(jobInstance, it)
}
@@ -241,7 +240,7 @@ class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis()))
+ eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis()))
rootListener.jobStarted(jobInstance)
}
@@ -295,7 +294,7 @@ class StageWorkflowService(
taskByServer[server] = instance
server.events
.onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
- .launchIn(this)
+ .launchIn(coroutineScope)
activeTasks += instance
taskQueue.poll()
@@ -310,19 +309,19 @@ class StageWorkflowService(
when (server.state) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
- task.startedAt = simulationContext.clock.millis()
- eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ task.startedAt = clock.millis()
+ eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, clock.millis()))
rootListener.taskStarted(task)
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val task = taskByServer.remove(server) ?: throw IllegalStateException()
val job = task.job
task.state = TaskStatus.FINISHED
- task.finishedAt = simulationContext.clock.millis()
+ task.finishedAt = clock.millis()
job.tasks.remove(task)
available += task.host!!
activeTasks -= task
- eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, clock.millis()))
rootListener.taskFinished(task)
// Add job roots to the scheduling queue
@@ -347,7 +346,7 @@ class StageWorkflowService(
private suspend fun finishJob(job: JobState) {
activeJobs -= job
- eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis()))
+ eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis()))
rootListener.jobFinished(job)
}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
index 776f0b07..cb075b18 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.workflows.service.stage.StagePolicy
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -66,13 +65,12 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = simulationContext
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
- val delay = quantum - (ctx.clock.millis() % quantum)
+ val delay = quantum - (scheduler.clock.millis() % quantum)
- val job = ctx.domain.launch {
+ val job = scheduler.coroutineScope.launch {
delay(delay)
next = null
scheduler.schedule()
@@ -93,11 +91,10 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = simulationContext
if (next == null) {
val delay = random.nextInt(200).toLong()
- val job = ctx.domain.launch {
+ val job = scheduler.coroutineScope.launch {
delay(delay)
next = null
scheduler.schedule()
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
index a60ba0e2..ad818dde 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
@@ -26,8 +26,8 @@ package com.atlarge.opendc.workflows.service
import com.atlarge.opendc.core.services.AbstractServiceKey
import com.atlarge.opendc.workflows.workload.Job
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A service for cloud workflow management.
diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 5c129e37..655d8e1d 100644
--- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -35,8 +35,6 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import java.util.ServiceLoader
-import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
@@ -46,6 +44,8 @@ import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
+import java.util.ServiceLoader
+import kotlin.math.max
/**
* Integration test suite for the [StageWorkflowService].
@@ -68,11 +68,13 @@ internal class StageWorkflowSchedulerIntegrationTest {
val schedulerDomain = system.newDomain(name = "scheduler")
val schedulerAsync = schedulerDomain.async {
+ val clock = simulationContext.clock
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
- .use { it.construct(system.newDomain("topology")) }
+ .use { it.construct(schedulerDomain) }
StageWorkflowService(
schedulerDomain,
+ clock,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,