summaryrefslogtreecommitdiff
path: root/simulator/opendc/opendc-workflows/src/test
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-30 23:56:07 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-30 23:56:07 +0200
commitfcae560208df4860bc7461f955bf3b522b0e61c5 (patch)
tree933f47f1061274a6a7e648da82c13f08fce41ea5 /simulator/opendc/opendc-workflows/src/test
parent1766888d6dde44f96508a4bc6878978ddcaa073d (diff)
Migrate from Domain to TestCoroutineScope
This change eliminates the use of Domain and simulationContext in favour of the generic (Test)CoroutineScope and Clock classes. In this way, we decouple the OpenDC modules and their logic from simulation-related code. In this way, we also simplify eventual attempt for emulating OpenDC componments in real-time.
Diffstat (limited to 'simulator/opendc/opendc-workflows/src/test')
-rw-r--r--simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt39
1 files changed, 15 insertions, 24 deletions
diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 655d8e1d..114003a3 100644
--- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -24,8 +24,6 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
@@ -35,22 +33,22 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import kotlinx.coroutines.async
-import kotlinx.coroutines.delay
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import kotlin.math.max
/**
* Integration test suite for the [StageWorkflowService].
*/
@DisplayName("StageWorkflowService")
+@OptIn(ExperimentalCoroutinesApi::class)
internal class StageWorkflowSchedulerIntegrationTest {
/**
* A large integration test where we check whether all tasks in some trace are executed correctly.
@@ -63,17 +61,15 @@ internal class StageWorkflowSchedulerIntegrationTest {
var tasksStarted = 0L
var tasksFinished = 0L
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider(name = "sim")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
- val schedulerDomain = system.newDomain(name = "scheduler")
- val schedulerAsync = schedulerDomain.async {
- val clock = simulationContext.clock
+ val schedulerAsync = testScope.async {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
- .use { it.construct(schedulerDomain) }
+ .use { it.construct(testScope, clock) }
StageWorkflowService(
- schedulerDomain,
+ testScope,
clock,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
@@ -86,9 +82,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
)
}
- val broker = system.newDomain(name = "broker")
-
- broker.launch {
+ testScope.launch {
val scheduler = schedulerAsync.await()
scheduler.events
.onEach { event ->
@@ -102,24 +96,21 @@ internal class StageWorkflowSchedulerIntegrationTest {
.collect()
}
- broker.launch {
- val ctx = simulationContext
+ testScope.launch {
val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf"))
val scheduler = schedulerAsync.await()
while (reader.hasNext()) {
val (time, job) = reader.next()
jobsSubmitted++
- delay(max(0, time * 1000 - ctx.clock.millis()))
+ delay(max(0, time * 1000 - clock.millis()))
scheduler.submit(job)
}
}
- runBlocking {
- system.run()
- system.terminate()
- }
+ testScope.advanceUntilIdle()
+ assertNotEquals(0, jobsSubmitted, "No jobs submitted")
assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started")
assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished")
assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished")