summaryrefslogtreecommitdiff
path: root/simulator/opendc-workflows/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc-workflows/src/main')
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt72
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt4
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt36
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt41
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt45
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt45
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt32
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt5
8 files changed, 31 insertions, 249 deletions
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
index 91657f83..2c8d9a0b 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
@@ -29,41 +29,43 @@ import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
+import mu.KotlinLogging
+import org.opendc.compute.core.Flavor
import org.opendc.compute.core.Server
import org.opendc.compute.core.ServerEvent
import org.opendc.compute.core.ServerState
-import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.metal.service.ProvisioningService
+import org.opendc.compute.core.virt.service.VirtProvisioningService
import org.opendc.trace.core.EventTracer
import org.opendc.trace.core.consumeAsFlow
import org.opendc.trace.core.enable
import org.opendc.workflows.service.stage.job.JobAdmissionPolicy
import org.opendc.workflows.service.stage.job.JobOrderPolicy
-import org.opendc.workflows.service.stage.resource.ResourceFilterPolicy
-import org.opendc.workflows.service.stage.resource.ResourceSelectionPolicy
import org.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import org.opendc.workflows.service.stage.task.TaskOrderPolicy
import org.opendc.workflows.workload.Job
+import org.opendc.workflows.workload.WORKFLOW_TASK_CORES
import java.time.Clock
import java.util.*
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
- * Topology Scheduling.
+ * Datacenter Scheduling.
*/
public class StageWorkflowService(
internal val coroutineScope: CoroutineScope,
internal val clock: Clock,
internal val tracer: EventTracer,
- private val provisioningService: ProvisioningService,
+ private val provisioningService: VirtProvisioningService,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
- taskOrderPolicy: TaskOrderPolicy,
- resourceFilterPolicy: ResourceFilterPolicy,
- resourceSelectionPolicy: ResourceSelectionPolicy
+ taskOrderPolicy: TaskOrderPolicy
) : WorkflowService {
+ /**
+ * The logger instance to use.
+ */
+ private val logger = KotlinLogging.logger {}
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -101,25 +103,10 @@ public class StageWorkflowService(
internal val taskByServer = mutableMapOf<Server, TaskState>()
/**
- * The nodes that are controlled by the service.
- */
- internal lateinit var nodes: List<Node>
-
- /**
- * The available nodes.
- */
- internal val available: MutableSet<Node> = mutableSetOf()
-
- /**
- * The maximum number of incoming jobs.
- */
- private val throttleLimit: Int = 20000
-
- /**
* The load of the system.
*/
internal val load: Double
- get() = (available.size / nodes.size.toDouble())
+ get() = (activeTasks.size / provisioningService.hostCount.toDouble())
/**
* The root listener of this scheduler.
@@ -170,22 +157,13 @@ public class StageWorkflowService(
private val mode: WorkflowSchedulerMode.Logic
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
- private val resourceFilterPolicy: ResourceFilterPolicy.Logic
- private val resourceSelectionPolicy: Comparator<Node>
init {
- coroutineScope.launch {
- nodes = provisioningService.nodes().toList()
- available.addAll(nodes)
- }
-
this.mode = mode(this)
this.jobAdmissionPolicy = jobAdmissionPolicy(this)
this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid })
this.taskEligibilityPolicy = taskEligibilityPolicy(this)
this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid })
- this.resourceFilterPolicy = resourceFilterPolicy(this)
- this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
override val events: Flow<WorkflowEvent> = tracer.openRecording().let {
@@ -290,26 +268,25 @@ public class StageWorkflowService(
// T3 Per task
while (taskQueue.isNotEmpty()) {
val instance = taskQueue.peek()
- val host: Node? = available.firstOrNull()
- if (host != null) {
- // T4 Submit task to machine
- available -= host
+ val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1
+ val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task
+ val image = instance.task.image
+ coroutineScope.launch {
+ val server = provisioningService.deploy(instance.task.name, image, flavor)
+
instance.state = TaskStatus.ACTIVE
- val newHost = provisioningService.deploy(host, instance.task.image)
- val server = newHost.server!!
- instance.host = newHost
+ instance.server = server
taskByServer[server] = instance
+
server.events
.onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
.launchIn(coroutineScope)
-
- activeTasks += instance
- taskQueue.poll()
- rootListener.taskAssigned(instance)
- } else {
- break
}
+
+ activeTasks += instance
+ taskQueue.poll()
+ rootListener.taskAssigned(instance)
}
}
@@ -333,7 +310,6 @@ public class StageWorkflowService(
task.state = TaskStatus.FINISHED
task.finishedAt = clock.millis()
job.tasks.remove(task)
- available += task.host!!
activeTasks -= task
tracer.commit(
WorkflowEvent.TaskFinished(
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
index ed023c82..d1eb6704 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
@@ -22,7 +22,7 @@
package org.opendc.workflows.service
-import org.opendc.compute.core.metal.Node
+import org.opendc.compute.core.Server
import org.opendc.workflows.workload.Task
public class TaskState(public val job: JobState, public val task: Task) {
@@ -62,7 +62,7 @@ public class TaskState(public val job: JobState, public val task: Task) {
}
}
- public var host: Node? = null
+ public var server: Server? = null
/**
* Mark the specified [TaskView] as terminated.
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
deleted file mode 100644
index 8dc323ec..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.StageWorkflowService
-
-/**
- * A [ResourceSelectionPolicy] that selects the first machine that is available.
- */
-public object FirstFitResourceSelectionPolicy : ResourceSelectionPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<Node> =
- Comparator<Node> { _, _ -> 1 }
-
- override fun toString(): String = "First-Fit"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt
deleted file mode 100644
index ac79a9ce..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
-
-/**
- * A [ResourceFilterPolicy] based on the amount of cores available on the machine and the cores required for
- * the task.
- */
-public object FunctionalResourceFilterPolicy : ResourceFilterPolicy {
- override fun invoke(scheduler: StageWorkflowService): ResourceFilterPolicy.Logic =
- object : ResourceFilterPolicy.Logic {
- override fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> =
- hosts.filter { it in scheduler.available }
- }
-
- override fun toString(): String = "functional"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt
deleted file mode 100644
index caf87c70..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.StageWorkflowService
-import java.util.*
-
-/**
- * A [ResourceSelectionPolicy] that randomly orders the machines.
- */
-public object RandomResourceSelectionPolicy : ResourceSelectionPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<Node> = object : Comparator<Node> {
- private val ids: Map<Node, Long>
-
- init {
- val random = Random(123)
- ids = scheduler.nodes.associateWith { random.nextLong() }
- }
-
- override fun compare(o1: Node, o2: Node): Int = compareValuesBy(o1, o2) { ids[it] }
- }
-
- override fun toString(): String = "Random"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt
deleted file mode 100644
index 4923a34b..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.TaskState
-import org.opendc.workflows.service.stage.StagePolicy
-
-/**
- * This interface represents stages **R2**, **R3** and **R4** stage of the Reference Architecture for Schedulers and
- * acts as a filter yielding a list of resources with sufficient resource-capacities, based on fixed or dynamic
- * requirements, and on predicted or monitored information about processing unit availability, memory occupancy, etc.
- */
-public interface ResourceFilterPolicy : StagePolicy<ResourceFilterPolicy.Logic> {
- public interface Logic {
- /**
- * Filter the list of machines based on dynamic information.
- *
- * @param hosts The hosts to filter.
- * @param task The task that is to be scheduled.
- * @return The machines on which the task can be scheduled.
- */
- public operator fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node>
- }
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
deleted file mode 100644
index 990b990a..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.stage.StagePolicy
-
-/**
- * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected
- * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit.
- */
-public interface ResourceSelectionPolicy : StagePolicy<Comparator<Node>>
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt
index d02e2b4e..4305aa57 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt
@@ -28,3 +28,8 @@ package org.opendc.workflows.workload
* Meta-data key for the deadline of a task.
*/
public const val WORKFLOW_TASK_DEADLINE: String = "workflow:task:deadline"
+
+/**
+ * Meta-data key for the number of cores needed for a task.
+ */
+public const val WORKFLOW_TASK_CORES: String = "workflow:task:cores"