diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-21 22:04:31 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-25 10:51:27 +0100 |
| commit | 76bfeb44c5a02be143c152c52bc1029cff360744 (patch) | |
| tree | be467a0be698df2ebb4dd9fd3c5410d1e53ffa46 /opendc/opendc-workflows/src | |
| parent | bc64182612ad06f15bff5b48637ed7d241e293b2 (diff) | |
refactor: Migrate to Flow for event listeners
Diffstat (limited to 'opendc/opendc-workflows/src')
| -rw-r--r-- | opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt | 3 | ||||
| -rw-r--r-- | opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt | 96 | ||||
| -rw-r--r-- | opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt) | 45 | ||||
| -rw-r--r-- | opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt | 9 | ||||
| -rw-r--r-- | opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt | 40 |
5 files changed, 113 insertions, 80 deletions
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt index b444f91c..1cb2de97 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt @@ -24,10 +24,9 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.workload.Job -class JobState(val job: Job, val monitor: WorkflowMonitor, val submittedAt: Long) { +class JobState(val job: Job, val submittedAt: Long) { /** * A flag to indicate whether this job is finished. */ diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index a055a3fe..7a20363c 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -25,13 +25,13 @@ package com.atlarge.opendc.workflows.service import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.JobOrderPolicy import com.atlarge.opendc.workflows.service.stage.resource.ResourceFilterPolicy @@ -39,6 +39,11 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import java.util.PriorityQueue import java.util.Queue import kotlinx.coroutines.launch @@ -58,7 +63,7 @@ class StageWorkflowService( taskOrderPolicy: TaskOrderPolicy, resourceFilterPolicy: ResourceFilterPolicy, resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowService, ServerMonitor { +) : WorkflowService, CoroutineScope by domain { /** * The incoming jobs ready to be processed by the scheduler. @@ -167,6 +172,7 @@ class StageWorkflowService( private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic private val resourceFilterPolicy: ResourceFilterPolicy.Logic private val resourceSelectionPolicy: Comparator<Node> + private val eventFlow = EventFlow<WorkflowEvent>() init { domain.launch { @@ -183,9 +189,11 @@ class StageWorkflowService( this.resourceSelectionPolicy = resourceSelectionPolicy(this) } - override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) { + override val events: Flow<WorkflowEvent> = eventFlow + + override suspend fun submit(job: Job) = withContext(domain.coroutineContext) { // J1 Incoming Jobs - val jobInstance = JobState(job, monitor, simulationContext.clock.millis()) + val jobInstance = JobState(job, simulationContext.clock.millis()) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -217,6 +225,7 @@ class StageWorkflowService( /** * Perform a scheduling cycle immediately. */ + @OptIn(ExperimentalCoroutinesApi::class) internal suspend fun schedule() { // J2 Create list of eligible jobs val iterator = incomingJobs.iterator() @@ -232,7 +241,7 @@ class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis()) + eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis())) rootListener.jobStarted(jobInstance) } @@ -280,10 +289,13 @@ class StageWorkflowService( // T4 Submit task to machine available -= host instance.state = TaskStatus.ACTIVE - - val newHost = provisioningService.deploy(host, instance.task.image, this) + val newHost = provisioningService.deploy(host, instance.task.image) + val server = newHost.server!! instance.host = newHost - taskByServer[newHost.server!!] = instance + taskByServer[server] = instance + server.events + .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } + .launchIn(this) activeTasks += instance taskQueue.poll() @@ -294,50 +306,48 @@ class StageWorkflowService( } } - override fun stateChanged(server: Server, previousState: ServerState) { - domain.launch { - when (server.state) { - ServerState.ACTIVE -> { - val task = taskByServer.getValue(server) - task.startedAt = simulationContext.clock.millis() - task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis()) - rootListener.taskStarted(task) - } - ServerState.SHUTOFF, ServerState.ERROR -> { - val task = taskByServer.remove(server) ?: throw IllegalStateException() - val job = task.job - task.state = TaskStatus.FINISHED - task.finishedAt = simulationContext.clock.millis() - job.tasks.remove(task) - available += task.host!! - activeTasks -= task - job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis()) - rootListener.taskFinished(task) - - // Add job roots to the scheduling queue - for (dependent in task.dependents) { - if (dependent.state != TaskStatus.READY) { - continue - } - - incomingTasks += dependent - rootListener.taskReady(dependent) + private suspend fun stateChanged(server: Server) { + when (server.state) { + ServerState.ACTIVE -> { + val task = taskByServer.getValue(server) + task.startedAt = simulationContext.clock.millis() + eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + rootListener.taskStarted(task) + } + ServerState.SHUTOFF, ServerState.ERROR -> { + val task = taskByServer.remove(server) ?: throw IllegalStateException() + val job = task.job + task.state = TaskStatus.FINISHED + task.finishedAt = simulationContext.clock.millis() + job.tasks.remove(task) + available += task.host!! + activeTasks -= task + eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + rootListener.taskFinished(task) + + // Add job roots to the scheduling queue + for (dependent in task.dependents) { + if (dependent.state != TaskStatus.READY) { + continue } - if (job.isFinished) { - finishJob(job) - } + incomingTasks += dependent + rootListener.taskReady(dependent) + } - requestCycle() + if (job.isFinished) { + finishJob(job) } - else -> throw IllegalStateException() + + requestCycle() } + else -> throw IllegalStateException() } } private suspend fun finishJob(job: JobState) { activeJobs -= job - job.monitor.onJobFinish(job.job, simulationContext.clock.millis()) + eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis())) rootListener.jobFinished(job) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt index 3c77d57a..2ca5a19d 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt @@ -22,32 +22,55 @@ * SOFTWARE. */ -package com.atlarge.opendc.workflows.monitor +package com.atlarge.opendc.workflows.service import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task /** - * An interface for monitoring the progression of workflows. + * An event emitted by the [WorkflowService]. */ -public interface WorkflowMonitor { +public sealed class WorkflowEvent { /** - * This method is invoked when a job has become active. + * The [WorkflowService] that emitted the event. */ - public suspend fun onJobStart(job: Job, time: Long) + public abstract val service: WorkflowService /** - * This method is invoked when a job has finished processing. + * This event is emitted when a job has become active. */ - public suspend fun onJobFinish(job: Job, time: Long) + public data class JobStarted( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() /** - * This method is invoked when a task of a job has started processing. + * This event is emitted when a job has finished processing. */ - public suspend fun onTaskStart(job: Job, task: Task, time: Long) + public data class JobFinished( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() /** - * This method is invoked when a task has finished processing. + * This event is emitted when a task of a job has started processing. */ - public suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) + public data class TaskStarted( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a task of a job has started processing. + */ + public data class TaskFinished( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt index 524f4f9e..38ea49c4 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -25,8 +25,8 @@ package com.atlarge.opendc.workflows.service import com.atlarge.opendc.core.services.AbstractServiceKey -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.workload.Job +import kotlinx.coroutines.flow.Flow import java.util.UUID /** @@ -36,9 +36,14 @@ import java.util.UUID */ public interface WorkflowService { /** + * Thie events emitted by the workflow scheduler. + */ + public val events: Flow<WorkflowEvent> + + /** * Submit the specified [Job] to the workflow service for scheduling. */ - public suspend fun submit(job: Job, monitor: WorkflowMonitor) + public suspend fun submit(job: Job) /** * The service key for the workflow scheduler. diff --git a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 19e56482..5ee6d5e6 100644 --- a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -29,17 +29,16 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task import kotlinx.coroutines.async import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertEquals @@ -64,24 +63,6 @@ internal class StageWorkflowSchedulerIntegrationTest { var tasksStarted = 0L var tasksFinished = 0L - val monitor = object : WorkflowMonitor { - override suspend fun onJobStart(job: Job, time: Long) { - jobsStarted++ - } - - override suspend fun onJobFinish(job: Job, time: Long) { - jobsFinished++ - } - - override suspend fun onTaskStart(job: Job, task: Task, time: Long) { - tasksStarted++ - } - - override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) { - tasksFinished++ - } - } - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider(name = "sim") @@ -104,6 +85,21 @@ internal class StageWorkflowSchedulerIntegrationTest { } val broker = system.newDomain(name = "broker") + + broker.launch { + val scheduler = schedulerAsync.await() + scheduler.events + .onEach { event -> + when (event) { + is WorkflowEvent.JobStarted -> jobsStarted++ + is WorkflowEvent.JobFinished -> jobsFinished++ + is WorkflowEvent.TaskStarted -> tasksStarted++ + is WorkflowEvent.TaskFinished -> tasksFinished++ + } + } + .collect() + } + broker.launch { val ctx = simulationContext val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) @@ -113,7 +109,7 @@ internal class StageWorkflowSchedulerIntegrationTest { val (time, job) = reader.next() jobsSubmitted++ delay(max(0, time * 1000 - ctx.clock.millis())) - scheduler.submit(job, monitor) + scheduler.submit(job) } } |
