diff options
Diffstat (limited to 'opendc-workflow')
3 files changed, 18 insertions, 18 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index 2436c387..07b43b6d 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,6 +22,7 @@ package org.opendc.workflow.service +import org.opendc.common.Dispatcher import org.opendc.compute.api.ComputeClient import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl @@ -30,9 +31,7 @@ import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats -import java.time.Clock import java.time.Duration -import kotlin.coroutines.CoroutineContext /** * A service for cloud workflow execution. @@ -59,9 +58,7 @@ public interface WorkflowService : AutoCloseable { /** * Construct a new [WorkflowService] implementation. * - * @param context The [CoroutineContext] to use in the service. - * @param clock The clock instance to use. - * @param meterProvider The meter provider to use. + * @param dispatcher A [Dispatcher] to schedule future events. * @param compute The "Compute" client to use. * @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles). * @param jobAdmissionPolicy The job admission policy to use. @@ -70,8 +67,7 @@ public interface WorkflowService : AutoCloseable { * @param taskOrderPolicy The task order policy to use. */ public operator fun invoke( - context: CoroutineContext, - clock: Clock, + dispatcher: Dispatcher, compute: ComputeClient, schedulingQuantum: Duration, jobAdmissionPolicy: JobAdmissionPolicy, @@ -80,8 +76,7 @@ public interface WorkflowService : AutoCloseable { taskOrderPolicy: TaskOrderPolicy ): WorkflowService { return WorkflowServiceImpl( - context, - clock, + dispatcher, compute, schedulingQuantum, jobAdmissionPolicy, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index b1780896..01c1f565 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -26,6 +26,8 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.cancel import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.common.Dispatcher +import org.opendc.common.asCoroutineDispatcher import org.opendc.common.util.Pacer import org.opendc.compute.api.ComputeClient import org.opendc.compute.api.Image @@ -40,11 +42,10 @@ import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats -import java.time.Clock import java.time.Duration +import java.time.InstantSource import java.util.PriorityQueue import java.util.Queue -import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume /** @@ -52,8 +53,7 @@ import kotlin.coroutines.resume * Datacenter Scheduling. */ public class WorkflowServiceImpl( - context: CoroutineContext, - private val clock: Clock, + dispatcher: Dispatcher, private val computeClient: ComputeClient, schedulingQuantum: Duration, jobAdmissionPolicy: JobAdmissionPolicy, @@ -64,7 +64,12 @@ public class WorkflowServiceImpl( /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - private val scope = CoroutineScope(context + kotlinx.coroutines.Job()) + private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + kotlinx.coroutines.Job()) + + /** + * The [InstantSource] representing the clock of this service. + */ + private val clock = dispatcher.timeSource /** * The incoming jobs ready to be processed by the scheduler. @@ -149,7 +154,7 @@ public class WorkflowServiceImpl( /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } + private val pacer = Pacer(dispatcher, schedulingQuantum.toMillis()) { doSchedule() } private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index b165418a..e5e05a92 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -70,7 +70,7 @@ internal class WorkflowServiceTest { val computeService = "compute.opendc.org" val workflowService = "workflow.opendc.org" - Provisioner(coroutineContext, clock, seed = 0L).use { provisioner -> + Provisioner(dispatcher, seed = 0L).use { provisioner -> val scheduler: (ProvisioningContext) -> ComputeScheduler = { FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), @@ -103,7 +103,7 @@ internal class WorkflowServiceTest { Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), format = "gwf" ) - service.replay(clock, trace.toJobs()) + service.replay(timeSource, trace.toJobs()) val metrics = service.getSchedulerStats() @@ -119,7 +119,7 @@ internal class WorkflowServiceTest { }, { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, - { assertEquals(45977707L, clock.millis()) { "Total duration incorrect" } } + { assertEquals(45977707L, timeSource.millis()) { "Total duration incorrect" } } ) } } |
