diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-01-07 17:25:40 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-01-07 23:33:57 +0100 |
| commit | 9cf24c9a8d3e96a29d9b111081bc3369aadd490d (patch) | |
| tree | 4f378ee9f77d8623a67a403135a4010afd5f9000 /simulator/opendc-workflows/src/main/kotlin/org | |
| parent | 74a4bff83bfb6366cc193d1fc9c4a07e49649649 (diff) | |
Refactor workflow service to schedule tasks onto VMs
This change updates the workflow service to delegate the resource
scheduling logic to the virtualized resource provisioner.
Diffstat (limited to 'simulator/opendc-workflows/src/main/kotlin/org')
8 files changed, 31 insertions, 249 deletions
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt index 91657f83..2c8d9a0b 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt @@ -29,41 +29,43 @@ import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.compute.core.Flavor import org.opendc.compute.core.Server import org.opendc.compute.core.ServerEvent import org.opendc.compute.core.ServerState -import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.metal.service.ProvisioningService +import org.opendc.compute.core.virt.service.VirtProvisioningService import org.opendc.trace.core.EventTracer import org.opendc.trace.core.consumeAsFlow import org.opendc.trace.core.enable import org.opendc.workflows.service.stage.job.JobAdmissionPolicy import org.opendc.workflows.service.stage.job.JobOrderPolicy -import org.opendc.workflows.service.stage.resource.ResourceFilterPolicy -import org.opendc.workflows.service.stage.resource.ResourceSelectionPolicy import org.opendc.workflows.service.stage.task.TaskEligibilityPolicy import org.opendc.workflows.service.stage.task.TaskOrderPolicy import org.opendc.workflows.workload.Job +import org.opendc.workflows.workload.WORKFLOW_TASK_CORES import java.time.Clock import java.util.* /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for - * Topology Scheduling. + * Datacenter Scheduling. */ public class StageWorkflowService( internal val coroutineScope: CoroutineScope, internal val clock: Clock, internal val tracer: EventTracer, - private val provisioningService: ProvisioningService, + private val provisioningService: VirtProvisioningService, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, - taskOrderPolicy: TaskOrderPolicy, - resourceFilterPolicy: ResourceFilterPolicy, - resourceSelectionPolicy: ResourceSelectionPolicy + taskOrderPolicy: TaskOrderPolicy ) : WorkflowService { + /** + * The logger instance to use. + */ + private val logger = KotlinLogging.logger {} /** * The incoming jobs ready to be processed by the scheduler. @@ -101,25 +103,10 @@ public class StageWorkflowService( internal val taskByServer = mutableMapOf<Server, TaskState>() /** - * The nodes that are controlled by the service. - */ - internal lateinit var nodes: List<Node> - - /** - * The available nodes. - */ - internal val available: MutableSet<Node> = mutableSetOf() - - /** - * The maximum number of incoming jobs. - */ - private val throttleLimit: Int = 20000 - - /** * The load of the system. */ internal val load: Double - get() = (available.size / nodes.size.toDouble()) + get() = (activeTasks.size / provisioningService.hostCount.toDouble()) /** * The root listener of this scheduler. @@ -170,22 +157,13 @@ public class StageWorkflowService( private val mode: WorkflowSchedulerMode.Logic private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic - private val resourceFilterPolicy: ResourceFilterPolicy.Logic - private val resourceSelectionPolicy: Comparator<Node> init { - coroutineScope.launch { - nodes = provisioningService.nodes().toList() - available.addAll(nodes) - } - this.mode = mode(this) this.jobAdmissionPolicy = jobAdmissionPolicy(this) this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) this.taskEligibilityPolicy = taskEligibilityPolicy(this) this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) - this.resourceFilterPolicy = resourceFilterPolicy(this) - this.resourceSelectionPolicy = resourceSelectionPolicy(this) } override val events: Flow<WorkflowEvent> = tracer.openRecording().let { @@ -290,26 +268,25 @@ public class StageWorkflowService( // T3 Per task while (taskQueue.isNotEmpty()) { val instance = taskQueue.peek() - val host: Node? = available.firstOrNull() - if (host != null) { - // T4 Submit task to machine - available -= host + val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1 + val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task + val image = instance.task.image + coroutineScope.launch { + val server = provisioningService.deploy(instance.task.name, image, flavor) + instance.state = TaskStatus.ACTIVE - val newHost = provisioningService.deploy(host, instance.task.image) - val server = newHost.server!! - instance.host = newHost + instance.server = server taskByServer[server] = instance + server.events .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } .launchIn(coroutineScope) - - activeTasks += instance - taskQueue.poll() - rootListener.taskAssigned(instance) - } else { - break } + + activeTasks += instance + taskQueue.poll() + rootListener.taskAssigned(instance) } } @@ -333,7 +310,6 @@ public class StageWorkflowService( task.state = TaskStatus.FINISHED task.finishedAt = clock.millis() job.tasks.remove(task) - available += task.host!! activeTasks -= task tracer.commit( WorkflowEvent.TaskFinished( diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt index ed023c82..d1eb6704 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt @@ -22,7 +22,7 @@ package org.opendc.workflows.service -import org.opendc.compute.core.metal.Node +import org.opendc.compute.core.Server import org.opendc.workflows.workload.Task public class TaskState(public val job: JobState, public val task: Task) { @@ -62,7 +62,7 @@ public class TaskState(public val job: JobState, public val task: Task) { } } - public var host: Node? = null + public var server: Server? = null /** * Mark the specified [TaskView] as terminated. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt deleted file mode 100644 index 8dc323ec..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.StageWorkflowService - -/** - * A [ResourceSelectionPolicy] that selects the first machine that is available. - */ -public object FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<Node> = - Comparator<Node> { _, _ -> 1 } - - override fun toString(): String = "First-Fit" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt deleted file mode 100644 index ac79a9ce..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState - -/** - * A [ResourceFilterPolicy] based on the amount of cores available on the machine and the cores required for - * the task. - */ -public object FunctionalResourceFilterPolicy : ResourceFilterPolicy { - override fun invoke(scheduler: StageWorkflowService): ResourceFilterPolicy.Logic = - object : ResourceFilterPolicy.Logic { - override fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> = - hosts.filter { it in scheduler.available } - } - - override fun toString(): String = "functional" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt deleted file mode 100644 index caf87c70..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.StageWorkflowService -import java.util.* - -/** - * A [ResourceSelectionPolicy] that randomly orders the machines. - */ -public object RandomResourceSelectionPolicy : ResourceSelectionPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<Node> = object : Comparator<Node> { - private val ids: Map<Node, Long> - - init { - val random = Random(123) - ids = scheduler.nodes.associateWith { random.nextLong() } - } - - override fun compare(o1: Node, o2: Node): Int = compareValuesBy(o1, o2) { ids[it] } - } - - override fun toString(): String = "Random" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt deleted file mode 100644 index 4923a34b..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.service.stage.StagePolicy - -/** - * This interface represents stages **R2**, **R3** and **R4** stage of the Reference Architecture for Schedulers and - * acts as a filter yielding a list of resources with sufficient resource-capacities, based on fixed or dynamic - * requirements, and on predicted or monitored information about processing unit availability, memory occupancy, etc. - */ -public interface ResourceFilterPolicy : StagePolicy<ResourceFilterPolicy.Logic> { - public interface Logic { - /** - * Filter the list of machines based on dynamic information. - * - * @param hosts The hosts to filter. - * @param task The task that is to be scheduled. - * @return The machines on which the task can be scheduled. - */ - public operator fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> - } -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt deleted file mode 100644 index 990b990a..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.stage.StagePolicy - -/** - * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected - * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit. - */ -public interface ResourceSelectionPolicy : StagePolicy<Comparator<Node>> diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt index d02e2b4e..4305aa57 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt @@ -28,3 +28,8 @@ package org.opendc.workflows.workload * Meta-data key for the deadline of a task. */ public const val WORKFLOW_TASK_DEADLINE: String = "workflow:task:deadline" + +/** + * Meta-data key for the number of cores needed for a task. + */ +public const val WORKFLOW_TASK_CORES: String = "workflow:task:cores" |
