diff options
Diffstat (limited to 'simulator/opendc/opendc-workflows')
2 files changed, 16 insertions, 25 deletions
diff --git a/simulator/opendc/opendc-workflows/build.gradle.kts b/simulator/opendc/opendc-workflows/build.gradle.kts index 893c9020..62c4bc25 100644 --- a/simulator/opendc/opendc-workflows/build.gradle.kts +++ b/simulator/opendc/opendc-workflows/build.gradle.kts @@ -33,7 +33,7 @@ dependencies { api(project(":opendc:opendc-core")) api(project(":opendc:opendc-compute")) - testRuntimeOnly(project(":odcsim:odcsim-engine-omega")) + testImplementation(project(":opendc:opendc-simulator")) testImplementation(project(":opendc:opendc-format")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 655d8e1d..114003a3 100644 --- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -24,8 +24,6 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader @@ -35,22 +33,22 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import kotlinx.coroutines.async -import kotlinx.coroutines.delay +import kotlinx.coroutines.* import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test -import java.util.ServiceLoader +import org.opendc.simulator.utils.DelayControllerClockAdapter import kotlin.math.max /** * Integration test suite for the [StageWorkflowService]. */ @DisplayName("StageWorkflowService") +@OptIn(ExperimentalCoroutinesApi::class) internal class StageWorkflowSchedulerIntegrationTest { /** * A large integration test where we check whether all tasks in some trace are executed correctly. @@ -63,17 +61,15 @@ internal class StageWorkflowSchedulerIntegrationTest { var tasksStarted = 0L var tasksFinished = 0L - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider(name = "sim") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) - val schedulerDomain = system.newDomain(name = "scheduler") - val schedulerAsync = schedulerDomain.async { - val clock = simulationContext.clock + val schedulerAsync = testScope.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.construct(schedulerDomain) } + .use { it.construct(testScope, clock) } StageWorkflowService( - schedulerDomain, + testScope, clock, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), @@ -86,9 +82,7 @@ internal class StageWorkflowSchedulerIntegrationTest { ) } - val broker = system.newDomain(name = "broker") - - broker.launch { + testScope.launch { val scheduler = schedulerAsync.await() scheduler.events .onEach { event -> @@ -102,24 +96,21 @@ internal class StageWorkflowSchedulerIntegrationTest { .collect() } - broker.launch { - val ctx = simulationContext + testScope.launch { val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) val scheduler = schedulerAsync.await() while (reader.hasNext()) { val (time, job) = reader.next() jobsSubmitted++ - delay(max(0, time * 1000 - ctx.clock.millis())) + delay(max(0, time * 1000 - clock.millis())) scheduler.submit(job) } } - runBlocking { - system.run() - system.terminate() - } + testScope.advanceUntilIdle() + assertNotEquals(0, jobsSubmitted, "No jobs submitted") assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") |
