summaryrefslogtreecommitdiff
path: root/opendc-workflow
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-workflow')
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt13
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt17
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt6
3 files changed, 18 insertions, 18 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index 2436c387..07b43b6d 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -22,6 +22,7 @@
package org.opendc.workflow.service
+import org.opendc.common.Dispatcher
import org.opendc.compute.api.ComputeClient
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.WorkflowServiceImpl
@@ -30,9 +31,7 @@ import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats
-import java.time.Clock
import java.time.Duration
-import kotlin.coroutines.CoroutineContext
/**
* A service for cloud workflow execution.
@@ -59,9 +58,7 @@ public interface WorkflowService : AutoCloseable {
/**
* Construct a new [WorkflowService] implementation.
*
- * @param context The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
- * @param meterProvider The meter provider to use.
+ * @param dispatcher A [Dispatcher] to schedule future events.
* @param compute The "Compute" client to use.
* @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles).
* @param jobAdmissionPolicy The job admission policy to use.
@@ -70,8 +67,7 @@ public interface WorkflowService : AutoCloseable {
* @param taskOrderPolicy The task order policy to use.
*/
public operator fun invoke(
- context: CoroutineContext,
- clock: Clock,
+ dispatcher: Dispatcher,
compute: ComputeClient,
schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -80,8 +76,7 @@ public interface WorkflowService : AutoCloseable {
taskOrderPolicy: TaskOrderPolicy
): WorkflowService {
return WorkflowServiceImpl(
- context,
- clock,
+ dispatcher,
compute,
schedulingQuantum,
jobAdmissionPolicy,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index b1780896..01c1f565 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -26,6 +26,8 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
import org.opendc.common.util.Pacer
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.api.Image
@@ -40,11 +42,10 @@ import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats
-import java.time.Clock
import java.time.Duration
+import java.time.InstantSource
import java.util.PriorityQueue
import java.util.Queue
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
/**
@@ -52,8 +53,7 @@ import kotlin.coroutines.resume
* Datacenter Scheduling.
*/
public class WorkflowServiceImpl(
- context: CoroutineContext,
- private val clock: Clock,
+ dispatcher: Dispatcher,
private val computeClient: ComputeClient,
schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -64,7 +64,12 @@ public class WorkflowServiceImpl(
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
*/
- private val scope = CoroutineScope(context + kotlinx.coroutines.Job())
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + kotlinx.coroutines.Job())
+
+ /**
+ * The [InstantSource] representing the clock of this service.
+ */
+ private val clock = dispatcher.timeSource
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -149,7 +154,7 @@ public class WorkflowServiceImpl(
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
+ private val pacer = Pacer(dispatcher, schedulingQuantum.toMillis()) { doSchedule() }
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index b165418a..e5e05a92 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -70,7 +70,7 @@ internal class WorkflowServiceTest {
val computeService = "compute.opendc.org"
val workflowService = "workflow.opendc.org"
- Provisioner(coroutineContext, clock, seed = 0L).use { provisioner ->
+ Provisioner(dispatcher, seed = 0L).use { provisioner ->
val scheduler: (ProvisioningContext) -> ComputeScheduler = {
FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
@@ -103,7 +103,7 @@ internal class WorkflowServiceTest {
Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()),
format = "gwf"
)
- service.replay(clock, trace.toJobs())
+ service.replay(timeSource, trace.toJobs())
val metrics = service.getSchedulerStats()
@@ -119,7 +119,7 @@ internal class WorkflowServiceTest {
},
{ assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") },
{ assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
- { assertEquals(45977707L, clock.millis()) { "Total duration incorrect" } }
+ { assertEquals(45977707L, timeSource.millis()) { "Total duration incorrect" } }
)
}
}