diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-10-04 10:44:24 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-10-04 10:47:08 +0200 |
| commit | 25cc35b0e4942e990c01ac6224720e8fe84fd9ae (patch) | |
| tree | 12cd345922388df70ac4498570659d6c6990a7e5 /opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema | |
| parent | 2c26e9c91c945af065770c323e8b80a9f5104379 (diff) | |
bug(#9): Fix interference between experiments
This change fixes the interference of multiple experiments running at
the same time due to some thread unsafe behaviour in the
JpaExperimentManager class.
The code has now been restructured to solve the issue and fix the thread
unsafe behaviour.
Closes #9.
Diffstat (limited to 'opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema')
| -rw-r--r-- | opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt | 94 |
1 files changed, 35 insertions, 59 deletions
diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt index 500c7e99..83a98cfb 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt @@ -24,11 +24,9 @@ package nl.atlarge.opendc.integration.jpa.schema -import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.time.Instant import nl.atlarge.opendc.platform.workload.Task -import nl.atlarge.opendc.topology.container.Datacenter -import nl.atlarge.opendc.topology.machine.Machine +import nl.atlarge.opendc.platform.workload.TaskState import javax.persistence.* /** @@ -52,90 +50,68 @@ data class Task( /** * The dependencies of the task. */ - override val dependencies: Set<Task> - get() { - if (_dependencies != null) - return _dependencies!! - _dependencies = dependency?.let(::setOf) ?: emptySet() - return _dependencies!! - } - - /** - * The dependencies set cache. - */ - private var _dependencies: Set<Task>? = null - - /** - * The remaining amount of flops to compute. - */ - override var remaining: Long - get() { - if (_remaining == null) - _remaining = flops - return _remaining!! - } - private set(value) { _remaining = value } - private var _remaining: Long? = -1 + override lateinit var dependencies: Set<Task> + private set /** - * A flag to indicate the task has been accepted by the datacenter. + * The remaining flops for this task. */ - override var accepted: Boolean = false + override var remaining: Long = 0 private set /** - * A flag to indicate the task has been started. + * A flag to indicate whether the task has finished. */ - override var started: Boolean = false + override var finished: Boolean = false private set /** - * A flag to indicate whether the task is finished. + * The state of the task. */ - override var finished: Boolean = false + override lateinit var state: TaskState private set /** - * Accept the task into the scheduling queue. + * This method initialises the task object after it has been created by the JPA implementation. We use this + * initialisation method because JPA implementations only call the default constructor */ - override fun Context<Datacenter>.accept() { - accepted = true + @PostLoad + internal fun init() { + remaining = flops + dependencies = dependency?.let(::setOf) ?: emptySet() + state = TaskState.Underway } /** - * Start a task. + * This method is invoked when a task has arrived at a datacenter. + * + * @param time The moment in time the task has arrived at the datacenter. */ - override fun Context<Machine>.start() { - started = true + override fun arrive(time: Instant) { + if (state !is TaskState.Underway) { + throw IllegalStateException("The task has already been submitted to a datacenter") + } + remaining = flops + state = TaskState.Queued(time) } /** * Consume the given amount of flops of this task. * + * @param time The current moment in time of the consumption. * @param flops The total amount of flops to consume. */ - override fun Context<Machine>.consume(flops: Long) { - if (finished) + override fun consume(time: Instant, flops: Long) { + if (state is TaskState.Queued) { + state = TaskState.Running(state as TaskState.Queued, time) + } else if (finished) { return - if (remaining <= flops) { + } + remaining -= flops + if (remaining <= 0) { remaining = 0 - } else { - remaining -= flops + finished = true + state = TaskState.Finished(state as TaskState.Running, time) } } - - /** - * Finalise the task. - */ - override fun Context<Machine>.finalize() { - finished = true - } - - /** - * Reset the task. - */ - internal fun reset() { - remaining = flops - finished = false - } } |
