diff options
Diffstat (limited to 'opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform')
3 files changed, 103 insertions, 64 deletions
diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt index f5a86e64..59b28d0b 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt @@ -27,15 +27,19 @@ package nl.atlarge.opendc.platform import mu.KotlinLogging import nl.atlarge.opendc.integration.jpa.schema.ExperimentState import nl.atlarge.opendc.integration.jpa.schema.MachineState -import nl.atlarge.opendc.integration.jpa.schema.TaskState +import nl.atlarge.opendc.integration.jpa.transaction +import nl.atlarge.opendc.integration.jpa.schema.Trace as InternalTrace +import nl.atlarge.opendc.integration.jpa.schema.TaskState as InternalTaskState import nl.atlarge.opendc.integration.jpa.schema.Experiment as InternalExperiment import nl.atlarge.opendc.integration.jpa.schema.Task as InternalTask import nl.atlarge.opendc.kernel.Kernel +import nl.atlarge.opendc.platform.workload.TaskState import nl.atlarge.opendc.topology.JpaTopologyFactory import nl.atlarge.opendc.topology.container.Rack import nl.atlarge.opendc.topology.container.Room import nl.atlarge.opendc.topology.destinations import nl.atlarge.opendc.topology.machine.Machine +import java.util.* import javax.persistence.EntityManager /** @@ -45,8 +49,8 @@ import javax.persistence.EntityManager * @property experiment The internal experiment definition to use. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class JpaExperiment(val manager: EntityManager, - private val experiment: InternalExperiment): Experiment<Unit> { +class JpaExperiment(private val manager: EntityManager, + private val experiment: InternalExperiment): Experiment<Unit>, AutoCloseable { /** * The logging instance. */ @@ -64,11 +68,10 @@ class JpaExperiment(val manager: EntityManager, } // Set the simulation state - manager.run { - transaction.begin() + manager.transaction { experiment.state = ExperimentState.SIMULATING - transaction.commit() } + val section = experiment.path.sections.first() // Important: initialise the scheduler of the datacenter @@ -77,18 +80,16 @@ class JpaExperiment(val manager: EntityManager, val topology = JpaTopologyFactory(section).create() val simulation = kernel.create(topology) val trace = experiment.trace + val tasks = trace.jobs.flatMap { it.tasks } - logger.info { "Sending trace to kernel" } + logger.info { "Sending trace to kernel ${Objects.hashCode(trace)} ${(trace as InternalTrace).id}" } // Schedule all messages in the trace - trace.jobs.forEach { job -> - job.tasks.forEach { task -> - if (task is InternalTask) { - task.reset() - simulation.schedule(task, section.datacenter, delay = task.startTime) - } else { - logger.warn { "Dropped invalid task $task" } - } + tasks.forEach { task -> + if (task is InternalTask) { + simulation.schedule(task, section.datacenter, delay = task.startTime) + } else { + logger.warn { "Dropped invalid task $task" } } } @@ -103,48 +104,73 @@ class JpaExperiment(val manager: EntityManager, while (trace.jobs.any { !it.finished }) { // Collect data of simulation cycle - manager.transaction.begin() - machines.forEach { machine -> - val state = simulation.run { machine.state } - val wrapped = MachineState(0, - machine as nl.atlarge.opendc.integration.jpa.schema.Machine, - state.task as nl.atlarge.opendc.integration.jpa.schema.Task?, - experiment, - simulation.clock.now, - state.temperature, - state.memory, - state.load - ) - manager.persist(wrapped) - } - - trace.jobs.asSequence() - .flatMap { it.tasks.asSequence() } - .forEach { task -> - val state = TaskState(0, - task as nl.atlarge.opendc.integration.jpa.schema.Task, + manager.transaction { + experiment.last = simulation.clock.now + + machines.forEach { machine -> + val state = simulation.run { machine.state } + val wrapped = MachineState(0, + machine as nl.atlarge.opendc.integration.jpa.schema.Machine, + state.task as nl.atlarge.opendc.integration.jpa.schema.Task?, experiment, simulation.clock.now, - task.remaining.toInt(), - 1 + state.temperature, + state.memory, + state.load ) - manager.persist(state) + manager.persist(wrapped) } - manager.transaction.commit() + + trace.jobs.asSequence() + .flatMap { it.tasks.asSequence() } + .forEach { task -> + val state = InternalTaskState(0, + task as nl.atlarge.opendc.integration.jpa.schema.Task, + experiment, + simulation.clock.now, + task.remaining.toInt(), + 1 + ) + manager.persist(state) + } + } // Run next simulation cycle simulation.run(simulation.clock.now + 1) - experiment.last = simulation.clock.now } // Set the experiment state - manager.run { - transaction.begin() + manager.transaction { experiment.state = ExperimentState.FINISHED - transaction.commit() } logger.info { "Simulation done" } - manager.close() + val waiting: Long = tasks.fold(0.toLong()) { acc, task -> + val finished = task.state as TaskState.Finished + acc + (finished.previous.at - finished.previous.previous.at) + } / tasks.size + + val execution: Long = tasks.fold(0.toLong()) { acc, task -> + val finished = task.state as TaskState.Finished + acc + (finished.at - finished.previous.at) + } / tasks.size + + val turnaround: Long = tasks.fold(0.toLong()) { acc, task -> + val finished = task.state as TaskState.Finished + acc + (finished.at - finished.previous.previous.at) + } / tasks.size + + logger.info { "Average waiting time: $waiting seconds" } + logger.info { "Average execution time: $execution seconds" } + logger.info { "Average turnaround time: $turnaround seconds" } } + + /** + * Closes this resource, relinquishing any underlying resources. + * This method is invoked automatically on objects managed by the + * `try`-with-resources statement. + * + * @throws Exception if this resource cannot be closed + */ + override fun close() = manager.close() } diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt index ce69f84b..1d1e118d 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt @@ -24,6 +24,7 @@ package nl.atlarge.opendc.platform +import nl.atlarge.opendc.integration.jpa.transaction import nl.atlarge.opendc.integration.jpa.schema.Experiment as InternalExperiment import nl.atlarge.opendc.integration.jpa.schema.ExperimentState import javax.persistence.EntityManager @@ -36,11 +37,11 @@ import javax.persistence.EntityManagerFactory * from. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class JpaExperimentManager(private val factory: EntityManagerFactory) { +class JpaExperimentManager(private val factory: EntityManagerFactory): AutoCloseable { /** * The entity manager for this experiment. */ - private val manager: EntityManager = factory.createEntityManager() + private var manager: EntityManager = factory.createEntityManager() /** * The amount of experiments in the queue. This property makes a call to the database and does therefore not @@ -60,21 +61,33 @@ class JpaExperimentManager(private val factory: EntityManagerFactory) { * @return The experiment that has been polled from the database or `null` if there are no experiments in the * queue. */ - fun poll(): Experiment<Unit>? { - manager.transaction.begin() - var experiment: InternalExperiment? = null - val results = manager.createQuery("SELECT e FROM experiments e WHERE e.state = :s", + fun poll(): JpaExperiment? { + var result: JpaExperiment? = null + manager.transaction { + var experiment: InternalExperiment? = null + val results = manager.createQuery("SELECT e FROM experiments e WHERE e.state = :s", InternalExperiment::class.java) - .setParameter("s", ExperimentState.QUEUED) - .setMaxResults(1) - .resultList + .setParameter("s", ExperimentState.QUEUED) + .setMaxResults(1) + .resultList - if (!results.isEmpty()) { - experiment = results.first() - experiment!!.state = ExperimentState.CLAIMED + if (!results.isEmpty()) { + experiment = results.first() + experiment!!.state = ExperimentState.CLAIMED + } + result = experiment?.let { JpaExperiment(manager, it) } } - manager.transaction.commit() - return experiment?.let { JpaExperiment(factory.createEntityManager(), it) } + manager = factory.createEntityManager() + return result } + + /** + * Close this resource, relinquishing any underlying resources. + * This method is invoked automatically on objects managed by the + * `try`-with-resources statement.* + * + * @throws Exception if this resource cannot be closed + */ + override fun close() = manager.close() } diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt index 6a1d839d..794ca44e 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt @@ -45,17 +45,17 @@ fun main(args: Array<String>) { properties["javax.persistence.jdbc.password"] = env["PERSISTENCE_PASSWORD"] ?: "" val factory = Persistence.createEntityManagerFactory("opendc-simulator", properties) - val threads = 1 + val threads = 4 val executorService = Executors.newFixedThreadPool(threads) val experiments = JpaExperimentManager(factory) + val kernel = OmegaKernel logger.info { "Waiting for enqueued experiments..." } while (true) { - val experiment = experiments.poll() - executorService.submit { - experiment?.run { - logger.info { "Found experiment. Running simulating now..." } - run(OmegaKernel) + experiments.poll()?.let { experiment -> + logger.info { "Found experiment. Submitting for simulation now..." } + executorService.submit { + experiment.use { it.run(kernel) } } } |
