diff options
Diffstat (limited to 'opendc/opendc-workflows/src/main')
| -rw-r--r-- | opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt | 3 | ||||
| -rw-r--r-- | opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt | 96 | ||||
| -rw-r--r-- | opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt) | 45 | ||||
| -rw-r--r-- | opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt | 9 |
4 files changed, 95 insertions, 58 deletions
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt index b444f91c..1cb2de97 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt @@ -24,10 +24,9 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.workload.Job -class JobState(val job: Job, val monitor: WorkflowMonitor, val submittedAt: Long) { +class JobState(val job: Job, val submittedAt: Long) { /** * A flag to indicate whether this job is finished. */ diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index a055a3fe..7a20363c 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -25,13 +25,13 @@ package com.atlarge.opendc.workflows.service import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.JobOrderPolicy import com.atlarge.opendc.workflows.service.stage.resource.ResourceFilterPolicy @@ -39,6 +39,11 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import java.util.PriorityQueue import java.util.Queue import kotlinx.coroutines.launch @@ -58,7 +63,7 @@ class StageWorkflowService( taskOrderPolicy: TaskOrderPolicy, resourceFilterPolicy: ResourceFilterPolicy, resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowService, ServerMonitor { +) : WorkflowService, CoroutineScope by domain { /** * The incoming jobs ready to be processed by the scheduler. @@ -167,6 +172,7 @@ class StageWorkflowService( private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic private val resourceFilterPolicy: ResourceFilterPolicy.Logic private val resourceSelectionPolicy: Comparator<Node> + private val eventFlow = EventFlow<WorkflowEvent>() init { domain.launch { @@ -183,9 +189,11 @@ class StageWorkflowService( this.resourceSelectionPolicy = resourceSelectionPolicy(this) } - override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) { + override val events: Flow<WorkflowEvent> = eventFlow + + override suspend fun submit(job: Job) = withContext(domain.coroutineContext) { // J1 Incoming Jobs - val jobInstance = JobState(job, monitor, simulationContext.clock.millis()) + val jobInstance = JobState(job, simulationContext.clock.millis()) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -217,6 +225,7 @@ class StageWorkflowService( /** * Perform a scheduling cycle immediately. */ + @OptIn(ExperimentalCoroutinesApi::class) internal suspend fun schedule() { // J2 Create list of eligible jobs val iterator = incomingJobs.iterator() @@ -232,7 +241,7 @@ class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis()) + eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis())) rootListener.jobStarted(jobInstance) } @@ -280,10 +289,13 @@ class StageWorkflowService( // T4 Submit task to machine available -= host instance.state = TaskStatus.ACTIVE - - val newHost = provisioningService.deploy(host, instance.task.image, this) + val newHost = provisioningService.deploy(host, instance.task.image) + val server = newHost.server!! instance.host = newHost - taskByServer[newHost.server!!] = instance + taskByServer[server] = instance + server.events + .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } + .launchIn(this) activeTasks += instance taskQueue.poll() @@ -294,50 +306,48 @@ class StageWorkflowService( } } - override fun stateChanged(server: Server, previousState: ServerState) { - domain.launch { - when (server.state) { - ServerState.ACTIVE -> { - val task = taskByServer.getValue(server) - task.startedAt = simulationContext.clock.millis() - task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis()) - rootListener.taskStarted(task) - } - ServerState.SHUTOFF, ServerState.ERROR -> { - val task = taskByServer.remove(server) ?: throw IllegalStateException() - val job = task.job - task.state = TaskStatus.FINISHED - task.finishedAt = simulationContext.clock.millis() - job.tasks.remove(task) - available += task.host!! - activeTasks -= task - job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis()) - rootListener.taskFinished(task) - - // Add job roots to the scheduling queue - for (dependent in task.dependents) { - if (dependent.state != TaskStatus.READY) { - continue - } - - incomingTasks += dependent - rootListener.taskReady(dependent) + private suspend fun stateChanged(server: Server) { + when (server.state) { + ServerState.ACTIVE -> { + val task = taskByServer.getValue(server) + task.startedAt = simulationContext.clock.millis() + eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + rootListener.taskStarted(task) + } + ServerState.SHUTOFF, ServerState.ERROR -> { + val task = taskByServer.remove(server) ?: throw IllegalStateException() + val job = task.job + task.state = TaskStatus.FINISHED + task.finishedAt = simulationContext.clock.millis() + job.tasks.remove(task) + available += task.host!! + activeTasks -= task + eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + rootListener.taskFinished(task) + + // Add job roots to the scheduling queue + for (dependent in task.dependents) { + if (dependent.state != TaskStatus.READY) { + continue } - if (job.isFinished) { - finishJob(job) - } + incomingTasks += dependent + rootListener.taskReady(dependent) + } - requestCycle() + if (job.isFinished) { + finishJob(job) } - else -> throw IllegalStateException() + + requestCycle() } + else -> throw IllegalStateException() } } private suspend fun finishJob(job: JobState) { activeJobs -= job - job.monitor.onJobFinish(job.job, simulationContext.clock.millis()) + eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis())) rootListener.jobFinished(job) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt index 3c77d57a..2ca5a19d 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt @@ -22,32 +22,55 @@ * SOFTWARE. */ -package com.atlarge.opendc.workflows.monitor +package com.atlarge.opendc.workflows.service import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task /** - * An interface for monitoring the progression of workflows. + * An event emitted by the [WorkflowService]. */ -public interface WorkflowMonitor { +public sealed class WorkflowEvent { /** - * This method is invoked when a job has become active. + * The [WorkflowService] that emitted the event. */ - public suspend fun onJobStart(job: Job, time: Long) + public abstract val service: WorkflowService /** - * This method is invoked when a job has finished processing. + * This event is emitted when a job has become active. */ - public suspend fun onJobFinish(job: Job, time: Long) + public data class JobStarted( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() /** - * This method is invoked when a task of a job has started processing. + * This event is emitted when a job has finished processing. */ - public suspend fun onTaskStart(job: Job, task: Task, time: Long) + public data class JobFinished( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() /** - * This method is invoked when a task has finished processing. + * This event is emitted when a task of a job has started processing. */ - public suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) + public data class TaskStarted( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a task of a job has started processing. + */ + public data class TaskFinished( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt index 524f4f9e..38ea49c4 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -25,8 +25,8 @@ package com.atlarge.opendc.workflows.service import com.atlarge.opendc.core.services.AbstractServiceKey -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.workload.Job +import kotlinx.coroutines.flow.Flow import java.util.UUID /** @@ -36,9 +36,14 @@ import java.util.UUID */ public interface WorkflowService { /** + * Thie events emitted by the workflow scheduler. + */ + public val events: Flow<WorkflowEvent> + + /** * Submit the specified [Job] to the workflow service for scheduling. */ - public suspend fun submit(job: Job, monitor: WorkflowMonitor) + public suspend fun submit(job: Job) /** * The service key for the workflow scheduler. |
