diff options
Diffstat (limited to 'simulator/opendc/opendc-workflows/src')
4 files changed, 25 insertions, 27 deletions
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index 1193f7b2..aea27972 100644 --- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -24,9 +24,7 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.Domain import com.atlarge.odcsim.flow.EventFlow -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState @@ -39,22 +37,23 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job -import java.util.PriorityQueue -import java.util.Queue import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext +import java.time.Clock +import java.util.PriorityQueue +import java.util.Queue /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for * Topology Scheduling. */ class StageWorkflowService( - private val domain: Domain, + internal val coroutineScope: CoroutineScope, + internal val clock: Clock, private val provisioningService: ProvisioningService, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -63,7 +62,7 @@ class StageWorkflowService( taskOrderPolicy: TaskOrderPolicy, resourceFilterPolicy: ResourceFilterPolicy, resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowService, CoroutineScope by domain { +) : WorkflowService { /** * The incoming jobs ready to be processed by the scheduler. @@ -175,7 +174,7 @@ class StageWorkflowService( private val eventFlow = EventFlow<WorkflowEvent>() init { - domain.launch { + coroutineScope.launch { nodes = provisioningService.nodes().toList() available.addAll(nodes) } @@ -191,9 +190,9 @@ class StageWorkflowService( override val events: Flow<WorkflowEvent> = eventFlow - override suspend fun submit(job: Job) = withContext(domain.coroutineContext) { + override suspend fun submit(job: Job) { // J1 Incoming Jobs - val jobInstance = JobState(job, simulationContext.clock.millis()) + val jobInstance = JobState(job, clock.millis()) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -241,7 +240,7 @@ class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis())) + eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis())) rootListener.jobStarted(jobInstance) } @@ -295,7 +294,7 @@ class StageWorkflowService( taskByServer[server] = instance server.events .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } - .launchIn(this) + .launchIn(coroutineScope) activeTasks += instance taskQueue.poll() @@ -310,19 +309,19 @@ class StageWorkflowService( when (server.state) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) - task.startedAt = simulationContext.clock.millis() - eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + task.startedAt = clock.millis() + eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, clock.millis())) rootListener.taskStarted(task) } ServerState.SHUTOFF, ServerState.ERROR -> { val task = taskByServer.remove(server) ?: throw IllegalStateException() val job = task.job task.state = TaskStatus.FINISHED - task.finishedAt = simulationContext.clock.millis() + task.finishedAt = clock.millis() job.tasks.remove(task) available += task.host!! activeTasks -= task - eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, clock.millis())) rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -347,7 +346,7 @@ class StageWorkflowService( private suspend fun finishJob(job: JobState) { activeJobs -= job - eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis())) + eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis())) rootListener.jobFinished(job) } diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt index 776f0b07..cb075b18 100644 --- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.workflows.service.stage.StagePolicy import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -66,13 +65,12 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = simulationContext if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. - val delay = quantum - (ctx.clock.millis() % quantum) + val delay = quantum - (scheduler.clock.millis() % quantum) - val job = ctx.domain.launch { + val job = scheduler.coroutineScope.launch { delay(delay) next = null scheduler.schedule() @@ -93,11 +91,10 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = simulationContext if (next == null) { val delay = random.nextInt(200).toLong() - val job = ctx.domain.launch { + val job = scheduler.coroutineScope.launch { delay(delay) next = null scheduler.schedule() diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt index a60ba0e2..ad818dde 100644 --- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -26,8 +26,8 @@ package com.atlarge.opendc.workflows.service import com.atlarge.opendc.core.services.AbstractServiceKey import com.atlarge.opendc.workflows.workload.Job -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A service for cloud workflow management. diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 5c129e37..655d8e1d 100644 --- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -35,8 +35,6 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import java.util.ServiceLoader -import kotlin.math.max import kotlinx.coroutines.async import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect @@ -46,6 +44,8 @@ import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import kotlin.math.max /** * Integration test suite for the [StageWorkflowService]. @@ -68,11 +68,13 @@ internal class StageWorkflowSchedulerIntegrationTest { val schedulerDomain = system.newDomain(name = "scheduler") val schedulerAsync = schedulerDomain.async { + val clock = simulationContext.clock val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.construct(system.newDomain("topology")) } + .use { it.construct(schedulerDomain) } StageWorkflowService( schedulerDomain, + clock, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, |
