summaryrefslogtreecommitdiff
path: root/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2017-10-04 10:44:24 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2017-10-04 10:47:08 +0200
commit25cc35b0e4942e990c01ac6224720e8fe84fd9ae (patch)
tree12cd345922388df70ac4498570659d6c6990a7e5 /opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt
parent2c26e9c91c945af065770c323e8b80a9f5104379 (diff)
bug(#9): Fix interference between experiments
This change fixes the interference of multiple experiments running at the same time due to some thread unsafe behaviour in the JpaExperimentManager class. The code has now been restructured to solve the issue and fix the thread unsafe behaviour. Closes #9.
Diffstat (limited to 'opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt')
-rw-r--r--opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt114
1 files changed, 70 insertions, 44 deletions
diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt
index f5a86e64..59b28d0b 100644
--- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt
+++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt
@@ -27,15 +27,19 @@ package nl.atlarge.opendc.platform
import mu.KotlinLogging
import nl.atlarge.opendc.integration.jpa.schema.ExperimentState
import nl.atlarge.opendc.integration.jpa.schema.MachineState
-import nl.atlarge.opendc.integration.jpa.schema.TaskState
+import nl.atlarge.opendc.integration.jpa.transaction
+import nl.atlarge.opendc.integration.jpa.schema.Trace as InternalTrace
+import nl.atlarge.opendc.integration.jpa.schema.TaskState as InternalTaskState
import nl.atlarge.opendc.integration.jpa.schema.Experiment as InternalExperiment
import nl.atlarge.opendc.integration.jpa.schema.Task as InternalTask
import nl.atlarge.opendc.kernel.Kernel
+import nl.atlarge.opendc.platform.workload.TaskState
import nl.atlarge.opendc.topology.JpaTopologyFactory
import nl.atlarge.opendc.topology.container.Rack
import nl.atlarge.opendc.topology.container.Room
import nl.atlarge.opendc.topology.destinations
import nl.atlarge.opendc.topology.machine.Machine
+import java.util.*
import javax.persistence.EntityManager
/**
@@ -45,8 +49,8 @@ import javax.persistence.EntityManager
* @property experiment The internal experiment definition to use.
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-class JpaExperiment(val manager: EntityManager,
- private val experiment: InternalExperiment): Experiment<Unit> {
+class JpaExperiment(private val manager: EntityManager,
+ private val experiment: InternalExperiment): Experiment<Unit>, AutoCloseable {
/**
* The logging instance.
*/
@@ -64,11 +68,10 @@ class JpaExperiment(val manager: EntityManager,
}
// Set the simulation state
- manager.run {
- transaction.begin()
+ manager.transaction {
experiment.state = ExperimentState.SIMULATING
- transaction.commit()
}
+
val section = experiment.path.sections.first()
// Important: initialise the scheduler of the datacenter
@@ -77,18 +80,16 @@ class JpaExperiment(val manager: EntityManager,
val topology = JpaTopologyFactory(section).create()
val simulation = kernel.create(topology)
val trace = experiment.trace
+ val tasks = trace.jobs.flatMap { it.tasks }
- logger.info { "Sending trace to kernel" }
+ logger.info { "Sending trace to kernel ${Objects.hashCode(trace)} ${(trace as InternalTrace).id}" }
// Schedule all messages in the trace
- trace.jobs.forEach { job ->
- job.tasks.forEach { task ->
- if (task is InternalTask) {
- task.reset()
- simulation.schedule(task, section.datacenter, delay = task.startTime)
- } else {
- logger.warn { "Dropped invalid task $task" }
- }
+ tasks.forEach { task ->
+ if (task is InternalTask) {
+ simulation.schedule(task, section.datacenter, delay = task.startTime)
+ } else {
+ logger.warn { "Dropped invalid task $task" }
}
}
@@ -103,48 +104,73 @@ class JpaExperiment(val manager: EntityManager,
while (trace.jobs.any { !it.finished }) {
// Collect data of simulation cycle
- manager.transaction.begin()
- machines.forEach { machine ->
- val state = simulation.run { machine.state }
- val wrapped = MachineState(0,
- machine as nl.atlarge.opendc.integration.jpa.schema.Machine,
- state.task as nl.atlarge.opendc.integration.jpa.schema.Task?,
- experiment,
- simulation.clock.now,
- state.temperature,
- state.memory,
- state.load
- )
- manager.persist(wrapped)
- }
-
- trace.jobs.asSequence()
- .flatMap { it.tasks.asSequence() }
- .forEach { task ->
- val state = TaskState(0,
- task as nl.atlarge.opendc.integration.jpa.schema.Task,
+ manager.transaction {
+ experiment.last = simulation.clock.now
+
+ machines.forEach { machine ->
+ val state = simulation.run { machine.state }
+ val wrapped = MachineState(0,
+ machine as nl.atlarge.opendc.integration.jpa.schema.Machine,
+ state.task as nl.atlarge.opendc.integration.jpa.schema.Task?,
experiment,
simulation.clock.now,
- task.remaining.toInt(),
- 1
+ state.temperature,
+ state.memory,
+ state.load
)
- manager.persist(state)
+ manager.persist(wrapped)
}
- manager.transaction.commit()
+
+ trace.jobs.asSequence()
+ .flatMap { it.tasks.asSequence() }
+ .forEach { task ->
+ val state = InternalTaskState(0,
+ task as nl.atlarge.opendc.integration.jpa.schema.Task,
+ experiment,
+ simulation.clock.now,
+ task.remaining.toInt(),
+ 1
+ )
+ manager.persist(state)
+ }
+ }
// Run next simulation cycle
simulation.run(simulation.clock.now + 1)
- experiment.last = simulation.clock.now
}
// Set the experiment state
- manager.run {
- transaction.begin()
+ manager.transaction {
experiment.state = ExperimentState.FINISHED
- transaction.commit()
}
logger.info { "Simulation done" }
- manager.close()
+ val waiting: Long = tasks.fold(0.toLong()) { acc, task ->
+ val finished = task.state as TaskState.Finished
+ acc + (finished.previous.at - finished.previous.previous.at)
+ } / tasks.size
+
+ val execution: Long = tasks.fold(0.toLong()) { acc, task ->
+ val finished = task.state as TaskState.Finished
+ acc + (finished.at - finished.previous.at)
+ } / tasks.size
+
+ val turnaround: Long = tasks.fold(0.toLong()) { acc, task ->
+ val finished = task.state as TaskState.Finished
+ acc + (finished.at - finished.previous.previous.at)
+ } / tasks.size
+
+ logger.info { "Average waiting time: $waiting seconds" }
+ logger.info { "Average execution time: $execution seconds" }
+ logger.info { "Average turnaround time: $turnaround seconds" }
}
+
+ /**
+ * Closes this resource, relinquishing any underlying resources.
+ * This method is invoked automatically on objects managed by the
+ * `try`-with-resources statement.
+ *
+ * @throws Exception if this resource cannot be closed
+ */
+ override fun close() = manager.close()
}