summaryrefslogtreecommitdiff
path: root/opendc/opendc-experiments-sc18
diff options
context:
space:
mode:
Diffstat (limited to 'opendc/opendc-experiments-sc18')
-rw-r--r--opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt51
1 files changed, 25 insertions, 26 deletions
diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index d5e1404a..b0182ab3 100644
--- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -29,8 +29,8 @@ import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.WorkflowEvent
import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode
import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
@@ -38,12 +38,12 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import com.atlarge.opendc.workflows.workload.Job
-import com.atlarge.opendc.workflows.workload.Task
import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
@@ -62,28 +62,6 @@ fun main(args: Array<String>) {
var finished = 0
val token = Channel<Boolean>()
-
- val monitor = object : WorkflowMonitor {
- override suspend fun onJobStart(job: Job, time: Long) {
- println("Job ${job.uid} started")
- }
-
- override suspend fun onJobFinish(job: Job, time: Long) {
- finished += 1
- println("Jobs $finished/$total finished (${job.tasks.size} tasks)")
-
- if (finished == total) {
- token.send(true)
- }
- }
-
- override suspend fun onTaskStart(job: Job, task: Task, time: Long) {
- }
-
- override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) {
- }
- }
-
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
val system = provider(name = "sim")
@@ -106,6 +84,27 @@ fun main(args: Array<String>) {
}
val broker = system.newDomain(name = "broker")
+
+ broker.launch {
+ val scheduler = schedulerAsync.await()
+ scheduler.events
+ .onEach { event ->
+ when (event) {
+ is WorkflowEvent.JobStarted -> {
+ println("Job ${event.job.uid} started")
+ }
+ is WorkflowEvent.JobFinished -> {
+ finished += 1
+ println("Jobs $finished/$total finished (${event.job.tasks.size} tasks)")
+
+ if (finished == total) {
+ token.send(true)
+ }
+ }
+ }
+ }
+ .collect()
+ }
broker.launch {
val ctx = simulationContext
val reader = GwfTraceReader(File(args[0]))
@@ -115,7 +114,7 @@ fun main(args: Array<String>) {
val (time, job) = reader.next()
total += 1
delay(max(0, time * 1000 - ctx.clock.millis()))
- scheduler.submit(job, monitor)
+ scheduler.submit(job)
}
}