summaryrefslogtreecommitdiff
path: root/opendc/opendc-workflows
diff options
context:
space:
mode:
authorGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-02-14 19:11:35 +0100
committerGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-02-14 19:11:35 +0100
commit04e4bddccc4e06a126f3c6ee2878502323c7116e (patch)
tree716253f4c03cf8dc6754430a5f63d0a57061d795 /opendc/opendc-workflows
parentcd293b79ef2066ffcb605b9c625d6ab0a9af1d16 (diff)
parentb13ba01e967e1a281d58b37cb57986b47ec99dd8 (diff)
Merge branch 'feat/cpu-sharing' into 'feat/2.x'
Add basis for VM modeling and fractional space-sharing See merge request opendc/opendc-simulator!23
Diffstat (limited to 'opendc/opendc-workflows')
-rw-r--r--opendc/opendc-workflows/build.gradle.kts1
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt)41
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt59
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt)140
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt)27
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt149
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt8
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt10
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt8
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt6
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt8
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt12
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt14
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt12
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt12
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt8
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt10
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt8
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt8
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt8
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt6
21 files changed, 167 insertions, 388 deletions
diff --git a/opendc/opendc-workflows/build.gradle.kts b/opendc/opendc-workflows/build.gradle.kts
index 6aa044e8..08455368 100644
--- a/opendc/opendc-workflows/build.gradle.kts
+++ b/opendc/opendc-workflows/build.gradle.kts
@@ -31,6 +31,7 @@ plugins {
dependencies {
api(project(":opendc:opendc-core"))
+ api(project(":opendc:opendc-compute"))
implementation(kotlin("stdlib"))
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt
index 0b3ba828..3c77d57a 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2020 atlarge-research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,35 +22,32 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.workflows.service
+package com.atlarge.opendc.workflows.monitor
-import com.atlarge.odcsim.ProcessContext
-import com.atlarge.odcsim.SendRef
-import com.atlarge.opendc.core.resources.compute.scheduling.ProcessObserver
-import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse
import com.atlarge.opendc.workflows.workload.Job
-import kotlinx.coroutines.CoroutineScope
+import com.atlarge.opendc.workflows.workload.Task
/**
- * A workflow scheduler interface that schedules jobs across machines.
- *
- * @property ctx The context in which the scheduler runs.
- * @property timers The timer scheduler to use.
- * @property lease The resource lease to use.
+ * An interface for monitoring the progression of workflows.
*/
-abstract class WorkflowSchedulerLogic(
- protected val ctx: ProcessContext,
- protected val self: WorkflowServiceRef,
- protected val coroutineScope: CoroutineScope,
- protected val lease: ProvisioningResponse.Lease
-) : ProcessObserver {
+public interface WorkflowMonitor {
+ /**
+ * This method is invoked when a job has become active.
+ */
+ public suspend fun onJobStart(job: Job, time: Long)
+
+ /**
+ * This method is invoked when a job has finished processing.
+ */
+ public suspend fun onJobFinish(job: Job, time: Long)
+
/**
- * Submit the specified workflow for scheduling.
+ * This method is invoked when a task of a job has started processing.
*/
- abstract suspend fun submit(job: Job, handler: SendRef<WorkflowEvent>)
+ public suspend fun onTaskStart(job: Job, task: Task, time: Long)
/**
- * Trigger an immediate scheduling cycle.
+ * This method is invoked when a task has finished processing.
*/
- abstract suspend fun schedule()
+ public suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long)
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt
deleted file mode 100644
index d4240421..00000000
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.workflows.service
-
-import com.atlarge.odcsim.ProcessContext
-import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse
-import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy
-import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy
-import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy
-import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPolicy
-import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
-import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy
-import kotlinx.coroutines.CoroutineScope
-
-/**
- * A [WorkflowScheduler] that distributes work through a multi-stage process based on the Reference Architecture for
- * Datacenter Scheduling.
- */
-class StageWorkflowScheduler(
- private val mode: WorkflowSchedulerMode,
- private val jobAdmissionPolicy: JobAdmissionPolicy,
- private val jobSortingPolicy: JobSortingPolicy,
- private val taskEligibilityPolicy: TaskEligibilityPolicy,
- private val taskSortingPolicy: TaskSortingPolicy,
- private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy,
- private val resourceSelectionPolicy: ResourceSelectionPolicy
-) : WorkflowScheduler {
- override fun invoke(
- ctx: ProcessContext,
- self: WorkflowServiceRef,
- coroutineScope: CoroutineScope,
- lease: ProvisioningResponse.Lease
- ): WorkflowSchedulerLogic {
- return StageWorkflowSchedulerLogic(ctx, self, coroutineScope, lease, mode, jobAdmissionPolicy,
- jobSortingPolicy, taskEligibilityPolicy, taskSortingPolicy, resourceDynamicFilterPolicy, resourceSelectionPolicy)
- }
-}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index c6162f5e..d7b29c32 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -25,18 +25,12 @@
package com.atlarge.opendc.workflows.service
import com.atlarge.odcsim.ProcessContext
-import com.atlarge.odcsim.SendPort
-import com.atlarge.odcsim.SendRef
-import com.atlarge.odcsim.sendOnce
-import com.atlarge.opendc.core.resources.compute.MachineEvent
-import com.atlarge.opendc.core.resources.compute.MachineMessage
-import com.atlarge.opendc.core.resources.compute.MachineRef
-import com.atlarge.opendc.core.resources.compute.scheduling.ProcessObserver
-import com.atlarge.opendc.core.resources.compute.scheduling.ProcessState
-import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse
-import com.atlarge.opendc.core.services.resources.HostView
-import com.atlarge.opendc.core.workload.application.Application
-import com.atlarge.opendc.core.workload.application.Pid
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerState
+import com.atlarge.opendc.compute.core.monitor.ServerMonitor
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy
import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy
import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy
@@ -45,18 +39,16 @@ import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy
import com.atlarge.opendc.workflows.workload.Job
import com.atlarge.opendc.workflows.workload.Task
-import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
/**
- * Logic of the [StageWorkflowScheduler].
+ * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
+ * Datacenter Scheduling.
*/
-class StageWorkflowSchedulerLogic(
- ctx: ProcessContext,
- self: WorkflowServiceRef,
- coroutineScope: CoroutineScope,
- lease: ProvisioningResponse.Lease,
+class StageWorkflowService(
+ private val ctx: ProcessContext,
+ private val provisioningService: ProvisioningService,
private val mode: WorkflowSchedulerMode,
private val jobAdmissionPolicy: JobAdmissionPolicy,
private val jobSortingPolicy: JobSortingPolicy,
@@ -64,7 +56,7 @@ class StageWorkflowSchedulerLogic(
private val taskSortingPolicy: TaskSortingPolicy,
private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy,
private val resourceSelectionPolicy: ResourceSelectionPolicy
-) : WorkflowSchedulerLogic(ctx, self, coroutineScope, lease) {
+) : WorkflowService, ServerMonitor {
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -77,30 +69,30 @@ class StageWorkflowSchedulerLogic(
internal val activeJobs: MutableSet<JobView> = mutableSetOf()
/**
- * The running tasks by [Pid].
+ * The running tasks by [Server].
*/
- internal val taskByPid = mutableMapOf<Pid, TaskView>()
+ internal val taskByServer = mutableMapOf<Server, TaskView>()
/**
- * The available processor cores on the leased machines.
+ * The nodes that are controlled by the service.
*/
- internal val machineCores: MutableMap<HostView, Int> = HashMap()
+ internal lateinit var nodes: List<Node>
- private val brokers: MutableMap<SendRef<WorkflowEvent>, SendPort<WorkflowEvent>> = HashMap()
- private val channel = ctx.open<MachineEvent>()
+ /**
+ * The available nodes.
+ */
+ internal val available: MutableSet<Node> = mutableSetOf()
init {
- lease.hosts.forEach { machineCores[it] = it.host.cores.count() }
- coroutineScope.launch {
- ProcessObserver(ctx, this@StageWorkflowSchedulerLogic, channel.receive)
+ ctx.launch {
+ nodes = provisioningService.nodes().toList()
+ available.addAll(nodes)
}
}
- override suspend fun submit(job: Job, handler: SendRef<WorkflowEvent>) {
- val broker = brokers.getOrPut(handler) { ctx.connect(handler) }
-
+ override suspend fun submit(job: Job, monitor: WorkflowMonitor) {
// J1 Incoming Jobs
- val jobInstance = JobView(job, handler)
+ val jobInstance = JobView(job, monitor)
val instances = job.tasks.associateWith {
TaskView(jobInstance, it)
}
@@ -113,13 +105,12 @@ class StageWorkflowSchedulerLogic(
// If the task has no dependency, it is a root task and can immediately be evaluated
if (instance.isRoot) {
- instance.state = ProcessState.READY
+ instance.state = TaskState.READY
}
}
jobInstance.tasks = instances.values.toMutableSet()
incomingJobs += jobInstance
- broker.send(WorkflowEvent.JobSubmitted(self, job, ctx.clock.millis()))
requestCycle()
}
@@ -131,20 +122,18 @@ class StageWorkflowSchedulerLogic(
private fun requestCycle() {
when (mode) {
is WorkflowSchedulerMode.Interactive -> {
- coroutineScope.launch {
+ ctx.launch {
schedule()
}
}
is WorkflowSchedulerMode.Batch -> {
if (next == null) {
- val job = coroutineScope.launch {
+ val job = ctx.launch {
delay(mode.quantum)
+ next = null
schedule()
}
next = job
- job.invokeOnCompletion {
- next = null
- }
}
}
}
@@ -153,14 +142,15 @@ class StageWorkflowSchedulerLogic(
/**
* Perform a scheduling cycle immediately.
*/
- override suspend fun schedule() {
+ private suspend fun schedule() {
// J2 Create list of eligible jobs
jobAdmissionPolicy.startCycle(this)
val eligibleJobs = incomingJobs.filter { jobAdmissionPolicy.shouldAdmit(this, it) }
+
for (jobInstance in eligibleJobs) {
incomingJobs -= jobInstance
activeJobs += jobInstance
- brokers.getValue(jobInstance.broker).send(WorkflowEvent.JobStarted(self, jobInstance.job, ctx.clock.millis()))
+ jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis())
}
// J3 Sort jobs on criterion
@@ -177,15 +167,17 @@ class StageWorkflowSchedulerLogic(
// T3 Per task
for (instance in sortedTasks) {
- val hosts = resourceDynamicFilterPolicy(this, lease.hosts, instance)
+ val hosts = resourceDynamicFilterPolicy(this, nodes, instance)
val host = resourceSelectionPolicy.select(this, hosts, instance)
if (host != null) {
// T4 Submit task to machine
- host.ref.sendOnce(MachineMessage.Submit(instance.task.application, instance, channel.send))
- instance.host = host
- instance.state = ProcessState.RUNNING // Assume the application starts running
- machineCores.merge(host, instance.task.application.cores, Int::minus)
+ available -= host
+ instance.state = TaskState.ACTIVE
+
+ val newHost = provisioningService.deploy(host, instance.task.image, this)
+ instance.host = newHost
+ taskByServer[newHost.server!!] = instance
} else {
return
}
@@ -193,32 +185,32 @@ class StageWorkflowSchedulerLogic(
}
}
- override fun onSubmission(instance: MachineRef, application: Application, key: Any, pid: Pid) {
- val task = key as TaskView
- task.pid = pid
- taskByPid[pid] = task
-
- brokers.getValue(task.job.broker).send(WorkflowEvent.TaskStarted(self, task.job.job, task.task, ctx.clock.millis()))
- }
-
- override fun onTermination(instance: MachineRef, pid: Pid, status: Int) {
- val task = taskByPid.remove(pid) ?: throw IllegalStateException()
-
- val job = task.job
- task.state = ProcessState.TERMINATED
- job.tasks.remove(task)
- machineCores.merge(task.host!!, task.task.application.cores, Int::plus)
- brokers.getValue(job.broker).send(WorkflowEvent.TaskFinished(self, job.job, task.task, status, ctx.clock.millis()))
+ override suspend fun onUpdate(server: Server, previousState: ServerState) {
+ when (server.state) {
+ ServerState.ACTIVE -> {
+ val task = taskByServer.getValue(server)
+ task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis())
+ }
+ ServerState.SHUTOFF, ServerState.ERROR -> {
+ val task = taskByServer.remove(server) ?: throw IllegalStateException()
+ val job = task.job
+ task.state = TaskState.FINISHED
+ job.tasks.remove(task)
+ available += task.host!!
+ job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis())
+
+ if (job.isFinished) {
+ activeJobs -= job
+ job.monitor.onJobFinish(job.job, ctx.clock.millis())
+ }
- if (job.isFinished) {
- activeJobs -= job
- brokers.getValue(job.broker).send(WorkflowEvent.JobFinished(self, job.job, ctx.clock.millis()))
+ requestCycle()
+ }
+ else -> throw IllegalStateException()
}
-
- requestCycle()
}
- class JobView(val job: Job, val broker: SendRef<WorkflowEvent>) {
+ class JobView(val job: Job, val monitor: WorkflowMonitor) {
/**
* A flag to indicate whether this job is finished.
*/
@@ -245,19 +237,17 @@ class StageWorkflowSchedulerLogic(
val isRoot: Boolean
get() = dependencies.isEmpty()
- var state: ProcessState = ProcessState.CREATED
+ var state: TaskState = TaskState.CREATED
set(value) {
field = value
// Mark the process as terminated in the graph
- if (value == ProcessState.TERMINATED) {
+ if (value == TaskState.FINISHED) {
markTerminated()
}
}
- var pid: Pid? = null
-
- var host: HostView? = null
+ var host: Node? = null
/**
* Mark the specified [TaskView] as terminated.
@@ -267,7 +257,7 @@ class StageWorkflowSchedulerLogic(
dependent.dependencies.remove(this)
if (dependent.isRoot) {
- dependent.state = ProcessState.READY
+ dependent.state = TaskState.READY
}
}
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt
index 6d6d4179..ee0024f5 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2020 atlarge-research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -24,25 +24,12 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.ProcessContext
-import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse
-import kotlinx.coroutines.CoroutineScope
-
/**
- * A factory interface for constructing a [WorkflowSchedulerLogic].
+ * The state of a workflow task.
*/
-interface WorkflowScheduler {
- /**
- * Construct a [WorkflowSchedulerLogic] in the given [ActorContext].
- *
- * @param ctx The context in which the scheduler runs.
- * @param timers The timer scheduler to use.
- * @param lease The resource lease to use.
- */
- operator fun invoke(
- ctx: ProcessContext,
- self: WorkflowServiceRef,
- coroutineScope: CoroutineScope,
- lease: ProvisioningResponse.Lease
- ): WorkflowSchedulerLogic
+public enum class TaskState {
+ CREATED,
+ READY,
+ ACTIVE,
+ FINISHED
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
index bed6b93b..524f4f9e 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
@@ -24,161 +24,24 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.Channel
-import com.atlarge.odcsim.ProcessContext
-import com.atlarge.odcsim.SendRef
-import com.atlarge.odcsim.ask
-import com.atlarge.opendc.core.Zone
-import com.atlarge.opendc.core.ZoneMessage
-import com.atlarge.opendc.core.find
-import com.atlarge.opendc.core.resources.compute.MachineEvent
-import com.atlarge.opendc.core.services.AbstractService
-import com.atlarge.opendc.core.services.Service
-import com.atlarge.opendc.core.services.ServiceProvider
-import com.atlarge.opendc.core.services.provisioning.ProvisioningMessage
-import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse
-import com.atlarge.opendc.core.services.provisioning.ProvisioningService
+import com.atlarge.opendc.core.services.AbstractServiceKey
+import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.workload.Job
-import com.atlarge.opendc.workflows.workload.Task
import java.util.UUID
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.isActive
/**
* A service for cloud workflow management.
*
* The workflow scheduler is modelled after the Reference Architecture for Datacenter Scheduling by Andreadis et al.
*/
-class WorkflowService(val scheduler: WorkflowScheduler) : ServiceProvider {
- override val uid: UUID = UUID.randomUUID()
- override val name: String = "workflows"
- override val provides: Set<Service<*>> = setOf(WorkflowService)
- override val dependencies: Set<Service<*>> = setOf(ProvisioningService)
-
- /**
- * Build the runtime [Behavior] for the workflow service, responding to messages of shape [WorkflowMessage].
- */
- override suspend fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef<ZoneMessage>, main: Channel<Any>) {
- coroutineScope {
- val inlet = ctx.listen(main.receive)
- val provisioner = zoneRef.find(ProvisioningService)
- // Wait for 0.1 sec before asking the provisioner to allow it to initialize. Will return empty response if asked
- // immediately.
- delay(10)
- val lease: ProvisioningResponse.Lease = provisioner.ask { ProvisioningMessage.Request(Int.MAX_VALUE, it) }
- val schedulerLogic = scheduler(ctx, main.send, this, lease)
-
- while (isActive) {
- when (val msg = inlet.receive()) {
- is WorkflowMessage.Submit -> {
- schedulerLogic.submit(msg.job, msg.broker)
- }
- is MachineEvent.Submitted -> {
- schedulerLogic.onSubmission(msg.instance, msg.application, msg.key, msg.pid)
- }
- is MachineEvent.Terminated -> {
- schedulerLogic.onTermination(msg.instance, msg.pid, msg.status)
- }
- }
- }
- }
- }
-
- companion object : AbstractService<WorkflowMessage>(UUID.randomUUID(), "workflows")
-}
-
-/**
- * A reference to the workflow service instance.
- */
-typealias WorkflowServiceRef = SendRef<WorkflowMessage>
-
-/**
- * A message protocol for communicating to the workflow service.
- */
-sealed class WorkflowMessage {
+public interface WorkflowService {
/**
* Submit the specified [Job] to the workflow service for scheduling.
- *
- * @property job The workflow to submit for scheduling.
- * @property broker The broker that has submitted this workflow on behalf of a user and that needs to be kept
- * up-to-date.
- */
- data class Submit(val job: Job, val broker: SendRef<WorkflowEvent>) : WorkflowMessage()
-}
-
-/**
- * A message protocol used by the workflow service to respond to [WorkflowMessage]s.
- */
-sealed class WorkflowEvent {
- /**
- * Indicate that the specified [Job] was submitted to the workflow service.
- *
- * @property service The reference to the service the job was submitted to.
- * @property job The job that has been submitted.
- * @property time A timestamp of the moment the job was received.
- */
- data class JobSubmitted(
- val service: WorkflowServiceRef,
- val job: Job,
- val time: Long
- ) : WorkflowEvent()
-
- /**
- * Indicate that the specified [Job] has become active.
- *
- * @property service The reference to the service the job was submitted to.
- * @property job The job that has been submitted.
- * @property time A timestamp of the moment the job started.
- */
- data class JobStarted(
- val service: WorkflowServiceRef,
- val job: Job,
- val time: Long
- ) : WorkflowEvent()
-
- /**
- * Indicate that the specified [Task] has started processing.
- *
- * @property service The reference to the service the job was submitted to.
- * @property job The job that contains this task.
- * @property task The task that has started processing.
- * @property time A timestamp of the moment the task started.
- */
- data class TaskStarted(
- val service: WorkflowServiceRef,
- val job: Job,
- val task: Task,
- val time: Long
- ) : WorkflowEvent()
-
- /**
- * Indicate that the specified [Task] has started processing.
- *
- * @property service The reference to the service the job was submitted to.
- * @property job The job that contains this task.
- * @property task The task that has started processing.
- * @property status The exit code of the task, where zero means successful.
- * @property time A timestamp of the moment the task finished.
*/
- data class TaskFinished(
- val service: WorkflowServiceRef,
- val job: Job,
- val task: Task,
- val status: Int,
- val time: Long
- ) : WorkflowEvent()
+ public suspend fun submit(job: Job, monitor: WorkflowMonitor)
/**
- * Indicate that the specified [Job] has finished processing.
- *
- * @property service The reference to the service the job was submitted to.
- * @property job The job that has finished processing.
- * @property time A timestamp of the moment the task finished.
+ * The service key for the workflow scheduler.
*/
- data class JobFinished(
- val service: WorkflowServiceRef,
- val job: Job,
- val time: Long
- ) : WorkflowEvent()
+ companion object Key : AbstractServiceKey<WorkflowService>(UUID.randomUUID(), "workflows")
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt
index 333ed35a..976fbbf3 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt
@@ -24,14 +24,14 @@
package com.atlarge.opendc.workflows.service.stage.job
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.workflows.service.StageWorkflowService
/**
* The [FifoJobSortingPolicy] sorts tasks based on the order of arrival in the queue.
*/
class FifoJobSortingPolicy : JobSortingPolicy {
override fun invoke(
- scheduler: StageWorkflowSchedulerLogic,
- jobs: Collection<StageWorkflowSchedulerLogic.JobView>
- ): List<StageWorkflowSchedulerLogic.JobView> = jobs.toList()
+ scheduler: StageWorkflowService,
+ jobs: Collection<StageWorkflowService.JobView>
+ ): List<StageWorkflowService.JobView> = jobs.toList()
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt
index d3a5d9a6..cdaad512 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt
@@ -24,10 +24,10 @@
package com.atlarge.opendc.workflows.service.stage.job
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.workflows.service.StageWorkflowService
/**
- * A policy interface for admitting [StageWorkflowSchedulerLogic.JobView]s to a scheduling cycle.
+ * A policy interface for admitting [StageWorkflowService.JobView]s to a scheduling cycle.
*/
interface JobAdmissionPolicy {
/**
@@ -35,14 +35,14 @@ interface JobAdmissionPolicy {
*
* @param scheduler The scheduler that started the cycle.
*/
- fun startCycle(scheduler: StageWorkflowSchedulerLogic) {}
+ fun startCycle(scheduler: StageWorkflowService) {}
/**
- * Determine whether the specified [StageWorkflowSchedulerLogic.JobView] should be admitted to the scheduling cycle.
+ * Determine whether the specified [StageWorkflowService.JobView] should be admitted to the scheduling cycle.
*
* @param scheduler The scheduler that should admit or reject the job.
* @param job The workflow that has been submitted.
* @return `true` if the workflow may be admitted to the scheduling cycle, `false` otherwise.
*/
- fun shouldAdmit(scheduler: StageWorkflowSchedulerLogic, job: StageWorkflowSchedulerLogic.JobView): Boolean
+ fun shouldAdmit(scheduler: StageWorkflowService, job: StageWorkflowService.JobView): Boolean
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt
index ada3e693..c3a5dab5 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.workflows.service.stage.job
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.workflows.service.StageWorkflowService
/**
* A policy interface for ordering admitted workflows in the scheduling queue.
@@ -38,7 +38,7 @@ interface JobSortingPolicy {
* @return The sorted list of jobs.
*/
operator fun invoke(
- scheduler: StageWorkflowSchedulerLogic,
- jobs: Collection<StageWorkflowSchedulerLogic.JobView>
- ): List<StageWorkflowSchedulerLogic.JobView>
+ scheduler: StageWorkflowService,
+ jobs: Collection<StageWorkflowService.JobView>
+ ): List<StageWorkflowService.JobView>
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt
index f877403b..ad90839c 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.workflows.service.stage.job
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.workflows.service.StageWorkflowService
/**
* A [JobAdmissionPolicy] that admits all jobs.
@@ -34,7 +34,7 @@ object NullJobAdmissionPolicy : JobAdmissionPolicy {
* Admit every submitted job.
*/
override fun shouldAdmit(
- scheduler: StageWorkflowSchedulerLogic,
- job: StageWorkflowSchedulerLogic.JobView
+ scheduler: StageWorkflowService,
+ job: StageWorkflowService.JobView
): Boolean = true
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt
index 30d5c456..9ce2811c 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.workflows.service.stage.job
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.workflows.service.StageWorkflowService
import kotlin.random.Random
/**
@@ -34,7 +34,7 @@ import kotlin.random.Random
*/
class RandomJobSortingPolicy(private val random: Random = Random.Default) : JobSortingPolicy {
override fun invoke(
- scheduler: StageWorkflowSchedulerLogic,
- jobs: Collection<StageWorkflowSchedulerLogic.JobView>
- ): List<StageWorkflowSchedulerLogic.JobView> = jobs.shuffled(random)
+ scheduler: StageWorkflowService,
+ jobs: Collection<StageWorkflowService.JobView>
+ ): List<StageWorkflowService.JobView> = jobs.shuffled(random)
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
index c3307063..e2490214 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
@@ -24,17 +24,17 @@
package com.atlarge.opendc.workflows.service.stage.resource
-import com.atlarge.opendc.core.services.resources.HostView
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.workflows.service.StageWorkflowService
/**
* A [ResourceSelectionPolicy] that selects the first machine that is available.
*/
class FirstFitResourceSelectionPolicy : ResourceSelectionPolicy {
override fun select(
- scheduler: StageWorkflowSchedulerLogic,
- machines: List<HostView>,
- task: StageWorkflowSchedulerLogic.TaskView
- ): HostView? =
+ scheduler: StageWorkflowService,
+ machines: List<Node>,
+ task: StageWorkflowService.TaskView
+ ): Node? =
machines.firstOrNull()
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt
index d742f842..a8f2fda9 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt
@@ -24,8 +24,8 @@
package com.atlarge.opendc.workflows.service.stage.resource
-import com.atlarge.opendc.core.services.resources.HostView
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.workflows.service.StageWorkflowService
/**
* A [ResourceDynamicFilterPolicy] based on the amount of cores available on the machine and the cores required for
@@ -33,11 +33,11 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
*/
class FunctionalResourceDynamicFilterPolicy : ResourceDynamicFilterPolicy {
override fun invoke(
- scheduler: StageWorkflowSchedulerLogic,
- machines: List<HostView>,
- task: StageWorkflowSchedulerLogic.TaskView
- ): List<HostView> {
+ scheduler: StageWorkflowService,
+ machines: List<Node>,
+ task: StageWorkflowService.TaskView
+ ): List<Node> {
return machines
- .filter { scheduler.machineCores[it] ?: 0 >= task.task.application.cores }
+ .filter { it in scheduler.available }
}
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt
index 8a3b5a1e..8d8ceec2 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt
@@ -24,8 +24,8 @@
package com.atlarge.opendc.workflows.service.stage.resource
-import com.atlarge.opendc.core.services.resources.HostView
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.workflows.service.StageWorkflowService
/**
* This interface represents the **R4** stage of the Reference Architecture for Schedulers and acts as a filter yielding
@@ -42,8 +42,8 @@ interface ResourceDynamicFilterPolicy {
* @return The machines on which the task can be scheduled.
*/
operator fun invoke(
- scheduler: StageWorkflowSchedulerLogic,
- machines: List<HostView>,
- task: StageWorkflowSchedulerLogic.TaskView
- ): List<HostView>
+ scheduler: StageWorkflowService,
+ machines: List<Node>,
+ task: StageWorkflowService.TaskView
+ ): List<Node>
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
index 90b2873c..38fe5886 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
@@ -24,8 +24,8 @@
package com.atlarge.opendc.workflows.service.stage.resource
-import com.atlarge.opendc.core.services.resources.HostView
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.workflows.service.StageWorkflowService
/**
* This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected
@@ -41,8 +41,8 @@ interface ResourceSelectionPolicy {
* @return The selected machine or `null` if no machine could be found.
*/
fun select(
- scheduler: StageWorkflowSchedulerLogic,
- machines: List<HostView>,
- task: StageWorkflowSchedulerLogic.TaskView
- ): HostView?
+ scheduler: StageWorkflowService,
+ machines: List<Node>,
+ task: StageWorkflowService.TaskView
+ ): Node?
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt
index 48a1a50d..bba81d27 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt
@@ -24,14 +24,14 @@
package com.atlarge.opendc.workflows.service.stage.task
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.workflows.service.StageWorkflowService
/**
* The [FifoTaskSortingPolicy] sorts tasks based on the order of arrival in the queue.
*/
class FifoTaskSortingPolicy : TaskSortingPolicy {
override fun invoke(
- scheduler: StageWorkflowSchedulerLogic,
- tasks: Collection<StageWorkflowSchedulerLogic.TaskView>
- ): List<StageWorkflowSchedulerLogic.TaskView> = tasks.toList()
+ scheduler: StageWorkflowService,
+ tasks: Collection<StageWorkflowService.TaskView>
+ ): List<StageWorkflowService.TaskView> = tasks.toList()
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt
index 1672633e..72ecbee2 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt
@@ -24,15 +24,15 @@
package com.atlarge.opendc.workflows.service.stage.task
-import com.atlarge.opendc.core.resources.compute.scheduling.ProcessState
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
/**
* A [TaskEligibilityPolicy] that marks tasks as eligible if they are tasks roots within the job.
*/
class FunctionalTaskEligibilityPolicy : TaskEligibilityPolicy {
override fun isEligible(
- scheduler: StageWorkflowSchedulerLogic,
- task: StageWorkflowSchedulerLogic.TaskView
- ): Boolean = task.state == ProcessState.READY
+ scheduler: StageWorkflowService,
+ task: StageWorkflowService.TaskView
+ ): Boolean = task.state == TaskState.READY
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt
index 36ef3a50..1b1d5b44 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.workflows.service.stage.task
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.workflows.service.StageWorkflowService
import kotlin.random.Random
/**
@@ -34,7 +34,7 @@ import kotlin.random.Random
*/
class RandomTaskSortingPolicy(private val random: Random = Random.Default) : TaskSortingPolicy {
override fun invoke(
- scheduler: StageWorkflowSchedulerLogic,
- tasks: Collection<StageWorkflowSchedulerLogic.TaskView>
- ): List<StageWorkflowSchedulerLogic.TaskView> = tasks.shuffled(random)
+ scheduler: StageWorkflowService,
+ tasks: Collection<StageWorkflowService.TaskView>
+ ): List<StageWorkflowService.TaskView> = tasks.shuffled(random)
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt
index 19f0240b..19954d7b 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.workflows.service.stage.task
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.workflows.service.StageWorkflowService
/**
* A policy interface for determining the eligibility of tasks in a scheduling cycle.
@@ -35,14 +35,14 @@ interface TaskEligibilityPolicy {
*
* @param scheduler The scheduler that started the cycle.
*/
- fun startCycle(scheduler: StageWorkflowSchedulerLogic) {}
+ fun startCycle(scheduler: StageWorkflowService) {}
/**
- * Determine whether the specified [StageWorkflowSchedulerLogic.TaskView] is eligible to be scheduled.
+ * Determine whether the specified [StageWorkflowService.TaskView] is eligible to be scheduled.
*
* @param scheduler The scheduler that is determining whether the task is eligible.
* @param task The task instance to schedule.
* @return `true` if the task eligible to be scheduled, `false` otherwise.
*/
- fun isEligible(scheduler: StageWorkflowSchedulerLogic, task: StageWorkflowSchedulerLogic.TaskView): Boolean
+ fun isEligible(scheduler: StageWorkflowService, task: StageWorkflowService.TaskView): Boolean
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt
index 6a65ed69..aabc44a9 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.workflows.service.stage.task
-import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic
+import com.atlarge.opendc.workflows.service.StageWorkflowService
/**
* This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the
@@ -39,7 +39,7 @@ interface TaskSortingPolicy {
* @return The sorted list of tasks.
*/
operator fun invoke(
- scheduler: StageWorkflowSchedulerLogic,
- tasks: Collection<StageWorkflowSchedulerLogic.TaskView>
- ): List<StageWorkflowSchedulerLogic.TaskView>
+ scheduler: StageWorkflowService,
+ tasks: Collection<StageWorkflowService.TaskView>
+ ): List<StageWorkflowService.TaskView>
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt
index 25fe7348..b5997b35 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt
@@ -24,8 +24,8 @@
package com.atlarge.opendc.workflows.workload
+import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.Identity
-import com.atlarge.opendc.core.workload.application.Application
import java.util.UUID
/**
@@ -33,13 +33,13 @@ import java.util.UUID
*
* @property uid A unique identified of this task.
* @property name The name of this task.
- * @property application The application to run as part of this workflow task.
+ * @property image The application image to run as part of this workflow task.
* @property dependencies The dependencies of this task in order for it to execute.
*/
data class Task(
override val uid: UUID,
override val name: String,
- val application: Application,
+ val image: Image,
val dependencies: Set<Task>
) : Identity {
override fun equals(other: Any?): Boolean = other is Task && uid == other.uid