summaryrefslogtreecommitdiff
path: root/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2017-10-04 10:44:24 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2017-10-04 10:47:08 +0200
commit25cc35b0e4942e990c01ac6224720e8fe84fd9ae (patch)
tree12cd345922388df70ac4498570659d6c6990a7e5 /opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema
parent2c26e9c91c945af065770c323e8b80a9f5104379 (diff)
bug(#9): Fix interference between experiments
This change fixes the interference of multiple experiments running at the same time due to some thread unsafe behaviour in the JpaExperimentManager class. The code has now been restructured to solve the issue and fix the thread unsafe behaviour. Closes #9.
Diffstat (limited to 'opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema')
-rw-r--r--opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt94
1 files changed, 35 insertions, 59 deletions
diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt
index 500c7e99..83a98cfb 100644
--- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt
+++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt
@@ -24,11 +24,9 @@
package nl.atlarge.opendc.integration.jpa.schema
-import nl.atlarge.opendc.kernel.Context
import nl.atlarge.opendc.kernel.time.Instant
import nl.atlarge.opendc.platform.workload.Task
-import nl.atlarge.opendc.topology.container.Datacenter
-import nl.atlarge.opendc.topology.machine.Machine
+import nl.atlarge.opendc.platform.workload.TaskState
import javax.persistence.*
/**
@@ -52,90 +50,68 @@ data class Task(
/**
* The dependencies of the task.
*/
- override val dependencies: Set<Task>
- get() {
- if (_dependencies != null)
- return _dependencies!!
- _dependencies = dependency?.let(::setOf) ?: emptySet()
- return _dependencies!!
- }
-
- /**
- * The dependencies set cache.
- */
- private var _dependencies: Set<Task>? = null
-
- /**
- * The remaining amount of flops to compute.
- */
- override var remaining: Long
- get() {
- if (_remaining == null)
- _remaining = flops
- return _remaining!!
- }
- private set(value) { _remaining = value }
- private var _remaining: Long? = -1
+ override lateinit var dependencies: Set<Task>
+ private set
/**
- * A flag to indicate the task has been accepted by the datacenter.
+ * The remaining flops for this task.
*/
- override var accepted: Boolean = false
+ override var remaining: Long = 0
private set
/**
- * A flag to indicate the task has been started.
+ * A flag to indicate whether the task has finished.
*/
- override var started: Boolean = false
+ override var finished: Boolean = false
private set
/**
- * A flag to indicate whether the task is finished.
+ * The state of the task.
*/
- override var finished: Boolean = false
+ override lateinit var state: TaskState
private set
/**
- * Accept the task into the scheduling queue.
+ * This method initialises the task object after it has been created by the JPA implementation. We use this
+ * initialisation method because JPA implementations only call the default constructor
*/
- override fun Context<Datacenter>.accept() {
- accepted = true
+ @PostLoad
+ internal fun init() {
+ remaining = flops
+ dependencies = dependency?.let(::setOf) ?: emptySet()
+ state = TaskState.Underway
}
/**
- * Start a task.
+ * This method is invoked when a task has arrived at a datacenter.
+ *
+ * @param time The moment in time the task has arrived at the datacenter.
*/
- override fun Context<Machine>.start() {
- started = true
+ override fun arrive(time: Instant) {
+ if (state !is TaskState.Underway) {
+ throw IllegalStateException("The task has already been submitted to a datacenter")
+ }
+ remaining = flops
+ state = TaskState.Queued(time)
}
/**
* Consume the given amount of flops of this task.
*
+ * @param time The current moment in time of the consumption.
* @param flops The total amount of flops to consume.
*/
- override fun Context<Machine>.consume(flops: Long) {
- if (finished)
+ override fun consume(time: Instant, flops: Long) {
+ if (state is TaskState.Queued) {
+ state = TaskState.Running(state as TaskState.Queued, time)
+ } else if (finished) {
return
- if (remaining <= flops) {
+ }
+ remaining -= flops
+ if (remaining <= 0) {
remaining = 0
- } else {
- remaining -= flops
+ finished = true
+ state = TaskState.Finished(state as TaskState.Running, time)
}
}
-
- /**
- * Finalise the task.
- */
- override fun Context<Machine>.finalize() {
- finished = true
- }
-
- /**
- * Reset the task.
- */
- internal fun reset() {
- remaining = flops
- finished = false
- }
}