summaryrefslogtreecommitdiff
path: root/opendc-workflow/opendc-workflow-service/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-09-28 15:51:05 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-03 17:35:58 +0200
commitc453e27abe54221f76648bc91edadb2efcd1ec07 (patch)
tree2eb75de390dc735519c6d29bf2a6d50694436d26 /opendc-workflow/opendc-workflow-service/src
parent115e37984624a409bc1ad4f54bf10c9537183390 (diff)
feat(exp/workflow): Add provisioning step for workflow service
This change adds a new module `opendc-experiments-workflow` that provides provisioner implementations for experiments to use for setting up and using the workflow engine in OpenDC.
Diffstat (limited to 'opendc-workflow/opendc-workflow-service/src')
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt96
1 files changed, 52 insertions, 44 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index 0fb8b67c..49496fed 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -22,17 +22,22 @@
package org.opendc.workflow.service
+import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
+import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
-import org.opendc.compute.workload.ComputeServiceHelper
import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.experiments.compute.setupComputeService
+import org.opendc.experiments.compute.setupHosts
+import org.opendc.experiments.provisioner.Provisioner
+import org.opendc.experiments.provisioner.ProvisioningContext
+import org.opendc.experiments.workflow.*
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -46,9 +51,6 @@ import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
-import org.opendc.workflow.workload.WorkflowSchedulerSpec
-import org.opendc.workflow.workload.WorkflowServiceHelper
-import org.opendc.workflow.workload.toJobs
import java.nio.file.Paths
import java.time.Duration
import java.util.*
@@ -63,55 +65,61 @@ internal class WorkflowServiceTest {
*/
@Test
fun testTrace() = runBlockingSimulation {
- // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts
- val computeScheduler = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
- weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
- )
+ val computeService = "compute.opendc.org"
+ val workflowService = "workflow.opendc.org"
- val computeHelper = ComputeServiceHelper(
- coroutineContext,
- clock,
- computeScheduler,
- seed = 0,
- schedulingQuantum = Duration.ofSeconds(1)
- )
+ Provisioner(coroutineContext, clock, seed = 0L).use { provisioner ->
+ val scheduler: (ProvisioningContext) -> ComputeScheduler = {
+ FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
+ weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
+ )
+ }
- val hostCount = 4
- repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) }
+ provisioner.runSteps(
+ // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts
+ setupComputeService(computeService, scheduler, schedulingQuantum = Duration.ofSeconds(1)),
+ setupHosts(computeService, List(4) { createHostSpec(it) }),
- // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines
- val workflowScheduler = WorkflowSchedulerSpec(
- schedulingQuantum = Duration.ofMillis(100),
- jobAdmissionPolicy = NullJobAdmissionPolicy,
- jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
- taskEligibilityPolicy = NullTaskEligibilityPolicy,
- taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
- )
- val workflowHelper = WorkflowServiceHelper(coroutineContext, clock, computeHelper.service.newClient(), workflowScheduler)
+ // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines
+ setupWorkflowService(
+ workflowService,
+ computeService,
+ WorkflowSchedulerSpec(
+ schedulingQuantum = Duration.ofMillis(100),
+ jobAdmissionPolicy = NullJobAdmissionPolicy,
+ jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
+ taskEligibilityPolicy = NullTaskEligibilityPolicy,
+ taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
+ )
+ )
+ )
+
+ val service = provisioner.registry.resolve(workflowService, WorkflowService::class.java)!!
- try {
val trace = Trace.open(
Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()),
format = "gwf"
)
+ service.replay(clock, trace.toJobs())
- workflowHelper.replay(trace.toJobs())
- } finally {
- workflowHelper.close()
- computeHelper.close()
- }
-
- val metrics = workflowHelper.service.getSchedulerStats()
+ val metrics = service.getSchedulerStats()
- assertAll(
- { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") },
- { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") },
- { assertEquals(metrics.workflowsSubmitted, metrics.workflowsFinished, "Not all started jobs finished") },
- { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") },
- { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
- { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } }
- )
+ assertAll(
+ { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") },
+ { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") },
+ {
+ assertEquals(
+ metrics.workflowsSubmitted,
+ metrics.workflowsFinished,
+ "Not all started jobs finished"
+ )
+ },
+ { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") },
+ { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
+ { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } }
+ )
+ }
}
/**