From d6d9d37abf17071ff050e45ea37c693e659a4e98 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 28 Sep 2017 03:27:36 +0200 Subject: Implement JPA integration --- .../nl/atlarge/opendc/platform/JpaExperiment.kt | 149 +++++++++++++++++++++ .../opendc/platform/JpaExperimentManager.kt | 73 ++++++++++ .../atlarge/opendc/platform/JpaPlatformRunner.kt | 52 +++++++ 3 files changed, 274 insertions(+) create mode 100644 opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt create mode 100644 opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt create mode 100644 opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt (limited to 'opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform') diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt new file mode 100644 index 00000000..4c284d1e --- /dev/null +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt @@ -0,0 +1,149 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.platform + +import mu.KotlinLogging +import nl.atlarge.opendc.integration.jpa.schema.ExperimentState +import nl.atlarge.opendc.integration.jpa.schema.MachineState +import nl.atlarge.opendc.integration.jpa.schema.TaskState +import nl.atlarge.opendc.integration.jpa.schema.Experiment as InternalExperiment +import nl.atlarge.opendc.integration.jpa.schema.Task as InternalTask +import nl.atlarge.opendc.kernel.Kernel +import nl.atlarge.opendc.topology.JpaTopologyFactory +import nl.atlarge.opendc.topology.container.Rack +import nl.atlarge.opendc.topology.container.Room +import nl.atlarge.opendc.topology.destinations +import nl.atlarge.opendc.topology.machine.Machine +import javax.persistence.EntityManager + +/** + * An [Experiment] backed by the JPA API and an underlying database connection. + * + * @property manager The entity manager for the database connection. + * @property experiment The internal experiment definition to use. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +class JpaExperiment(val manager: EntityManager, + private val experiment: InternalExperiment): Experiment { + /** + * The logging instance. + */ + private val logger = KotlinLogging.logger {} + + /** + * Run the experiment on the specified simulation [Kernel]. + * + * @param kernel The simulation kernel to run the experiment. + * @throws IllegalStateException if the simulation is already running or finished. + */ + override fun run(kernel: Kernel) { + if (experiment.state != ExperimentState.CLAIMED) { + throw IllegalStateException("The experiment is in illegal state ${experiment.state}") + } + + // Set the simulation state + manager.run { + transaction.begin() + experiment.state = ExperimentState.SIMULATING + transaction.commit() + } + val section = experiment.path.sections.first() + + // Important: initialise the scheduler of the datacenter + section.datacenter.scheduler = experiment.scheduler + + val topology = JpaTopologyFactory(section).create() + val simulation = kernel.create(topology) + val trace = experiment.trace + + logger.info { "Sending trace to kernel" } + + // Schedule all messages in the trace + trace.jobs.forEach { job -> + job.tasks.forEach { task -> + if (task is InternalTask) { + task.reset() + simulation.schedule(task, section.datacenter, delay = task.startTime) + } else { + logger.warn { "Dropped invalid task $task" } + } + } + } + + // Find all machines in the datacenter + val machines = topology.run { + section.datacenter.outgoingEdges.destinations("room").asSequence() + .flatMap { it.outgoingEdges.destinations("rack").asSequence() } + .flatMap { it.outgoingEdges.destinations("machine").asSequence() }.toList() + } + + logger.info { "Starting simulation" } + + while (trace.jobs.any { !it.finished }) { + simulation.run(simulation.clock.now + 1) + + // Collect data of simulation cycle + manager.transaction.begin() + machines.forEach { machine -> + val state = simulation.run { machine.state } + val wrapped = MachineState(0, + machine as nl.atlarge.opendc.integration.jpa.schema.Machine, + state.task as nl.atlarge.opendc.integration.jpa.schema.Task?, + experiment, + simulation.clock.now, + state.temperature, + state.memory, + state.load + ) + manager.persist(wrapped) + } + + trace.jobs.asSequence() + .flatMap { it.tasks.asSequence() } + .forEach { task -> + val state = TaskState(0, + task as nl.atlarge.opendc.integration.jpa.schema.Task, + experiment, + simulation.clock.now, + task.remaining.toInt(), + 1 + ) + manager.persist(state) + } + manager.transaction.commit() + + experiment.last = simulation.clock.now + } + + // Set the experiment state + manager.run { + transaction.begin() + experiment.state = ExperimentState.FINISHED + transaction.commit() + } + + logger.info { "Simulation done" } + } +} diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt new file mode 100644 index 00000000..faa1a771 --- /dev/null +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt @@ -0,0 +1,73 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.platform + +import nl.atlarge.opendc.integration.jpa.schema.Experiment as InternalExperiment +import nl.atlarge.opendc.integration.jpa.schema.ExperimentState +import javax.persistence.EntityManager + +/** + * A manager for [Experiment]s received from a JPA database. + * + * @property manager The JPA entity manager to retrieve entities from the database from. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +class JpaExperimentManager(private val manager: EntityManager) { + /** + * The amount of experiments in the queue. This property makes a call to the database and does therefore not + * run in O(1) time. + */ + val size: Int + get() { + return manager.createQuery("SELECT COUNT(e.id) FROM experiments e WHERE e.state = :s", + java.lang.Long::class.java) + .setParameter("s", ExperimentState.QUEUED) + .singleResult.toInt() + } + + /** + * Poll an [Experiment] from the database and claim it. + * + * @return The experiment that has been polled from the database or `null` if there are no experiments in the + * queue. + */ + fun poll(): Experiment? { + manager.transaction.begin() + var experiment: InternalExperiment? = null + val results = manager.createQuery("SELECT e FROM experiments e WHERE e.state = :s", + InternalExperiment::class.java) + .setParameter("s", ExperimentState.QUEUED) + .setMaxResults(1) + .resultList + + + if (!results.isEmpty()) { + experiment = results.first() + experiment!!.state = ExperimentState.CLAIMED + } + manager.transaction.commit() + return experiment?.let { JpaExperiment(manager, it) } + } +} diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt new file mode 100644 index 00000000..f95eb174 --- /dev/null +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.platform + +import mu.KotlinLogging +import nl.atlarge.opendc.kernel.omega.OmegaKernel +import javax.persistence.Persistence + +val logger = KotlinLogging.logger {} + +/** + * The main entry point of the program. This program polls experiments from a database and runs the + * simulation and reports the results back to the database. + * + * @param args The command line arguments of the program. + */ +fun main(args: Array) { + val factory = Persistence.createEntityManagerFactory("opendc-frontend") + val manager = factory.createEntityManager() + val experiments = JpaExperimentManager(manager) + + logger.info { "Waiting for enqueued experiments..." } + while (true) { + val experiment = experiments.poll() + experiment?.run { + logger.info { "Found experiment. Running simulating now..." } + run(OmegaKernel) + } + } +} -- cgit v1.2.3