summaryrefslogtreecommitdiff
path: root/opendc/opendc-workflows/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'opendc/opendc-workflows/src/test')
-rw-r--r--opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt40
1 files changed, 18 insertions, 22 deletions
diff --git a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 19e56482..5ee6d5e6 100644
--- a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -29,17 +29,16 @@ import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import com.atlarge.opendc.workflows.workload.Job
-import com.atlarge.opendc.workflows.workload.Task
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.assertEquals
@@ -64,24 +63,6 @@ internal class StageWorkflowSchedulerIntegrationTest {
var tasksStarted = 0L
var tasksFinished = 0L
- val monitor = object : WorkflowMonitor {
- override suspend fun onJobStart(job: Job, time: Long) {
- jobsStarted++
- }
-
- override suspend fun onJobFinish(job: Job, time: Long) {
- jobsFinished++
- }
-
- override suspend fun onTaskStart(job: Job, task: Task, time: Long) {
- tasksStarted++
- }
-
- override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) {
- tasksFinished++
- }
- }
-
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
val system = provider(name = "sim")
@@ -104,6 +85,21 @@ internal class StageWorkflowSchedulerIntegrationTest {
}
val broker = system.newDomain(name = "broker")
+
+ broker.launch {
+ val scheduler = schedulerAsync.await()
+ scheduler.events
+ .onEach { event ->
+ when (event) {
+ is WorkflowEvent.JobStarted -> jobsStarted++
+ is WorkflowEvent.JobFinished -> jobsFinished++
+ is WorkflowEvent.TaskStarted -> tasksStarted++
+ is WorkflowEvent.TaskFinished -> tasksFinished++
+ }
+ }
+ .collect()
+ }
+
broker.launch {
val ctx = simulationContext
val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf"))
@@ -113,7 +109,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
val (time, job) = reader.next()
jobsSubmitted++
delay(max(0, time * 1000 - ctx.clock.millis()))
- scheduler.submit(job, monitor)
+ scheduler.submit(job)
}
}