diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-10-04 10:44:24 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-10-04 10:47:08 +0200 |
| commit | 25cc35b0e4942e990c01ac6224720e8fe84fd9ae (patch) | |
| tree | 12cd345922388df70ac4498570659d6c6990a7e5 | |
| parent | 2c26e9c91c945af065770c323e8b80a9f5104379 (diff) | |
bug(#9): Fix interference between experiments
This change fixes the interference of multiple experiments running at
the same time due to some thread unsafe behaviour in the
JpaExperimentManager class.
The code has now been restructured to solve the issue and fix the thread
unsafe behaviour.
Closes #9.
14 files changed, 284 insertions, 173 deletions
diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/Jpa.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/Jpa.kt new file mode 100644 index 00000000..cbbe280a --- /dev/null +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/Jpa.kt @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.integration.jpa + +import javax.persistence.EntityManager + +/** + * Run the given block in a transaction, committing on return of the block. + * + * @param block The block to execute in the transaction. + */ +inline fun EntityManager.transaction(block: () -> Unit) { + transaction.begin() + block() + transaction.commit() +} diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt index 500c7e99..83a98cfb 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt @@ -24,11 +24,9 @@ package nl.atlarge.opendc.integration.jpa.schema -import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.time.Instant import nl.atlarge.opendc.platform.workload.Task -import nl.atlarge.opendc.topology.container.Datacenter -import nl.atlarge.opendc.topology.machine.Machine +import nl.atlarge.opendc.platform.workload.TaskState import javax.persistence.* /** @@ -52,90 +50,68 @@ data class Task( /** * The dependencies of the task. */ - override val dependencies: Set<Task> - get() { - if (_dependencies != null) - return _dependencies!! - _dependencies = dependency?.let(::setOf) ?: emptySet() - return _dependencies!! - } - - /** - * The dependencies set cache. - */ - private var _dependencies: Set<Task>? = null - - /** - * The remaining amount of flops to compute. - */ - override var remaining: Long - get() { - if (_remaining == null) - _remaining = flops - return _remaining!! - } - private set(value) { _remaining = value } - private var _remaining: Long? = -1 + override lateinit var dependencies: Set<Task> + private set /** - * A flag to indicate the task has been accepted by the datacenter. + * The remaining flops for this task. */ - override var accepted: Boolean = false + override var remaining: Long = 0 private set /** - * A flag to indicate the task has been started. + * A flag to indicate whether the task has finished. */ - override var started: Boolean = false + override var finished: Boolean = false private set /** - * A flag to indicate whether the task is finished. + * The state of the task. */ - override var finished: Boolean = false + override lateinit var state: TaskState private set /** - * Accept the task into the scheduling queue. + * This method initialises the task object after it has been created by the JPA implementation. We use this + * initialisation method because JPA implementations only call the default constructor */ - override fun Context<Datacenter>.accept() { - accepted = true + @PostLoad + internal fun init() { + remaining = flops + dependencies = dependency?.let(::setOf) ?: emptySet() + state = TaskState.Underway } /** - * Start a task. + * This method is invoked when a task has arrived at a datacenter. + * + * @param time The moment in time the task has arrived at the datacenter. */ - override fun Context<Machine>.start() { - started = true + override fun arrive(time: Instant) { + if (state !is TaskState.Underway) { + throw IllegalStateException("The task has already been submitted to a datacenter") + } + remaining = flops + state = TaskState.Queued(time) } /** * Consume the given amount of flops of this task. * + * @param time The current moment in time of the consumption. * @param flops The total amount of flops to consume. */ - override fun Context<Machine>.consume(flops: Long) { - if (finished) + override fun consume(time: Instant, flops: Long) { + if (state is TaskState.Queued) { + state = TaskState.Running(state as TaskState.Queued, time) + } else if (finished) { return - if (remaining <= flops) { + } + remaining -= flops + if (remaining <= 0) { remaining = 0 - } else { - remaining -= flops + finished = true + state = TaskState.Finished(state as TaskState.Running, time) } } - - /** - * Finalise the task. - */ - override fun Context<Machine>.finalize() { - finished = true - } - - /** - * Reset the task. - */ - internal fun reset() { - remaining = flops - finished = false - } } diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt index f5a86e64..59b28d0b 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt @@ -27,15 +27,19 @@ package nl.atlarge.opendc.platform import mu.KotlinLogging import nl.atlarge.opendc.integration.jpa.schema.ExperimentState import nl.atlarge.opendc.integration.jpa.schema.MachineState -import nl.atlarge.opendc.integration.jpa.schema.TaskState +import nl.atlarge.opendc.integration.jpa.transaction +import nl.atlarge.opendc.integration.jpa.schema.Trace as InternalTrace +import nl.atlarge.opendc.integration.jpa.schema.TaskState as InternalTaskState import nl.atlarge.opendc.integration.jpa.schema.Experiment as InternalExperiment import nl.atlarge.opendc.integration.jpa.schema.Task as InternalTask import nl.atlarge.opendc.kernel.Kernel +import nl.atlarge.opendc.platform.workload.TaskState import nl.atlarge.opendc.topology.JpaTopologyFactory import nl.atlarge.opendc.topology.container.Rack import nl.atlarge.opendc.topology.container.Room import nl.atlarge.opendc.topology.destinations import nl.atlarge.opendc.topology.machine.Machine +import java.util.* import javax.persistence.EntityManager /** @@ -45,8 +49,8 @@ import javax.persistence.EntityManager * @property experiment The internal experiment definition to use. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class JpaExperiment(val manager: EntityManager, - private val experiment: InternalExperiment): Experiment<Unit> { +class JpaExperiment(private val manager: EntityManager, + private val experiment: InternalExperiment): Experiment<Unit>, AutoCloseable { /** * The logging instance. */ @@ -64,11 +68,10 @@ class JpaExperiment(val manager: EntityManager, } // Set the simulation state - manager.run { - transaction.begin() + manager.transaction { experiment.state = ExperimentState.SIMULATING - transaction.commit() } + val section = experiment.path.sections.first() // Important: initialise the scheduler of the datacenter @@ -77,18 +80,16 @@ class JpaExperiment(val manager: EntityManager, val topology = JpaTopologyFactory(section).create() val simulation = kernel.create(topology) val trace = experiment.trace + val tasks = trace.jobs.flatMap { it.tasks } - logger.info { "Sending trace to kernel" } + logger.info { "Sending trace to kernel ${Objects.hashCode(trace)} ${(trace as InternalTrace).id}" } // Schedule all messages in the trace - trace.jobs.forEach { job -> - job.tasks.forEach { task -> - if (task is InternalTask) { - task.reset() - simulation.schedule(task, section.datacenter, delay = task.startTime) - } else { - logger.warn { "Dropped invalid task $task" } - } + tasks.forEach { task -> + if (task is InternalTask) { + simulation.schedule(task, section.datacenter, delay = task.startTime) + } else { + logger.warn { "Dropped invalid task $task" } } } @@ -103,48 +104,73 @@ class JpaExperiment(val manager: EntityManager, while (trace.jobs.any { !it.finished }) { // Collect data of simulation cycle - manager.transaction.begin() - machines.forEach { machine -> - val state = simulation.run { machine.state } - val wrapped = MachineState(0, - machine as nl.atlarge.opendc.integration.jpa.schema.Machine, - state.task as nl.atlarge.opendc.integration.jpa.schema.Task?, - experiment, - simulation.clock.now, - state.temperature, - state.memory, - state.load - ) - manager.persist(wrapped) - } - - trace.jobs.asSequence() - .flatMap { it.tasks.asSequence() } - .forEach { task -> - val state = TaskState(0, - task as nl.atlarge.opendc.integration.jpa.schema.Task, + manager.transaction { + experiment.last = simulation.clock.now + + machines.forEach { machine -> + val state = simulation.run { machine.state } + val wrapped = MachineState(0, + machine as nl.atlarge.opendc.integration.jpa.schema.Machine, + state.task as nl.atlarge.opendc.integration.jpa.schema.Task?, experiment, simulation.clock.now, - task.remaining.toInt(), - 1 + state.temperature, + state.memory, + state.load ) - manager.persist(state) + manager.persist(wrapped) } - manager.transaction.commit() + + trace.jobs.asSequence() + .flatMap { it.tasks.asSequence() } + .forEach { task -> + val state = InternalTaskState(0, + task as nl.atlarge.opendc.integration.jpa.schema.Task, + experiment, + simulation.clock.now, + task.remaining.toInt(), + 1 + ) + manager.persist(state) + } + } // Run next simulation cycle simulation.run(simulation.clock.now + 1) - experiment.last = simulation.clock.now } // Set the experiment state - manager.run { - transaction.begin() + manager.transaction { experiment.state = ExperimentState.FINISHED - transaction.commit() } logger.info { "Simulation done" } - manager.close() + val waiting: Long = tasks.fold(0.toLong()) { acc, task -> + val finished = task.state as TaskState.Finished + acc + (finished.previous.at - finished.previous.previous.at) + } / tasks.size + + val execution: Long = tasks.fold(0.toLong()) { acc, task -> + val finished = task.state as TaskState.Finished + acc + (finished.at - finished.previous.at) + } / tasks.size + + val turnaround: Long = tasks.fold(0.toLong()) { acc, task -> + val finished = task.state as TaskState.Finished + acc + (finished.at - finished.previous.previous.at) + } / tasks.size + + logger.info { "Average waiting time: $waiting seconds" } + logger.info { "Average execution time: $execution seconds" } + logger.info { "Average turnaround time: $turnaround seconds" } } + + /** + * Closes this resource, relinquishing any underlying resources. + * This method is invoked automatically on objects managed by the + * `try`-with-resources statement. + * + * @throws Exception if this resource cannot be closed + */ + override fun close() = manager.close() } diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt index ce69f84b..1d1e118d 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt @@ -24,6 +24,7 @@ package nl.atlarge.opendc.platform +import nl.atlarge.opendc.integration.jpa.transaction import nl.atlarge.opendc.integration.jpa.schema.Experiment as InternalExperiment import nl.atlarge.opendc.integration.jpa.schema.ExperimentState import javax.persistence.EntityManager @@ -36,11 +37,11 @@ import javax.persistence.EntityManagerFactory * from. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class JpaExperimentManager(private val factory: EntityManagerFactory) { +class JpaExperimentManager(private val factory: EntityManagerFactory): AutoCloseable { /** * The entity manager for this experiment. */ - private val manager: EntityManager = factory.createEntityManager() + private var manager: EntityManager = factory.createEntityManager() /** * The amount of experiments in the queue. This property makes a call to the database and does therefore not @@ -60,21 +61,33 @@ class JpaExperimentManager(private val factory: EntityManagerFactory) { * @return The experiment that has been polled from the database or `null` if there are no experiments in the * queue. */ - fun poll(): Experiment<Unit>? { - manager.transaction.begin() - var experiment: InternalExperiment? = null - val results = manager.createQuery("SELECT e FROM experiments e WHERE e.state = :s", + fun poll(): JpaExperiment? { + var result: JpaExperiment? = null + manager.transaction { + var experiment: InternalExperiment? = null + val results = manager.createQuery("SELECT e FROM experiments e WHERE e.state = :s", InternalExperiment::class.java) - .setParameter("s", ExperimentState.QUEUED) - .setMaxResults(1) - .resultList + .setParameter("s", ExperimentState.QUEUED) + .setMaxResults(1) + .resultList - if (!results.isEmpty()) { - experiment = results.first() - experiment!!.state = ExperimentState.CLAIMED + if (!results.isEmpty()) { + experiment = results.first() + experiment!!.state = ExperimentState.CLAIMED + } + result = experiment?.let { JpaExperiment(manager, it) } } - manager.transaction.commit() - return experiment?.let { JpaExperiment(factory.createEntityManager(), it) } + manager = factory.createEntityManager() + return result } + + /** + * Close this resource, relinquishing any underlying resources. + * This method is invoked automatically on objects managed by the + * `try`-with-resources statement.* + * + * @throws Exception if this resource cannot be closed + */ + override fun close() = manager.close() } diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt index 6a1d839d..794ca44e 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt @@ -45,17 +45,17 @@ fun main(args: Array<String>) { properties["javax.persistence.jdbc.password"] = env["PERSISTENCE_PASSWORD"] ?: "" val factory = Persistence.createEntityManagerFactory("opendc-simulator", properties) - val threads = 1 + val threads = 4 val executorService = Executors.newFixedThreadPool(threads) val experiments = JpaExperimentManager(factory) + val kernel = OmegaKernel logger.info { "Waiting for enqueued experiments..." } while (true) { - val experiment = experiments.poll() - executorService.submit { - experiment?.run { - logger.info { "Found experiment. Running simulating now..." } - run(OmegaKernel) + experiments.poll()?.let { experiment -> + logger.info { "Found experiment. Submitting for simulation now..." } + executorService.submit { + experiment.use { it.run(kernel) } } } diff --git a/opendc-integration-jpa/src/main/resources/jpa/schema.xml b/opendc-integration-jpa/src/main/resources/jpa/schema.xml index a14fb96d..bd6ea7a1 100644 --- a/opendc-integration-jpa/src/main/resources/jpa/schema.xml +++ b/opendc-integration-jpa/src/main/resources/jpa/schema.xml @@ -85,7 +85,7 @@ </attributes> </entity> - <entity class="Trace" access="FIELD" name="traces"> + <entity class="Trace" access="FIELD" name="traces" cacheable="false"> <attributes> <id name="id" /> <basic name="name"> @@ -97,17 +97,17 @@ </attributes> </entity> - <entity class="Job" access="FIELD" name="jobs"> + <entity class="Job" access="FIELD" name="jobs" cacheable="false"> <attributes> <id name="id" /> - <one-to-many name="tasks" target-entity="nl.atlarge.opendc.integration.jpa.schema.Task"> + <one-to-many name="tasks" target-entity="Task"> <join-column name="job_id" /> </one-to-many> <transient name="owner" /> </attributes> </entity> - <entity class="Task" access="FIELD" name="tasks"> + <entity class="Task" access="FIELD" name="tasks" cacheable="false"> <convert converter="nl.atlarge.opendc.integration.jpa.converter.ParallelizableConverter" attribute-name="parallelizable" /> <attributes> <id name="id" /> @@ -121,13 +121,12 @@ <column name="parallelizability" column-definition="text" /> </basic> - <one-to-one name="dependency" target-entity="nl.atlarge.opendc.integration.jpa.schema.Task"> + <one-to-one name="dependency" target-entity="Task"> <join-column name="task_dependency_id" /> </one-to-one> - <transient name="_dependencies" /> - <transient name="_remaining" /> - <transient name="accepted" /> - <transient name="started" /> + <transient name="dependencies" /> + <transient name="state" /> + <transient name="remaining" /> <transient name="finished" /> </attributes> </entity> diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt index 1c3dc869..c45ed5e6 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt @@ -25,6 +25,7 @@ package nl.atlarge.opendc.platform.scheduler import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.platform.workload.Job import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine import nl.atlarge.opendc.platform.workload.Task @@ -69,7 +70,7 @@ class FifoScheduler : Scheduler { val task = iterator.next() // TODO What to do with tasks that are not ready yet to be processed - if (!task.isReady()) { + if (!task.ready) { iterator.remove() rescheduled.add(task) continue @@ -85,7 +86,7 @@ class FifoScheduler : Scheduler { // Reschedule all tasks that are not ready yet while (!rescheduled.isEmpty()) { - submit(rescheduled.poll()) + queue.add(rescheduled.poll()) } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt index bf988802..578bef9c 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt @@ -25,9 +25,9 @@ package nl.atlarge.opendc.platform.scheduler import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.platform.workload.Task import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine -import nl.atlarge.opendc.platform.workload.Task /** * A task scheduler that is coupled to an [Entity] in the topology of the cloud network. diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt index b2660964..03f37b50 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt @@ -25,9 +25,9 @@ package nl.atlarge.opendc.platform.scheduler import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.platform.workload.Task import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine -import nl.atlarge.opendc.platform.workload.Task import java.util.* /** @@ -67,8 +67,8 @@ class SrtfScheduler : Scheduler { val task = iterator.next() // TODO What to do with tasks that are not ready yet to be processed - if (!task.isReady()) { - submit(task) + if (!task.ready) { + tasks.add(task) continue } else if (task.finished) { tasks.remove(task) diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt index eb173229..df2f2b6a 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt @@ -25,7 +25,7 @@ package nl.atlarge.opendc.platform.workload /** - * A job that is submitted to a cloud network, which consists of one or multiple [Task]s. + * A bag of tasks which are submitted by a [User] to the cloud network. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt index 140a5d5d..e740cdd8 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt @@ -24,12 +24,11 @@ package nl.atlarge.opendc.platform.workload -import nl.atlarge.opendc.kernel.Context -import nl.atlarge.opendc.topology.container.Datacenter +import nl.atlarge.opendc.kernel.time.Instant import nl.atlarge.opendc.topology.machine.Machine /** - * A task represents some computation that is part of a [Job]. + * A task that runs as part of a [Job] on a [Machine]. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ @@ -55,51 +54,39 @@ interface Task { val parallelizable: Boolean /** - * The remaining amount of flops to compute. + * The remaining flops for this task. */ val remaining: Long /** - * A flag to indicate the task has been accepted by the datacenter. + * The state of the task. */ - val accepted: Boolean + val state: TaskState /** - * A flag to indicate the task has been started. + * A flag to indicate whether the task is ready to be started. */ - val started: Boolean + val ready: Boolean + get() = !dependencies.any { !it.finished } /** - * A flag to indicate whether the task is finished. + * A flag to indicate whether the task has finished. */ val finished: Boolean + get() = state is TaskState.Finished /** - * Determine whether the task is ready to be processed. + * This method is invoked when a task has arrived at a datacenter. * - * @return `true` if the task is ready to be processed, `false` otherwise. + * @param time The moment in time the task has arrived at the datacenter. */ - fun isReady() = dependencies.all { it.finished } - - /** - * Accept the task into the scheduling queue. - */ - fun Context<Datacenter>.accept() - - /** - * Start a task. - */ - fun Context<Machine>.start() + fun arrive(time: Instant) /** * Consume the given amount of flops of this task. * + * @param time The current moment in time of the consumption. * @param flops The total amount of flops to consume. */ - fun Context<Machine>.consume(flops: Long) - - /** - * Finalise the task. - */ - fun Context<Machine>.finalize() + fun consume(time: Instant, flops: Long) } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/TaskState.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/TaskState.kt new file mode 100644 index 00000000..d1f908af --- /dev/null +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/TaskState.kt @@ -0,0 +1,72 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.platform.workload + +import nl.atlarge.opendc.kernel.time.Instant + + +/** + * This class hierarchy describes the states of a [Task]. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +sealed class TaskState { + /** + * A state to indicate the task has not yet arrived at the [Datacenter]. + */ + object Underway : TaskState() + + /** + * A state to indicate the task has arrived at the [Datacenter]. + * + * @property at The moment in time the task has arrived. + */ + data class Queued(val at: Instant) : TaskState() + + /** + * A state to indicate the task has started running on a machine. + * + * @property previous The previous state of the task. + * @property at The moment in time the task started. + */ + data class Running(val previous: Queued, val at: Instant) : TaskState() + + /** + * A state to indicate the task has finished. + * + * @property previous The previous state of the task. + * @property at The moment in time the task finished. + */ + data class Finished(val previous: Running, val at: Instant) : TaskState() + + /** + * A state to indicate the task has failed. + * + * @property previous The previous state of the task. + * @property at The moment in time the task failed. + * @property reason The reason of the failure. + */ + data class Failed(val previous: Running, val at: Instant, val reason: String) : TaskState() +} diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt index 333609bb..b3d67568 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt @@ -30,9 +30,10 @@ import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.Process import nl.atlarge.opendc.kernel.time.Duration import nl.atlarge.opendc.platform.scheduler.Scheduler +import nl.atlarge.opendc.platform.workload.Job +import nl.atlarge.opendc.platform.workload.Task import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine -import nl.atlarge.opendc.platform.workload.Task import java.util.* /** @@ -88,6 +89,7 @@ interface Datacenter : Entity<Unit>, Process<Datacenter> { while (queue.isNotEmpty()) { val msg = queue.poll() if (msg is Task) { + msg.arrive(time) scheduler.submit(msg) } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt index b1d881a1..761f14b1 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt @@ -83,16 +83,14 @@ open class Machine : Entity<Machine.State>, Process<Machine> { var task: Task = receiveTask() update(State(Status.RUNNING, task, load = 1.0, memory = state.memory + 50, temperature = 30.0)) - task.run { start() } while (true) { - if (task.remaining <= 0) { - task.run { finalize() } + if (task.finished) { logger.info { "${entity.id}: Task ${task.id} finished. Machine idle at $time" } update(State(Status.IDLE)) task = receiveTask() } else { - task.run { consume(speed * delta) } + task.consume(time, speed * delta) } // Check if we have received a new order in the meantime. @@ -100,7 +98,6 @@ open class Machine : Entity<Machine.State>, Process<Machine> { if (msg is Task) { task = msg update(State(Status.RUNNING, task, load = 1.0, memory = state.memory + 50, temperature = 30.0)) - task.run { start() } } } } |
