From fcae560208df4860bc7461f955bf3b522b0e61c5 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 30 Sep 2020 23:56:07 +0200 Subject: Migrate from Domain to TestCoroutineScope This change eliminates the use of Domain and simulationContext in favour of the generic (Test)CoroutineScope and Clock classes. In this way, we decouple the OpenDC modules and their logic from simulation-related code. In this way, we also simplify eventual attempt for emulating OpenDC componments in real-time. --- .../StageWorkflowSchedulerIntegrationTest.kt | 39 +++++++++------------- 1 file changed, 15 insertions(+), 24 deletions(-) (limited to 'simulator/opendc/opendc-workflows/src') diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 655d8e1d..114003a3 100644 --- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -24,8 +24,6 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader @@ -35,22 +33,22 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import kotlinx.coroutines.async -import kotlinx.coroutines.delay +import kotlinx.coroutines.* import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test -import java.util.ServiceLoader +import org.opendc.simulator.utils.DelayControllerClockAdapter import kotlin.math.max /** * Integration test suite for the [StageWorkflowService]. */ @DisplayName("StageWorkflowService") +@OptIn(ExperimentalCoroutinesApi::class) internal class StageWorkflowSchedulerIntegrationTest { /** * A large integration test where we check whether all tasks in some trace are executed correctly. @@ -63,17 +61,15 @@ internal class StageWorkflowSchedulerIntegrationTest { var tasksStarted = 0L var tasksFinished = 0L - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider(name = "sim") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) - val schedulerDomain = system.newDomain(name = "scheduler") - val schedulerAsync = schedulerDomain.async { - val clock = simulationContext.clock + val schedulerAsync = testScope.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.construct(schedulerDomain) } + .use { it.construct(testScope, clock) } StageWorkflowService( - schedulerDomain, + testScope, clock, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), @@ -86,9 +82,7 @@ internal class StageWorkflowSchedulerIntegrationTest { ) } - val broker = system.newDomain(name = "broker") - - broker.launch { + testScope.launch { val scheduler = schedulerAsync.await() scheduler.events .onEach { event -> @@ -102,24 +96,21 @@ internal class StageWorkflowSchedulerIntegrationTest { .collect() } - broker.launch { - val ctx = simulationContext + testScope.launch { val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) val scheduler = schedulerAsync.await() while (reader.hasNext()) { val (time, job) = reader.next() jobsSubmitted++ - delay(max(0, time * 1000 - ctx.clock.millis())) + delay(max(0, time * 1000 - clock.millis())) scheduler.submit(job) } } - runBlocking { - system.run() - system.terminate() - } + testScope.advanceUntilIdle() + assertNotEquals(0, jobsSubmitted, "No jobs submitted") assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") -- cgit v1.2.3