diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-10-04 10:44:24 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-10-04 10:47:08 +0200 |
| commit | 25cc35b0e4942e990c01ac6224720e8fe84fd9ae (patch) | |
| tree | 12cd345922388df70ac4498570659d6c6990a7e5 /opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt | |
| parent | 2c26e9c91c945af065770c323e8b80a9f5104379 (diff) | |
bug(#9): Fix interference between experiments
This change fixes the interference of multiple experiments running at
the same time due to some thread unsafe behaviour in the
JpaExperimentManager class.
The code has now been restructured to solve the issue and fix the thread
unsafe behaviour.
Closes #9.
Diffstat (limited to 'opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt')
| -rw-r--r-- | opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt | 114 |
1 files changed, 70 insertions, 44 deletions
diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt index f5a86e64..59b28d0b 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt @@ -27,15 +27,19 @@ package nl.atlarge.opendc.platform import mu.KotlinLogging import nl.atlarge.opendc.integration.jpa.schema.ExperimentState import nl.atlarge.opendc.integration.jpa.schema.MachineState -import nl.atlarge.opendc.integration.jpa.schema.TaskState +import nl.atlarge.opendc.integration.jpa.transaction +import nl.atlarge.opendc.integration.jpa.schema.Trace as InternalTrace +import nl.atlarge.opendc.integration.jpa.schema.TaskState as InternalTaskState import nl.atlarge.opendc.integration.jpa.schema.Experiment as InternalExperiment import nl.atlarge.opendc.integration.jpa.schema.Task as InternalTask import nl.atlarge.opendc.kernel.Kernel +import nl.atlarge.opendc.platform.workload.TaskState import nl.atlarge.opendc.topology.JpaTopologyFactory import nl.atlarge.opendc.topology.container.Rack import nl.atlarge.opendc.topology.container.Room import nl.atlarge.opendc.topology.destinations import nl.atlarge.opendc.topology.machine.Machine +import java.util.* import javax.persistence.EntityManager /** @@ -45,8 +49,8 @@ import javax.persistence.EntityManager * @property experiment The internal experiment definition to use. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class JpaExperiment(val manager: EntityManager, - private val experiment: InternalExperiment): Experiment<Unit> { +class JpaExperiment(private val manager: EntityManager, + private val experiment: InternalExperiment): Experiment<Unit>, AutoCloseable { /** * The logging instance. */ @@ -64,11 +68,10 @@ class JpaExperiment(val manager: EntityManager, } // Set the simulation state - manager.run { - transaction.begin() + manager.transaction { experiment.state = ExperimentState.SIMULATING - transaction.commit() } + val section = experiment.path.sections.first() // Important: initialise the scheduler of the datacenter @@ -77,18 +80,16 @@ class JpaExperiment(val manager: EntityManager, val topology = JpaTopologyFactory(section).create() val simulation = kernel.create(topology) val trace = experiment.trace + val tasks = trace.jobs.flatMap { it.tasks } - logger.info { "Sending trace to kernel" } + logger.info { "Sending trace to kernel ${Objects.hashCode(trace)} ${(trace as InternalTrace).id}" } // Schedule all messages in the trace - trace.jobs.forEach { job -> - job.tasks.forEach { task -> - if (task is InternalTask) { - task.reset() - simulation.schedule(task, section.datacenter, delay = task.startTime) - } else { - logger.warn { "Dropped invalid task $task" } - } + tasks.forEach { task -> + if (task is InternalTask) { + simulation.schedule(task, section.datacenter, delay = task.startTime) + } else { + logger.warn { "Dropped invalid task $task" } } } @@ -103,48 +104,73 @@ class JpaExperiment(val manager: EntityManager, while (trace.jobs.any { !it.finished }) { // Collect data of simulation cycle - manager.transaction.begin() - machines.forEach { machine -> - val state = simulation.run { machine.state } - val wrapped = MachineState(0, - machine as nl.atlarge.opendc.integration.jpa.schema.Machine, - state.task as nl.atlarge.opendc.integration.jpa.schema.Task?, - experiment, - simulation.clock.now, - state.temperature, - state.memory, - state.load - ) - manager.persist(wrapped) - } - - trace.jobs.asSequence() - .flatMap { it.tasks.asSequence() } - .forEach { task -> - val state = TaskState(0, - task as nl.atlarge.opendc.integration.jpa.schema.Task, + manager.transaction { + experiment.last = simulation.clock.now + + machines.forEach { machine -> + val state = simulation.run { machine.state } + val wrapped = MachineState(0, + machine as nl.atlarge.opendc.integration.jpa.schema.Machine, + state.task as nl.atlarge.opendc.integration.jpa.schema.Task?, experiment, simulation.clock.now, - task.remaining.toInt(), - 1 + state.temperature, + state.memory, + state.load ) - manager.persist(state) + manager.persist(wrapped) } - manager.transaction.commit() + + trace.jobs.asSequence() + .flatMap { it.tasks.asSequence() } + .forEach { task -> + val state = InternalTaskState(0, + task as nl.atlarge.opendc.integration.jpa.schema.Task, + experiment, + simulation.clock.now, + task.remaining.toInt(), + 1 + ) + manager.persist(state) + } + } // Run next simulation cycle simulation.run(simulation.clock.now + 1) - experiment.last = simulation.clock.now } // Set the experiment state - manager.run { - transaction.begin() + manager.transaction { experiment.state = ExperimentState.FINISHED - transaction.commit() } logger.info { "Simulation done" } - manager.close() + val waiting: Long = tasks.fold(0.toLong()) { acc, task -> + val finished = task.state as TaskState.Finished + acc + (finished.previous.at - finished.previous.previous.at) + } / tasks.size + + val execution: Long = tasks.fold(0.toLong()) { acc, task -> + val finished = task.state as TaskState.Finished + acc + (finished.at - finished.previous.at) + } / tasks.size + + val turnaround: Long = tasks.fold(0.toLong()) { acc, task -> + val finished = task.state as TaskState.Finished + acc + (finished.at - finished.previous.previous.at) + } / tasks.size + + logger.info { "Average waiting time: $waiting seconds" } + logger.info { "Average execution time: $execution seconds" } + logger.info { "Average turnaround time: $turnaround seconds" } } + + /** + * Closes this resource, relinquishing any underlying resources. + * This method is invoked automatically on objects managed by the + * `try`-with-resources statement. + * + * @throws Exception if this resource cannot be closed + */ + override fun close() = manager.close() } |
