diff options
Diffstat (limited to 'opendc-workflow/opendc-workflow-service/src')
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt | 96 |
1 files changed, 52 insertions, 44 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 0fb8b67c..49496fed 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -22,17 +22,22 @@ package org.opendc.workflow.service +import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll +import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import org.opendc.compute.workload.ComputeServiceHelper import org.opendc.compute.workload.topology.HostSpec +import org.opendc.experiments.compute.setupComputeService +import org.opendc.experiments.compute.setupHosts +import org.opendc.experiments.provisioner.Provisioner +import org.opendc.experiments.provisioner.ProvisioningContext +import org.opendc.experiments.workflow.* import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -46,9 +51,6 @@ import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import org.opendc.workflow.workload.WorkflowSchedulerSpec -import org.opendc.workflow.workload.WorkflowServiceHelper -import org.opendc.workflow.workload.toJobs import java.nio.file.Paths import java.time.Duration import java.util.* @@ -63,55 +65,61 @@ internal class WorkflowServiceTest { */ @Test fun testTrace() = runBlockingSimulation { - // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts - val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), - weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) - ) + val computeService = "compute.opendc.org" + val workflowService = "workflow.opendc.org" - val computeHelper = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed = 0, - schedulingQuantum = Duration.ofSeconds(1) - ) + Provisioner(coroutineContext, clock, seed = 0L).use { provisioner -> + val scheduler: (ProvisioningContext) -> ComputeScheduler = { + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), + weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) + ) + } - val hostCount = 4 - repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) } + provisioner.runSteps( + // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts + setupComputeService(computeService, scheduler, schedulingQuantum = Duration.ofSeconds(1)), + setupHosts(computeService, List(4) { createHostSpec(it) }), - // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines - val workflowScheduler = WorkflowSchedulerSpec( - schedulingQuantum = Duration.ofMillis(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobOrderPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = NullTaskEligibilityPolicy, - taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - ) - val workflowHelper = WorkflowServiceHelper(coroutineContext, clock, computeHelper.service.newClient(), workflowScheduler) + // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines + setupWorkflowService( + workflowService, + computeService, + WorkflowSchedulerSpec( + schedulingQuantum = Duration.ofMillis(100), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobOrderPolicy = SubmissionTimeJobOrderPolicy(), + taskEligibilityPolicy = NullTaskEligibilityPolicy, + taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), + ) + ) + ) + + val service = provisioner.registry.resolve(workflowService, WorkflowService::class.java)!! - try { val trace = Trace.open( Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), format = "gwf" ) + service.replay(clock, trace.toJobs()) - workflowHelper.replay(trace.toJobs()) - } finally { - workflowHelper.close() - computeHelper.close() - } - - val metrics = workflowHelper.service.getSchedulerStats() + val metrics = service.getSchedulerStats() - assertAll( - { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") }, - { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") }, - { assertEquals(metrics.workflowsSubmitted, metrics.workflowsFinished, "Not all started jobs finished") }, - { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, - { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, - { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } } - ) + assertAll( + { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") }, + { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") }, + { + assertEquals( + metrics.workflowsSubmitted, + metrics.workflowsFinished, + "Not all started jobs finished" + ) + }, + { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, + { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, + { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } } + ) + } } /** |
