summaryrefslogtreecommitdiff
path: root/opendc-workflow/opendc-workflow-service
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-11-09 21:59:07 +0000
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-11-13 17:42:01 +0000
commitfb2672afb2d8236d5291cd028196c99d8e4d47f1 (patch)
tree508bbec117239b3d8490cd1bde8d12b6a8ab2155 /opendc-workflow/opendc-workflow-service
parent00ac59e8e9d6a41c2eac55aa25420dce8fa9c6e0 (diff)
refactor: Replace use of CoroutineContext by Dispatcher
This change replaces the use of `CoroutineContext` for passing the `SimulationDispatcher` across the different modules of OpenDC by the lightweight `Dispatcher` interface of the OpenDC common module.
Diffstat (limited to 'opendc-workflow/opendc-workflow-service')
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt13
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt15
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt2
3 files changed, 15 insertions, 15 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index f0e86449..07b43b6d 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -22,6 +22,7 @@
package org.opendc.workflow.service
+import org.opendc.common.Dispatcher
import org.opendc.compute.api.ComputeClient
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.WorkflowServiceImpl
@@ -31,8 +32,6 @@ import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats
import java.time.Duration
-import java.time.InstantSource
-import kotlin.coroutines.CoroutineContext
/**
* A service for cloud workflow execution.
@@ -59,9 +58,7 @@ public interface WorkflowService : AutoCloseable {
/**
* Construct a new [WorkflowService] implementation.
*
- * @param context The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
- * @param meterProvider The meter provider to use.
+ * @param dispatcher A [Dispatcher] to schedule future events.
* @param compute The "Compute" client to use.
* @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles).
* @param jobAdmissionPolicy The job admission policy to use.
@@ -70,8 +67,7 @@ public interface WorkflowService : AutoCloseable {
* @param taskOrderPolicy The task order policy to use.
*/
public operator fun invoke(
- context: CoroutineContext,
- clock: InstantSource,
+ dispatcher: Dispatcher,
compute: ComputeClient,
schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -80,8 +76,7 @@ public interface WorkflowService : AutoCloseable {
taskOrderPolicy: TaskOrderPolicy
): WorkflowService {
return WorkflowServiceImpl(
- context,
- clock,
+ dispatcher,
compute,
schedulingQuantum,
jobAdmissionPolicy,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index 20e30fd4..01c1f565 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -26,6 +26,8 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
import org.opendc.common.util.Pacer
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.api.Image
@@ -44,7 +46,6 @@ import java.time.Duration
import java.time.InstantSource
import java.util.PriorityQueue
import java.util.Queue
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
/**
@@ -52,8 +53,7 @@ import kotlin.coroutines.resume
* Datacenter Scheduling.
*/
public class WorkflowServiceImpl(
- context: CoroutineContext,
- private val clock: InstantSource,
+ dispatcher: Dispatcher,
private val computeClient: ComputeClient,
schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -64,7 +64,12 @@ public class WorkflowServiceImpl(
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
*/
- private val scope = CoroutineScope(context + kotlinx.coroutines.Job())
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + kotlinx.coroutines.Job())
+
+ /**
+ * The [InstantSource] representing the clock of this service.
+ */
+ private val clock = dispatcher.timeSource
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -149,7 +154,7 @@ public class WorkflowServiceImpl(
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
+ private val pacer = Pacer(dispatcher, schedulingQuantum.toMillis()) { doSchedule() }
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index f5edbb2f..e5e05a92 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -70,7 +70,7 @@ internal class WorkflowServiceTest {
val computeService = "compute.opendc.org"
val workflowService = "workflow.opendc.org"
- Provisioner(coroutineContext, timeSource, seed = 0L).use { provisioner ->
+ Provisioner(dispatcher, seed = 0L).use { provisioner ->
val scheduler: (ProvisioningContext) -> ComputeScheduler = {
FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),