From bc814ab5b5a4becf3dbc5f796a165955c0305d70 Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Wed, 11 Jul 2018 14:33:12 +0200
Subject: feat: Interpolate machine state and task progress (#21)

This pull request implements interpolation of task progress (represented by
the `TaskState` and `MachineState` classes) via the Interpolation helpers
implemented in #20. The model assumes that tasks progress linearly between
two measurements (since the time between measurements is usually small).
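
For reference, a rough sketch of the linear model assumed between two
consecutive measurements; the snippet is illustrative only and not part of
the patch (`lerpInt` is a made-up name):

    // Interpolate an integer field (e.g. TaskState.remaining) between two
    // measurements a and b at a fraction f in [0, 1].
    fun lerpInt(a: Int, b: Int, f: Double): Int = (a + (b - a) * f).toInt()

    // The interpolate(n) operators introduced below emit n such synthetic
    // samples between every pair of real measurements, for example:
    //   taskStates.interpolate(9)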
---
 .../com/atlarge/opendc/model/odc/JpaBootstrap.kt   |   1 -
 .../opendc/model/odc/integration/jpa/Jpa.kt        |  47 +++++++
 .../odc/integration/jpa/schema/ExperimentState.kt  |   5 +
 .../odc/integration/jpa/schema/MachineState.kt     |  27 ++++
 .../model/odc/integration/jpa/schema/TaskState.kt  |  25 ++++
 .../opendc/model/odc/platform/JpaExperiment.kt     | 151 +++++++++++++++------
 .../opendc/simulator/instrumentation/Helpers.kt    |  48 +++++++
 .../simulator/instrumentation/Interpolation.kt     |  12 +-
 8 files changed, 275 insertions(+), 41 deletions(-)
 create mode 100644 opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Helpers.kt

diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt
index 0a0c792c..9cb37dc9 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt
@@ -41,7 +41,6 @@ class JpaBootstrap(val experiment: Experiment) : Bootstrap {
         // Schedule all messages in the trace
         tasks.forEach { task ->
             if (task is Task) {
-                logger.info { "Scheduling $task" }
                 context.schedule(task, section.datacenter, delay = task.startTime)
             }
         }
     }
diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/Jpa.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/Jpa.kt
index cb3181f8..3a9805d5 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/Jpa.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/Jpa.kt
@@ -24,7 +24,11 @@
 
 package com.atlarge.opendc.model.odc.integration.jpa
 
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.consume
 import javax.persistence.EntityManager
+import javax.persistence.EntityManagerFactory
+import javax.persistence.RollbackException
 
 /**
  * Run the given block in a transaction, committing on return of the block.
@@ -36,3 +40,46 @@ inline fun EntityManager.transaction(block: () -> Unit) {
     block()
     transaction.commit()
 }
+
+/**
+ * Write the given channel in batches to the database.
+ *
+ * @param factory The [EntityManagerFactory] to use to create an [EntityManager] which can persist the entities.
+ * @param batchSize The size of each batch.
+ */
+suspend fun <E> ReceiveChannel<E>.persist(factory: EntityManagerFactory, batchSize: Int = 1000) {
+    val writer = factory.createEntityManager()
+
+    this.consume {
+        val transaction = writer.transaction
+        var counter = 0
+        try {
+            transaction.begin()
+
+            for (element in this) {
+                // Commit the batch every batchSize elements
+                if (counter > 0 && counter % batchSize == 0) {
+                    writer.flush()
+                    writer.clear()
+
+                    transaction.commit()
+                    transaction.begin()
+                }
+
+                writer.persist(element)
+                counter++
+            }
+
+            transaction.commit()
+        } catch (e: RollbackException) {
+            // Roll back the transaction if it is still active
+            if (transaction.isActive) {
+                transaction.rollback()
+            }
+
+            throw e
+        } finally {
+            writer.close()
+        }
+    }
+}
diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/ExperimentState.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/ExperimentState.kt
index fecfe060..dc7355bd 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/ExperimentState.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/ExperimentState.kt
@@ -50,4 +50,9 @@ enum class ExperimentState {
      * This state indicates the experiment has finished simulating.
      */
     FINISHED,
+
+    /**
+     * This state indicates the experiment was aborted due to a timeout.
+     */
+    ABORTED,
 }
diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/MachineState.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/MachineState.kt
index f3149adb..18c8d5cc 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/MachineState.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/MachineState.kt
@@ -25,7 +25,13 @@
 package com.atlarge.opendc.model.odc.integration.jpa.schema
 
 import com.atlarge.opendc.simulator.Instant
+import com.atlarge.opendc.simulator.instrumentation.interpolate
+import com.atlarge.opendc.simulator.instrumentation.lerp
+import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.consume
 import javax.persistence.Entity
+import kotlin.coroutines.experimental.CoroutineContext
 
 /**
  * The state of a [Machine].
@@ -51,3 +57,24 @@ data class MachineState(
     val memoryUsage: Int,
     val load: Double
 )
+
+/**
+ * Linearly interpolate [n] elements between every two occurrences of machine state measurements represented
+ * as [MachineState] instances passing through the channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @param context The context of the coroutine.
+ * @param n The number of elements to interpolate between the actual elements in the channel.
+ */
+fun ReceiveChannel<MachineState>.interpolate(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<MachineState> =
+    interpolate(n, context) { f, a, b ->
+        a.copy(
+            id = 0,
+            time = lerp(a.time, b.time, f),
+            temperature = lerp(a.temperature, b.temperature, f),
+            memoryUsage = lerp(a.memoryUsage, b.memoryUsage, f),
+            load = lerp(a.load, b.load, f)
+        )
+    }
diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/TaskState.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/TaskState.kt
index 6ab43b93..b010229d 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/TaskState.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/TaskState.kt
@@ -25,7 +25,13 @@
 package com.atlarge.opendc.model.odc.integration.jpa.schema
 
 import com.atlarge.opendc.simulator.Instant
+import com.atlarge.opendc.simulator.instrumentation.interpolate
+import com.atlarge.opendc.simulator.instrumentation.lerp
+import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.consume
 import javax.persistence.Entity
+import kotlin.coroutines.experimental.CoroutineContext
 
 /**
  * The state of a [Task].
@@ -47,3 +53,22 @@ data class TaskState(
     val remaining: Int,
     val cores: Int
 )
+
+/**
+ * Linearly interpolate [n] elements between every two occurrences of task progress measurements represented
+ * as [TaskState] instances passing through the channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @param context The context of the coroutine.
+ * @param n The number of elements to interpolate between the actual elements in the channel.
+ */
+fun ReceiveChannel<TaskState>.interpolate(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<TaskState> =
+    interpolate(n, context) { f, a, b ->
+        a.copy(
+            id = 0,
+            time = lerp(a.time, b.time, f),
+            remaining = lerp(a.remaining, b.remaining, f)
+        )
+    }
diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt
index 4e88b3d4..70b9bd48 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt
@@ -25,20 +25,31 @@
 package com.atlarge.opendc.model.odc.platform
 
 import com.atlarge.opendc.model.odc.JpaBootstrap
+import com.atlarge.opendc.model.odc.JpaModel
+import com.atlarge.opendc.model.odc.integration.jpa.persist
 import com.atlarge.opendc.model.odc.integration.jpa.schema.ExperimentState
+import com.atlarge.opendc.model.odc.integration.jpa.schema.interpolate
 import com.atlarge.opendc.model.odc.integration.jpa.schema.MachineState
 import com.atlarge.opendc.model.odc.integration.jpa.transaction
+import com.atlarge.opendc.model.odc.platform.workload.Task
 import com.atlarge.opendc.model.odc.platform.workload.TaskState
 import com.atlarge.opendc.model.odc.topology.container.Rack
 import com.atlarge.opendc.model.odc.topology.container.Room
 import com.atlarge.opendc.model.odc.topology.machine.Machine
 import com.atlarge.opendc.model.topology.destinations
 import com.atlarge.opendc.simulator.Duration
+import com.atlarge.opendc.simulator.instrumentation.*
 import com.atlarge.opendc.simulator.kernel.Kernel
 import com.atlarge.opendc.simulator.platform.Experiment
+import kotlinx.coroutines.experimental.channels.Channel
+import kotlinx.coroutines.experimental.channels.asReceiveChannel
+import kotlinx.coroutines.experimental.channels.map
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.runBlocking
 import mu.KotlinLogging
 import java.io.Closeable
 import javax.persistence.EntityManager
+import kotlin.system.measureTimeMillis
 import com.atlarge.opendc.model.odc.integration.jpa.schema.Experiment as InternalExperiment
 import com.atlarge.opendc.model.odc.integration.jpa.schema.Task as InternalTask
 import com.atlarge.opendc.model.odc.integration.jpa.schema.TaskState as InternalTaskState
@@ -70,6 +81,8 @@ class JpaExperiment(private val manager: EntityManager,
             throw IllegalStateException("The experiment is in illegal state ${experiment.state}")
         }
 
+        logger.info { "Initialising experiment ${experiment.id}" }
+
         // Set the simulation state
         manager.transaction {
             experiment.state = ExperimentState.SIMULATING
@@ -77,66 +90,128 @@
 
         val bootstrap = JpaBootstrap(experiment)
         val simulation = factory.create(bootstrap)
-        val topology = simulation.model
         val section = experiment.path.sections.first()
         val trace = experiment.trace
         val tasks = trace.jobs.flatMap { it.tasks }
 
+        // The port we use to install the instruments
+        val port = simulation.openPort()
+
         // Find all machines in the datacenter
-        val machines = topology.run {
+        val machines = simulation.model.run {
             section.datacenter.outgoingEdges.destinations("room").asSequence()
                 .flatMap { it.outgoingEdges.destinations("rack").asSequence() }
                 .flatMap { it.outgoingEdges.destinations("machine").asSequence() }.toList()
         }
 
-        logger.info { "Starting simulation" }
+        // The instrument used for monitoring machines
+        fun machine(machine: Machine): Instrument<MachineState, JpaModel> = {
+            while (true) {
+                send(MachineState(
+                    0,
+                    machine as com.atlarge.opendc.model.odc.integration.jpa.schema.Machine,
+                    machine.state.task as InternalTask?,
+                    experiment,
+                    time,
+                    machine.state.temperature,
+                    machine.state.memory,
+                    machine.state.load
+                ))
+                hold(10)
+            }
+        }
 
-        while (trace.jobs.any { !it.finished }) {
-            // If we have reached a timeout, return
-            if (simulation.time >= timeout)
-                return null
-
-            // Collect data of simulation cycle
-            manager.transaction {
-                experiment.last = simulation.time
-
-                machines.forEach { machine ->
-                    val state = simulation.run { machine.state }
-                    val wrapped = MachineState(0,
-                        machine as com.atlarge.opendc.model.odc.integration.jpa.schema.Machine,
-                        state.task as com.atlarge.opendc.model.odc.integration.jpa.schema.Task?,
-                        experiment,
-                        simulation.time,
-                        state.temperature,
-                        state.memory,
-                        state.load
-                    )
-                    manager.persist(wrapped)
+        // The stream of machine state measurements
+        val machineStates = machines
+            .asReceiveChannel()
+            .map { machine(it) }
+            .flatMapMerge { port.install(Channel.UNLIMITED, it).interpolate(9) }
+
+        // The instrument used for monitoring tasks
+        fun task(task: Task): Instrument<InternalTaskState, JpaModel> = {
+            while (true) {
+                send(InternalTaskState(
+                    0,
+                    task as InternalTask,
+                    experiment,
+                    time,
+                    task.remaining.toInt(),
+                    1
+                ))
+
+                hold(10)
+            }
+        }
+
+        // The stream of task state measurements
+        val taskStates = tasks
+            .asReceiveChannel()
+            .map { task(it) }
+            .flatMapMerge { port.install(it).interpolate(9) }
+
+        // A job which writes the data to the database in a separate thread
+        val writer = launch {
+            taskStates.merge(coroutineContext, machineStates)
+                .persist(manager.entityManagerFactory)
+        }
+
+        // A method to flush the remaining measurements to the database
+        fun finalize() = runBlocking {
+            logger.info { "Flushing remaining measurements to database" }
+
+            // Stop gathering new measurements
+            port.close()
+
+            // Wait for writer thread to finish
+            writer.join()
+        }
+
+        logger.info { "Starting simulation" }
+        logger.info { "Scheduling total of ${trace.jobs.size} jobs and ${tasks.size} tasks" }
+
+        val measurement = measureTimeMillis {
+            while (true) {
+                // Have all jobs finished yet?
+                if (trace.jobs.all { it.finished })
+                    break
+
+                // If we have reached a timeout, return
+                if (simulation.time >= timeout) {
+                    // Flush remaining data
+                    finalize()
+
+                    // Mark the experiment as aborted
+                    manager.transaction {
+                        experiment.last = simulation.time
+                        experiment.state = ExperimentState.ABORTED
+                    }
+
+                    logger.warn { "Experiment aborted due to timeout" }
+                    return null
                 }
 
-                tasks.forEach { task ->
-                    val state = InternalTaskState(0,
-                        task as com.atlarge.opendc.model.odc.integration.jpa.schema.Task,
-                        experiment,
-                        simulation.time,
-                        task.remaining.toInt(),
-                        1
-                    )
-                    manager.persist(state)
+                try {
+                    // Run next simulation cycle
+                    simulation.step()
+                } catch (e: Throwable) {
+                    logger.error(e) { "An error occurred during execution of the experiment" }
                 }
             }
-
-            // Run next simulation cycle
-            simulation.step()
         }
 
-        // Set the experiment state
+        logger.info { "Simulation done in $measurement milliseconds" }
+
+        // Flush remaining data to the database
+        finalize()
+
+        // Mark the experiment as finished
         manager.transaction {
+            experiment.last = simulation.time
             experiment.state = ExperimentState.FINISHED
         }
 
-        logger.info { "Simulation done" }
+        logger.info { "Computing statistics" }
 
         val waiting: Long = tasks.fold(0.toLong()) { acc, task ->
             val finished = task.state as TaskState.Finished
             acc + (finished.previous.at - finished.previous.previous.at)
diff --git a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Helpers.kt b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Helpers.kt
new file mode 100644
index 00000000..d6cf2e3a
--- /dev/null
+++ b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Helpers.kt
@@ -0,0 +1,48 @@
+package com.atlarge.opendc.simulator.instrumentation
+
+import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.consumeEach
+import kotlinx.coroutines.experimental.channels.produce
+import kotlinx.coroutines.experimental.channels.toChannel
+import kotlinx.coroutines.experimental.launch
+import kotlin.coroutines.experimental.CoroutineContext
+
+/**
+ * Transform each element in the channel into a [ReceiveChannel] of output elements that is then flattened into the
+ * output stream by emitting elements from the channels as they become available.
+ *
+ * @param context The [CoroutineContext] to run the operation in.
+ * @param transform The function to transform the elements into channels.
+ * @return The flattened [ReceiveChannel] of merged elements.
+ */
+fun <E, R> ReceiveChannel<E>.flatMapMerge(context: CoroutineContext = Unconfined,
+                                          transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
+    produce(context) {
+        val job = launch(Unconfined) {
+            consumeEach {
+                launch(coroutineContext) {
+                    transform(it).toChannel(this@produce)
+                }
+            }
+        }
+        job.join()
+    }
+
+/**
+ * Merge this channel with the other channel into an output stream by emitting elements from the channels as they
+ * become available.
+ *
+ * @param context The [CoroutineContext] to run the operation in.
+ * @param other The other channel to merge with.
+ * @return The [ReceiveChannel] of merged elements.
+ */
+fun <E> ReceiveChannel<E>.merge(context: CoroutineContext = Unconfined,
+                                other: ReceiveChannel<E>): ReceiveChannel<E> =
+    produce(context) {
+        val job = launch(Unconfined) {
+            launch(coroutineContext) { toChannel(this@produce) }
+            launch(coroutineContext) { other.toChannel(this@produce) }
+        }
+        job.join()
+    }
diff --git a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt
index 439813e8..5a033ff3 100644
--- a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt
+++ b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt
@@ -41,8 +41,15 @@ import kotlin.coroutines.experimental.CoroutineContext
  * @param interpolator A function to interpolate between the two element occurrences.
  */
 fun <E> ReceiveChannel<E>.interpolate(n: Int, context: CoroutineContext = Unconfined,
-                                      interpolator: (Double, E, E) -> E): ReceiveChannel<E> =
-    produce(context) {
+                                      interpolator: (Double, E, E) -> E): ReceiveChannel<E> {
+    require(n >= 0) { "The amount to interpolate must be non-negative" }
+
+    // If we do not want to interpolate any elements, just return the original channel
+    if (n == 0) {
+        return this
+    }
+
+    return produce(context) {
         consume {
             val iterator = iterator()
 
@@ -62,6 +69,7 @@ fun ReceiveChannel.interpolate(n: Int, context: CoroutineContext = Unconf
             }
         }
     }
+}
 
 /**
  * Perform a linear interpolation on the given double values.
-- 
cgit v1.2.3