diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-10-04 10:44:24 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-10-04 10:47:08 +0200 |
| commit | 25cc35b0e4942e990c01ac6224720e8fe84fd9ae (patch) | |
| tree | 12cd345922388df70ac4498570659d6c6990a7e5 /opendc-stdlib | |
| parent | 2c26e9c91c945af065770c323e8b80a9f5104379 (diff) | |
bug(#9): Fix interference between experiments
This change fixes the interference of multiple experiments running at
the same time due to some thread unsafe behaviour in the
JpaExperimentManager class.
The code has now been restructured to solve the issue and fix the thread
unsafe behaviour.
Closes #9.
Diffstat (limited to 'opendc-stdlib')
8 files changed, 100 insertions, 41 deletions
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt index 1c3dc869..c45ed5e6 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt @@ -25,6 +25,7 @@ package nl.atlarge.opendc.platform.scheduler import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.platform.workload.Job import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine import nl.atlarge.opendc.platform.workload.Task @@ -69,7 +70,7 @@ class FifoScheduler : Scheduler { val task = iterator.next() // TODO What to do with tasks that are not ready yet to be processed - if (!task.isReady()) { + if (!task.ready) { iterator.remove() rescheduled.add(task) continue @@ -85,7 +86,7 @@ class FifoScheduler : Scheduler { // Reschedule all tasks that are not ready yet while (!rescheduled.isEmpty()) { - submit(rescheduled.poll()) + queue.add(rescheduled.poll()) } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt index bf988802..578bef9c 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt @@ -25,9 +25,9 @@ package nl.atlarge.opendc.platform.scheduler import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.platform.workload.Task import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine -import nl.atlarge.opendc.platform.workload.Task /** * A task scheduler that is coupled to an [Entity] in the topology of the cloud network. diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt index b2660964..03f37b50 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt @@ -25,9 +25,9 @@ package nl.atlarge.opendc.platform.scheduler import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.platform.workload.Task import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine -import nl.atlarge.opendc.platform.workload.Task import java.util.* /** @@ -67,8 +67,8 @@ class SrtfScheduler : Scheduler { val task = iterator.next() // TODO What to do with tasks that are not ready yet to be processed - if (!task.isReady()) { - submit(task) + if (!task.ready) { + tasks.add(task) continue } else if (task.finished) { tasks.remove(task) diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt index eb173229..df2f2b6a 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt @@ -25,7 +25,7 @@ package nl.atlarge.opendc.platform.workload /** - * A job that is submitted to a cloud network, which consists of one or multiple [Task]s. + * A bag of tasks which are submitted by a [User] to the cloud network. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt index 140a5d5d..e740cdd8 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt @@ -24,12 +24,11 @@ package nl.atlarge.opendc.platform.workload -import nl.atlarge.opendc.kernel.Context -import nl.atlarge.opendc.topology.container.Datacenter +import nl.atlarge.opendc.kernel.time.Instant import nl.atlarge.opendc.topology.machine.Machine /** - * A task represents some computation that is part of a [Job]. + * A task that runs as part of a [Job] on a [Machine]. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ @@ -55,51 +54,39 @@ interface Task { val parallelizable: Boolean /** - * The remaining amount of flops to compute. + * The remaining flops for this task. */ val remaining: Long /** - * A flag to indicate the task has been accepted by the datacenter. + * The state of the task. */ - val accepted: Boolean + val state: TaskState /** - * A flag to indicate the task has been started. + * A flag to indicate whether the task is ready to be started. */ - val started: Boolean + val ready: Boolean + get() = !dependencies.any { !it.finished } /** - * A flag to indicate whether the task is finished. + * A flag to indicate whether the task has finished. */ val finished: Boolean + get() = state is TaskState.Finished /** - * Determine whether the task is ready to be processed. + * This method is invoked when a task has arrived at a datacenter. * - * @return `true` if the task is ready to be processed, `false` otherwise. + * @param time The moment in time the task has arrived at the datacenter. */ - fun isReady() = dependencies.all { it.finished } - - /** - * Accept the task into the scheduling queue. - */ - fun Context<Datacenter>.accept() - - /** - * Start a task. - */ - fun Context<Machine>.start() + fun arrive(time: Instant) /** * Consume the given amount of flops of this task. * + * @param time The current moment in time of the consumption. * @param flops The total amount of flops to consume. */ - fun Context<Machine>.consume(flops: Long) - - /** - * Finalise the task. - */ - fun Context<Machine>.finalize() + fun consume(time: Instant, flops: Long) } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/TaskState.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/TaskState.kt new file mode 100644 index 00000000..d1f908af --- /dev/null +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/TaskState.kt @@ -0,0 +1,72 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.platform.workload + +import nl.atlarge.opendc.kernel.time.Instant + + +/** + * This class hierarchy describes the states of a [Task]. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +sealed class TaskState { + /** + * A state to indicate the task has not yet arrived at the [Datacenter]. + */ + object Underway : TaskState() + + /** + * A state to indicate the task has arrived at the [Datacenter]. + * + * @property at The moment in time the task has arrived. + */ + data class Queued(val at: Instant) : TaskState() + + /** + * A state to indicate the task has started running on a machine. + * + * @property previous The previous state of the task. + * @property at The moment in time the task started. + */ + data class Running(val previous: Queued, val at: Instant) : TaskState() + + /** + * A state to indicate the task has finished. + * + * @property previous The previous state of the task. + * @property at The moment in time the task finished. + */ + data class Finished(val previous: Running, val at: Instant) : TaskState() + + /** + * A state to indicate the task has failed. + * + * @property previous The previous state of the task. + * @property at The moment in time the task failed. + * @property reason The reason of the failure. + */ + data class Failed(val previous: Running, val at: Instant, val reason: String) : TaskState() +} diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt index 333609bb..b3d67568 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt @@ -30,9 +30,10 @@ import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.Process import nl.atlarge.opendc.kernel.time.Duration import nl.atlarge.opendc.platform.scheduler.Scheduler +import nl.atlarge.opendc.platform.workload.Job +import nl.atlarge.opendc.platform.workload.Task import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine -import nl.atlarge.opendc.platform.workload.Task import java.util.* /** @@ -88,6 +89,7 @@ interface Datacenter : Entity<Unit>, Process<Datacenter> { while (queue.isNotEmpty()) { val msg = queue.poll() if (msg is Task) { + msg.arrive(time) scheduler.submit(msg) } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt index b1d881a1..761f14b1 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt @@ -83,16 +83,14 @@ open class Machine : Entity<Machine.State>, Process<Machine> { var task: Task = receiveTask() update(State(Status.RUNNING, task, load = 1.0, memory = state.memory + 50, temperature = 30.0)) - task.run { start() } while (true) { - if (task.remaining <= 0) { - task.run { finalize() } + if (task.finished) { logger.info { "${entity.id}: Task ${task.id} finished. Machine idle at $time" } update(State(Status.IDLE)) task = receiveTask() } else { - task.run { consume(speed * delta) } + task.consume(time, speed * delta) } // Check if we have received a new order in the meantime. @@ -100,7 +98,6 @@ open class Machine : Entity<Machine.State>, Process<Machine> { if (msg is Task) { task = msg update(State(Status.RUNNING, task, load = 1.0, memory = state.memory + 50, temperature = 30.0)) - task.run { start() } } } } |
