diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-04 10:04:50 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-10-04 10:04:50 +0200 |
| commit | 92cc0908b7ad6c94b08e6016f8815ab07cd1714d (patch) | |
| tree | b5edaff69212986265f9edc620e40bb8695f11eb /opendc-workflow/opendc-workflow-service/src | |
| parent | 2d2a3854d355bd4b074ef651f291d34081e70d96 (diff) | |
| parent | bd476d11ab24fe745bb54e97a11133706bb96cb1 (diff) | |
merge: Add provisioning tool for experiments (#104)
This pull request implements a new tool to help provision and manage the
experimental environment.
## Implementation Notes :hammer_and_pick:
* Add service registry for cloud services
* Add provisioning tool for experiments
* Add provisioning step for workflow service
* Add provisioners for FaaS service
* Use experiment base for Capelin experiments
* Use experiment base for web runner
* Integrate compute workload classes
* Remove Topology interface
## Breaking API Changes :warning:
* Removal of the `opendc-compute-workload`, `opendc-faas-workload`,
and `opendc-workflow-workload` modules. These are now located
inside `opendc-experiments`
* Removal of `ComputeServiceHelper`. Use `Provisioner` to provision
a `ComputeService`.
Diffstat (limited to 'opendc-workflow/opendc-workflow-service/src')
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt | 98 |
1 files changed, 53 insertions, 45 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 0fb8b67c..f8039e1d 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -22,17 +22,22 @@ package org.opendc.workflow.service +import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll +import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import org.opendc.compute.workload.ComputeServiceHelper -import org.opendc.compute.workload.topology.HostSpec +import org.opendc.experiments.compute.setupComputeService +import org.opendc.experiments.compute.setupHosts +import org.opendc.experiments.compute.topology.HostSpec +import org.opendc.experiments.provisioner.Provisioner +import org.opendc.experiments.provisioner.ProvisioningContext +import org.opendc.experiments.workflow.* import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -46,9 +51,6 @@ import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import org.opendc.workflow.workload.WorkflowSchedulerSpec -import org.opendc.workflow.workload.WorkflowServiceHelper -import org.opendc.workflow.workload.toJobs import java.nio.file.Paths import java.time.Duration import java.util.* @@ -63,55 +65,61 @@ internal class WorkflowServiceTest { */ @Test fun testTrace() = runBlockingSimulation { - // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts - val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), - weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) - ) + val computeService = "compute.opendc.org" + val workflowService = "workflow.opendc.org" - val computeHelper = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed = 0, - schedulingQuantum = Duration.ofSeconds(1) - ) + Provisioner(coroutineContext, clock, seed = 0L).use { provisioner -> + val scheduler: (ProvisioningContext) -> ComputeScheduler = { + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), + weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) + ) + } - val hostCount = 4 - repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) } + provisioner.runSteps( + // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts + setupComputeService(computeService, scheduler, schedulingQuantum = Duration.ofSeconds(1)), + setupHosts(computeService, List(4) { createHostSpec(it) }), - // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines - val workflowScheduler = WorkflowSchedulerSpec( - schedulingQuantum = Duration.ofMillis(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobOrderPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = NullTaskEligibilityPolicy, - taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - ) - val workflowHelper = WorkflowServiceHelper(coroutineContext, clock, computeHelper.service.newClient(), workflowScheduler) + // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines + setupWorkflowService( + workflowService, + computeService, + WorkflowSchedulerSpec( + schedulingQuantum = Duration.ofMillis(100), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobOrderPolicy = SubmissionTimeJobOrderPolicy(), + taskEligibilityPolicy = NullTaskEligibilityPolicy, + taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), + ) + ) + ) + + val service = provisioner.registry.resolve(workflowService, WorkflowService::class.java)!! - try { val trace = Trace.open( Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), format = "gwf" ) + service.replay(clock, trace.toJobs()) - workflowHelper.replay(trace.toJobs()) - } finally { - workflowHelper.close() - computeHelper.close() - } - - val metrics = workflowHelper.service.getSchedulerStats() + val metrics = service.getSchedulerStats() - assertAll( - { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") }, - { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") }, - { assertEquals(metrics.workflowsSubmitted, metrics.workflowsFinished, "Not all started jobs finished") }, - { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, - { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, - { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } } - ) + assertAll( + { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") }, + { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") }, + { + assertEquals( + metrics.workflowsSubmitted, + metrics.workflowsFinished, + "Not all started jobs finished" + ) + }, + { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, + { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, + { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } } + ) + } } /** |
