summaryrefslogtreecommitdiff
path: root/simulator/opendc/opendc-workflows
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc/opendc-workflows')
-rw-r--r--simulator/opendc/opendc-workflows/build.gradle.kts2
-rw-r--r--simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt39
2 files changed, 16 insertions, 25 deletions
diff --git a/simulator/opendc/opendc-workflows/build.gradle.kts b/simulator/opendc/opendc-workflows/build.gradle.kts
index 893c9020..62c4bc25 100644
--- a/simulator/opendc/opendc-workflows/build.gradle.kts
+++ b/simulator/opendc/opendc-workflows/build.gradle.kts
@@ -33,7 +33,7 @@ dependencies {
api(project(":opendc:opendc-core"))
api(project(":opendc:opendc-compute"))
- testRuntimeOnly(project(":odcsim:odcsim-engine-omega"))
+ testImplementation(project(":opendc:opendc-simulator"))
testImplementation(project(":opendc:opendc-format"))
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 655d8e1d..114003a3 100644
--- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -24,8 +24,6 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
@@ -35,22 +33,22 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import kotlinx.coroutines.async
-import kotlinx.coroutines.delay
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import kotlin.math.max
/**
* Integration test suite for the [StageWorkflowService].
*/
@DisplayName("StageWorkflowService")
+@OptIn(ExperimentalCoroutinesApi::class)
internal class StageWorkflowSchedulerIntegrationTest {
/**
* A large integration test where we check whether all tasks in some trace are executed correctly.
@@ -63,17 +61,15 @@ internal class StageWorkflowSchedulerIntegrationTest {
var tasksStarted = 0L
var tasksFinished = 0L
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider(name = "sim")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
- val schedulerDomain = system.newDomain(name = "scheduler")
- val schedulerAsync = schedulerDomain.async {
- val clock = simulationContext.clock
+ val schedulerAsync = testScope.async {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
- .use { it.construct(schedulerDomain) }
+ .use { it.construct(testScope, clock) }
StageWorkflowService(
- schedulerDomain,
+ testScope,
clock,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
@@ -86,9 +82,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
)
}
- val broker = system.newDomain(name = "broker")
-
- broker.launch {
+ testScope.launch {
val scheduler = schedulerAsync.await()
scheduler.events
.onEach { event ->
@@ -102,24 +96,21 @@ internal class StageWorkflowSchedulerIntegrationTest {
.collect()
}
- broker.launch {
- val ctx = simulationContext
+ testScope.launch {
val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf"))
val scheduler = schedulerAsync.await()
while (reader.hasNext()) {
val (time, job) = reader.next()
jobsSubmitted++
- delay(max(0, time * 1000 - ctx.clock.millis()))
+ delay(max(0, time * 1000 - clock.millis()))
scheduler.submit(job)
}
}
- runBlocking {
- system.run()
- system.terminate()
- }
+ testScope.advanceUntilIdle()
+ assertNotEquals(0, jobsSubmitted, "No jobs submitted")
assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started")
assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished")
assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished")