diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-09-20 00:37:13 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-09-20 00:37:13 +0200 |
| commit | 90e0d1d8c6ab94e020dd4cb4831a369b270a69b7 (patch) | |
| tree | ab456b9fc10b52992fc8cf48fc2f63fee127dbb8 | |
| parent | a67b87be9e14d6d3c23e1e6aff5051176171e6ef (diff) | |
Implement standard task schedulers
This change implements default tasks scheduling algorithms like FIFO or SRTF
and adds them to to the standard library.
10 files changed, 257 insertions, 84 deletions
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/Experiment.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/Experiment.kt deleted file mode 100644 index a4f947c8..00000000 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/Experiment.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2017 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package nl.atlarge.opendc.experiment - -/** - * A simulation of multiple simultaneous workloads running on top of a topology. - * - * @param id The unique identifier of the experiment. - * @param name The name of the experiment. - * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) - */ -data class Experiment(val id: Int, val name: String) diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/ExperimentRunner.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/ExperimentRunner.kt deleted file mode 100644 index e53c6e08..00000000 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/ExperimentRunner.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2017 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package nl.atlarge.opendc.experiment - -/** - * An [ExperimentRunner] is responsible for executing an [Experiment]. - * - * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) - */ -interface ExperimentRunner { - /** - * Run the given [Experiment] using this runner. - * - * @param experiment The experiment to run. - */ - fun run(experiment: Experiment) -} diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt new file mode 100644 index 00000000..cc196a00 --- /dev/null +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt @@ -0,0 +1,101 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.scheduler + +import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.topology.Entity +import nl.atlarge.opendc.topology.machine.Machine +import nl.atlarge.opendc.workload.Task +import java.util.* + +/** + * A [Scheduler] that distributes work according to the first-in-first-out principle. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +class FifoScheduler : Scheduler { + /** + * The set of machines the scheduler knows of. + */ + val machines: MutableSet<Machine> = HashSet() + + /** + * The queue of [Task]s that need to be scheduled. + */ + val queue: Queue<Task> = ArrayDeque() + + /** + * (Re)schedule the tasks submitted to the scheduler over the specified set of machines. + */ + override suspend fun <E : Entity<*>> Context<E>.schedule() { + if (queue.isEmpty()) { + return + } + + machines + .filter { it.state.status == Machine.Status.IDLE } + .forEach { + while (queue.isNotEmpty()) { + val task = queue.poll() + + // TODO What to do with tasks that are not ready yet to be processed + if (!task.isReady()) { + submit(task) + continue + } + + it.send(task) + break + } + } + } + + /** + * Submit a [Task] to this scheduler. + * + * @param task The task to submit to the scheduler. + */ + override fun submit(task: Task) { + queue.add(task) + } + + /** + * Register a [Machine] to this scheduler. + * + * @param machine The machine to register. + */ + override fun register(machine: Machine) { + machines.add(machine) + } + + /** + * Deregister a [Machine] from this scheduler. + * + * @param machine The machine to deregister. + */ + override fun deregister(machine: Machine) { + machines.remove(machine) + } +} diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/Scheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/Scheduler.kt index 28aa84e7..8c96341a 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/Scheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/Scheduler.kt @@ -22,21 +22,45 @@ * SOFTWARE. */ -package nl.atlarge.opendc.experiment +package nl.atlarge.opendc.scheduler +import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.topology.Entity +import nl.atlarge.opendc.topology.machine.Machine +import nl.atlarge.opendc.workload.Task /** * A task scheduler that is coupled to an [Entity] in the topology of the cloud network. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -interface Scheduler<in E : Entity<*>> { +interface Scheduler { /** - * Schedule the given jobs for the given entity. + * (Re)schedule the tasks submitted to the scheduler over the specified set of machines. * - * @param entity The entity in the cloud network topology representing the entity. - * @param jobs The jobs that have been submitted to the cloud network. + * This method should be invoked at some interval to allow the scheduler to reschedule existing tasks and schedule + * new tasks. */ - fun schedule(entity: E, jobs: Set<Job>) + suspend fun <E: Entity<*>> Context<E>.schedule() + + /** + * Submit a [Task] to this scheduler. + * + * @param task The task to submit to the scheduler. + */ + fun submit(task: Task) + + /** + * Register a [Machine] to this scheduler. + * + * @param machine The machine to register. + */ + fun register(machine: Machine) + + /** + * Deregister a [Machine] from this scheduler. + * + * @param machine The machine to deregister. + */ + fun deregister(machine: Machine) } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt new file mode 100644 index 00000000..ce80ddc3 --- /dev/null +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt @@ -0,0 +1,106 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.scheduler + +import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.topology.Entity +import nl.atlarge.opendc.topology.machine.Machine +import nl.atlarge.opendc.workload.Task +import java.util.* + +/** + * A [Scheduler] that distributes work according to the shortest job first policy. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +class SrtfScheduler : Scheduler { + /** + * The set of machines the scheduler knows of. + */ + val machines: MutableSet<Machine> = HashSet() + + /** + * The set of [Task]s that need to be scheduled. + */ + val tasks: MutableSet<Task> = HashSet() + + /** + * (Re)schedule the tasks submitted to the scheduler over the specified set of machines. + */ + override suspend fun <E : Entity<*>> Context<E>.schedule() { + if (tasks.isEmpty()) { + return + } + + val iterator = tasks.sortedBy { it.remaining }.iterator() + + machines + .filter { it.state.status == Machine.Status.IDLE } + .forEach { + while (iterator.hasNext()) { + val task = iterator.next() + + // TODO What to do with tasks that are not ready yet to be processed + if (!task.isReady()) { + submit(task) + continue + } else if (task.finished) { + tasks.remove(task) + continue + } + + it.send(task) + break + } + } + } + + /** + * Submit a [Task] to this scheduler. + * + * @param task The task to submit to the scheduler. + */ + override fun submit(task: Task) { + tasks.add(task) + } + + /** + * Register a [Machine] to this scheduler. + * + * @param machine The machine to register. + */ + override fun register(machine: Machine) { + machines.add(machine) + } + + /** + * Deregister a [Machine] from this scheduler. + * + * @param machine The machine to deregister. + */ + override fun deregister(machine: Machine) { + machines.remove(machine) + } +} diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt index b995732a..4136c03c 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt @@ -32,5 +32,8 @@ import nl.atlarge.opendc.topology.Entity * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ class Datacenter : Entity<Unit> { + /** + * The initial state of the entity. + */ override val initialState = Unit } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Room.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Room.kt index 3b338899..109a5629 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Room.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Room.kt @@ -31,4 +31,9 @@ import nl.atlarge.opendc.topology.Entity * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -interface Room : Entity<Unit> +class Room : Entity<Unit> { + /** + * The initial state of the entity. + */ + override val initialState = Unit +} diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/Job.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Job.kt index 8c41735d..6dbbf327 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/Job.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Job.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package nl.atlarge.opendc.experiment +package nl.atlarge.opendc.workload /** * A job that is submitted to a cloud network, which consists of one or multiple [Task]s. diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/Task.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt index ec2eb2fa..9a8c84e6 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/Task.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package nl.atlarge.opendc.experiment +package nl.atlarge.opendc.workload /** * A task represents some computation that is part of a [Job]. @@ -47,6 +47,13 @@ data class Task( private set /** + * Determine whether the task is ready to be processed. + * + * @return `true` if the task is ready to be processed, `false` otherwise. + */ + fun isReady() = dependencies.all { it.finished } + + /** * Consume the given amount of flops of this task. * * @param flops The total amount of flops to consume. diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/User.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/User.kt index bb87a167..d0e90851 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/experiment/User.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/User.kt @@ -22,14 +22,14 @@ * SOFTWARE. */ -package nl.atlarge.opendc.experiment +package nl.atlarge.opendc.workload import nl.atlarge.opendc.topology.Topology /** * A user of a cloud network that provides [Job]s for the simulation. * - * <p>Each [User] in a simulation has its own logical view of the cloud network which is used to route its jobs in the + * Each [User] in a simulation has its own logical view of the cloud network which is used to route its jobs in the * physical network. * * @param id The unique identifier of the user. |
