summaryrefslogtreecommitdiff
path: root/opendc/opendc-workflows
diff options
context:
space:
mode:
Diffstat (limited to 'opendc/opendc-workflows')
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt3
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt96
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt)45
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt9
-rw-r--r--opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt40
5 files changed, 113 insertions, 80 deletions
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt
index b444f91c..1cb2de97 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt
@@ -24,10 +24,9 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.workload.Job
-class JobState(val job: Job, val monitor: WorkflowMonitor, val submittedAt: Long) {
+class JobState(val job: Job, val submittedAt: Long) {
/**
* A flag to indicate whether this job is finished.
*/
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index a055a3fe..7a20363c 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -25,13 +25,13 @@
package com.atlarge.opendc.workflows.service
import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.service.ProvisioningService
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy
import com.atlarge.opendc.workflows.service.stage.job.JobOrderPolicy
import com.atlarge.opendc.workflows.service.stage.resource.ResourceFilterPolicy
@@ -39,6 +39,11 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli
import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
import java.util.PriorityQueue
import java.util.Queue
import kotlinx.coroutines.launch
@@ -58,7 +63,7 @@ class StageWorkflowService(
taskOrderPolicy: TaskOrderPolicy,
resourceFilterPolicy: ResourceFilterPolicy,
resourceSelectionPolicy: ResourceSelectionPolicy
-) : WorkflowService, ServerMonitor {
+) : WorkflowService, CoroutineScope by domain {
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -167,6 +172,7 @@ class StageWorkflowService(
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
private val resourceFilterPolicy: ResourceFilterPolicy.Logic
private val resourceSelectionPolicy: Comparator<Node>
+ private val eventFlow = EventFlow<WorkflowEvent>()
init {
domain.launch {
@@ -183,9 +189,11 @@ class StageWorkflowService(
this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
- override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) {
+ override val events: Flow<WorkflowEvent> = eventFlow
+
+ override suspend fun submit(job: Job) = withContext(domain.coroutineContext) {
// J1 Incoming Jobs
- val jobInstance = JobState(job, monitor, simulationContext.clock.millis())
+ val jobInstance = JobState(job, simulationContext.clock.millis())
val instances = job.tasks.associateWith {
TaskState(jobInstance, it)
}
@@ -217,6 +225,7 @@ class StageWorkflowService(
/**
* Perform a scheduling cycle immediately.
*/
+ @OptIn(ExperimentalCoroutinesApi::class)
internal suspend fun schedule() {
// J2 Create list of eligible jobs
val iterator = incomingJobs.iterator()
@@ -232,7 +241,7 @@ class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis())
+ eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis()))
rootListener.jobStarted(jobInstance)
}
@@ -280,10 +289,13 @@ class StageWorkflowService(
// T4 Submit task to machine
available -= host
instance.state = TaskStatus.ACTIVE
-
- val newHost = provisioningService.deploy(host, instance.task.image, this)
+ val newHost = provisioningService.deploy(host, instance.task.image)
+ val server = newHost.server!!
instance.host = newHost
- taskByServer[newHost.server!!] = instance
+ taskByServer[server] = instance
+ server.events
+ .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
+ .launchIn(this)
activeTasks += instance
taskQueue.poll()
@@ -294,50 +306,48 @@ class StageWorkflowService(
}
}
- override fun stateChanged(server: Server, previousState: ServerState) {
- domain.launch {
- when (server.state) {
- ServerState.ACTIVE -> {
- val task = taskByServer.getValue(server)
- task.startedAt = simulationContext.clock.millis()
- task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis())
- rootListener.taskStarted(task)
- }
- ServerState.SHUTOFF, ServerState.ERROR -> {
- val task = taskByServer.remove(server) ?: throw IllegalStateException()
- val job = task.job
- task.state = TaskStatus.FINISHED
- task.finishedAt = simulationContext.clock.millis()
- job.tasks.remove(task)
- available += task.host!!
- activeTasks -= task
- job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis())
- rootListener.taskFinished(task)
-
- // Add job roots to the scheduling queue
- for (dependent in task.dependents) {
- if (dependent.state != TaskStatus.READY) {
- continue
- }
-
- incomingTasks += dependent
- rootListener.taskReady(dependent)
+ private suspend fun stateChanged(server: Server) {
+ when (server.state) {
+ ServerState.ACTIVE -> {
+ val task = taskByServer.getValue(server)
+ task.startedAt = simulationContext.clock.millis()
+ eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ rootListener.taskStarted(task)
+ }
+ ServerState.SHUTOFF, ServerState.ERROR -> {
+ val task = taskByServer.remove(server) ?: throw IllegalStateException()
+ val job = task.job
+ task.state = TaskStatus.FINISHED
+ task.finishedAt = simulationContext.clock.millis()
+ job.tasks.remove(task)
+ available += task.host!!
+ activeTasks -= task
+ eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ rootListener.taskFinished(task)
+
+ // Add job roots to the scheduling queue
+ for (dependent in task.dependents) {
+ if (dependent.state != TaskStatus.READY) {
+ continue
}
- if (job.isFinished) {
- finishJob(job)
- }
+ incomingTasks += dependent
+ rootListener.taskReady(dependent)
+ }
- requestCycle()
+ if (job.isFinished) {
+ finishJob(job)
}
- else -> throw IllegalStateException()
+
+ requestCycle()
}
+ else -> throw IllegalStateException()
}
}
private suspend fun finishJob(job: JobState) {
activeJobs -= job
- job.monitor.onJobFinish(job.job, simulationContext.clock.millis())
+ eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis()))
rootListener.jobFinished(job)
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt
index 3c77d57a..2ca5a19d 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt
@@ -22,32 +22,55 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.workflows.monitor
+package com.atlarge.opendc.workflows.service
import com.atlarge.opendc.workflows.workload.Job
import com.atlarge.opendc.workflows.workload.Task
/**
- * An interface for monitoring the progression of workflows.
+ * An event emitted by the [WorkflowService].
*/
-public interface WorkflowMonitor {
+public sealed class WorkflowEvent {
/**
- * This method is invoked when a job has become active.
+ * The [WorkflowService] that emitted the event.
*/
- public suspend fun onJobStart(job: Job, time: Long)
+ public abstract val service: WorkflowService
/**
- * This method is invoked when a job has finished processing.
+ * This event is emitted when a job has become active.
*/
- public suspend fun onJobFinish(job: Job, time: Long)
+ public data class JobStarted(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val time: Long
+ ) : WorkflowEvent()
/**
- * This method is invoked when a task of a job has started processing.
+ * This event is emitted when a job has finished processing.
*/
- public suspend fun onTaskStart(job: Job, task: Task, time: Long)
+ public data class JobFinished(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val time: Long
+ ) : WorkflowEvent()
/**
- * This method is invoked when a task has finished processing.
+ * This event is emitted when a task of a job has started processing.
*/
- public suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long)
+ public data class TaskStarted(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val task: Task,
+ public val time: Long
+ ) : WorkflowEvent()
+
+ /**
+ * This event is emitted when a task of a job has started processing.
+ */
+ public data class TaskFinished(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val task: Task,
+ public val time: Long
+ ) : WorkflowEvent()
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
index 524f4f9e..38ea49c4 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
@@ -25,8 +25,8 @@
package com.atlarge.opendc.workflows.service
import com.atlarge.opendc.core.services.AbstractServiceKey
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.workload.Job
+import kotlinx.coroutines.flow.Flow
import java.util.UUID
/**
@@ -36,9 +36,14 @@ import java.util.UUID
*/
public interface WorkflowService {
/**
+ * Thie events emitted by the workflow scheduler.
+ */
+ public val events: Flow<WorkflowEvent>
+
+ /**
* Submit the specified [Job] to the workflow service for scheduling.
*/
- public suspend fun submit(job: Job, monitor: WorkflowMonitor)
+ public suspend fun submit(job: Job)
/**
* The service key for the workflow scheduler.
diff --git a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 19e56482..5ee6d5e6 100644
--- a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -29,17 +29,16 @@ import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import com.atlarge.opendc.workflows.workload.Job
-import com.atlarge.opendc.workflows.workload.Task
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.assertEquals
@@ -64,24 +63,6 @@ internal class StageWorkflowSchedulerIntegrationTest {
var tasksStarted = 0L
var tasksFinished = 0L
- val monitor = object : WorkflowMonitor {
- override suspend fun onJobStart(job: Job, time: Long) {
- jobsStarted++
- }
-
- override suspend fun onJobFinish(job: Job, time: Long) {
- jobsFinished++
- }
-
- override suspend fun onTaskStart(job: Job, task: Task, time: Long) {
- tasksStarted++
- }
-
- override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) {
- tasksFinished++
- }
- }
-
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
val system = provider(name = "sim")
@@ -104,6 +85,21 @@ internal class StageWorkflowSchedulerIntegrationTest {
}
val broker = system.newDomain(name = "broker")
+
+ broker.launch {
+ val scheduler = schedulerAsync.await()
+ scheduler.events
+ .onEach { event ->
+ when (event) {
+ is WorkflowEvent.JobStarted -> jobsStarted++
+ is WorkflowEvent.JobFinished -> jobsFinished++
+ is WorkflowEvent.TaskStarted -> tasksStarted++
+ is WorkflowEvent.TaskFinished -> tasksFinished++
+ }
+ }
+ .collect()
+ }
+
broker.launch {
val ctx = simulationContext
val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf"))
@@ -113,7 +109,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
val (time, job) = reader.next()
jobsSubmitted++
delay(max(0, time * 1000 - ctx.clock.millis()))
- scheduler.submit(job, monitor)
+ scheduler.submit(job)
}
}