From 25cc35b0e4942e990c01ac6224720e8fe84fd9ae Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 4 Oct 2017 10:44:24 +0200 Subject: bug(#9): Fix interference between experiments This change fixes the interference of multiple experiments running at the same time due to some thread unsafe behaviour in the JpaExperimentManager class. The code has now been restructured to solve the issue and fix the thread unsafe behaviour. Closes #9. --- .../atlarge/opendc/integration/jpa/schema/Task.kt | 94 ++++++++-------------- 1 file changed, 35 insertions(+), 59 deletions(-) (limited to 'opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema') diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt index 500c7e99..83a98cfb 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt @@ -24,11 +24,9 @@ package nl.atlarge.opendc.integration.jpa.schema -import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.time.Instant import nl.atlarge.opendc.platform.workload.Task -import nl.atlarge.opendc.topology.container.Datacenter -import nl.atlarge.opendc.topology.machine.Machine +import nl.atlarge.opendc.platform.workload.TaskState import javax.persistence.* /** @@ -52,90 +50,68 @@ data class Task( /** * The dependencies of the task. */ - override val dependencies: Set - get() { - if (_dependencies != null) - return _dependencies!! - _dependencies = dependency?.let(::setOf) ?: emptySet() - return _dependencies!! - } - - /** - * The dependencies set cache. - */ - private var _dependencies: Set? = null - - /** - * The remaining amount of flops to compute. - */ - override var remaining: Long - get() { - if (_remaining == null) - _remaining = flops - return _remaining!! - } - private set(value) { _remaining = value } - private var _remaining: Long? = -1 + override lateinit var dependencies: Set + private set /** - * A flag to indicate the task has been accepted by the datacenter. + * The remaining flops for this task. */ - override var accepted: Boolean = false + override var remaining: Long = 0 private set /** - * A flag to indicate the task has been started. + * A flag to indicate whether the task has finished. */ - override var started: Boolean = false + override var finished: Boolean = false private set /** - * A flag to indicate whether the task is finished. + * The state of the task. */ - override var finished: Boolean = false + override lateinit var state: TaskState private set /** - * Accept the task into the scheduling queue. + * This method initialises the task object after it has been created by the JPA implementation. We use this + * initialisation method because JPA implementations only call the default constructor */ - override fun Context.accept() { - accepted = true + @PostLoad + internal fun init() { + remaining = flops + dependencies = dependency?.let(::setOf) ?: emptySet() + state = TaskState.Underway } /** - * Start a task. + * This method is invoked when a task has arrived at a datacenter. + * + * @param time The moment in time the task has arrived at the datacenter. */ - override fun Context.start() { - started = true + override fun arrive(time: Instant) { + if (state !is TaskState.Underway) { + throw IllegalStateException("The task has already been submitted to a datacenter") + } + remaining = flops + state = TaskState.Queued(time) } /** * Consume the given amount of flops of this task. * + * @param time The current moment in time of the consumption. * @param flops The total amount of flops to consume. */ - override fun Context.consume(flops: Long) { - if (finished) + override fun consume(time: Instant, flops: Long) { + if (state is TaskState.Queued) { + state = TaskState.Running(state as TaskState.Queued, time) + } else if (finished) { return - if (remaining <= flops) { + } + remaining -= flops + if (remaining <= 0) { remaining = 0 - } else { - remaining -= flops + finished = true + state = TaskState.Finished(state as TaskState.Running, time) } } - - /** - * Finalise the task. - */ - override fun Context.finalize() { - finished = true - } - - /** - * Reset the task. - */ - internal fun reset() { - remaining = flops - finished = false - } } -- cgit v1.2.3