summaryrefslogtreecommitdiff
path: root/opendc-model-odc
diff options
context:
space:
mode:
authorFabian Mastenbroek <fabianishere@outlook.com>2018-07-11 14:33:12 +0200
committerGitHub <noreply@github.com>2018-07-11 14:33:12 +0200
commitbc814ab5b5a4becf3dbc5f796a165955c0305d70 (patch)
tree516d547bd03b6c95f9d640a2460d67bcf711895a /opendc-model-odc
parent07f245dcf4b01ade251d0f4bedc897d7145b04d1 (diff)
feat: Interpolate machine state and task progress (#21)
This pull request implements interpolation of task progress (represented as the `TaskState` and `MachineState` class) via the Interpolation helpers implemented in #20. The model assumes that tasks progress linearly between two measurements (since the time between measurements is usually small).
Diffstat (limited to 'opendc-model-odc')
-rw-r--r--opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt1
-rw-r--r--opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/Jpa.kt47
-rw-r--r--opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/ExperimentState.kt5
-rw-r--r--opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/MachineState.kt27
-rw-r--r--opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/TaskState.kt25
-rw-r--r--opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt151
6 files changed, 217 insertions, 39 deletions
diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt
index 0a0c792c..9cb37dc9 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt
@@ -41,7 +41,6 @@ class JpaBootstrap(val experiment: Experiment) : Bootstrap<JpaModel> {
// Schedule all messages in the trace
tasks.forEach { task ->
if (task is Task) {
- logger.info { "Scheduling $task" }
context.schedule(task, section.datacenter, delay = task.startTime)
}
}
diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/Jpa.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/Jpa.kt
index cb3181f8..3a9805d5 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/Jpa.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/Jpa.kt
@@ -24,7 +24,11 @@
package com.atlarge.opendc.model.odc.integration.jpa
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.consume
import javax.persistence.EntityManager
+import javax.persistence.EntityManagerFactory
+import javax.persistence.RollbackException
/**
* Run the given block in a transaction, committing on return of the block.
@@ -36,3 +40,46 @@ inline fun EntityManager.transaction(block: () -> Unit) {
block()
transaction.commit()
}
+
+/**
+ * Write the given channel in batch to the database.
+ *
+ * @param factory The [EntityManagerFactory] to use to create an [EntityManager] which can persist the entities.
+ * @param batchSize The size of each batch.
+ */
+suspend fun <E> ReceiveChannel<E>.persist(factory: EntityManagerFactory, batchSize: Int = 1000) {
+ val writer = factory.createEntityManager()
+
+ this.consume {
+ val transaction = writer.transaction
+ var counter = 0
+ try {
+ transaction.begin()
+
+ for (element in this) {
+ // Commit batch every batch size
+ if (counter > 0 && counter % batchSize == 0) {
+ writer.flush()
+ writer.clear()
+
+ transaction.commit()
+ transaction.begin()
+ }
+
+ writer.persist(element)
+ counter++
+ }
+
+ transaction.commit()
+ } catch(e: RollbackException) {
+ // Rollback transaction if still active
+ if (transaction.isActive) {
+ transaction.rollback()
+ }
+
+ throw e
+ } finally {
+ writer.close()
+ }
+ }
+}
diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/ExperimentState.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/ExperimentState.kt
index fecfe060..dc7355bd 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/ExperimentState.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/ExperimentState.kt
@@ -50,4 +50,9 @@ enum class ExperimentState {
* This state indicates the experiment has finished simulating.
*/
FINISHED,
+
+ /**
+ * This states indicates the experiment was aborted due to a timeout.
+ */
+ ABORTED,
}
diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/MachineState.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/MachineState.kt
index f3149adb..18c8d5cc 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/MachineState.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/MachineState.kt
@@ -25,7 +25,13 @@
package com.atlarge.opendc.model.odc.integration.jpa.schema
import com.atlarge.opendc.simulator.Instant
+import com.atlarge.opendc.simulator.instrumentation.interpolate
+import com.atlarge.opendc.simulator.instrumentation.lerp
+import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.consume
import javax.persistence.Entity
+import kotlin.coroutines.experimental.CoroutineContext
/**
* The state of a [Machine].
@@ -51,3 +57,24 @@ data class MachineState(
val memoryUsage: Int,
val load: Double
)
+
+/**
+ * Linearly interpolate [n] amount of elements between every two occurrences of task progress measurements represented
+ * as [MachineState] instances passing through the channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @param context The context of the coroutine.
+ * @param n The amount of elements to interpolate between the actual elements in the channel.
+ */
+fun ReceiveChannel<MachineState>.interpolate(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<MachineState> =
+ interpolate(n, context) { f, a, b ->
+ a.copy(
+ id = 0,
+ time = lerp(a.time, b.time, f),
+ temperature = lerp(a.temperature, b.temperature, f),
+ memoryUsage = lerp(a.memoryUsage, b.memoryUsage, f),
+ load = lerp(a.load, b.load, f)
+ )
+ }
diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/TaskState.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/TaskState.kt
index 6ab43b93..b010229d 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/TaskState.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/integration/jpa/schema/TaskState.kt
@@ -25,7 +25,13 @@
package com.atlarge.opendc.model.odc.integration.jpa.schema
import com.atlarge.opendc.simulator.Instant
+import com.atlarge.opendc.simulator.instrumentation.interpolate
+import com.atlarge.opendc.simulator.instrumentation.lerp
+import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.consume
import javax.persistence.Entity
+import kotlin.coroutines.experimental.CoroutineContext
/**
* The state of a [Task].
@@ -47,3 +53,22 @@ data class TaskState(
val remaining: Int,
val cores: Int
)
+
+/**
+ * Linearly interpolate [n] amount of elements between every two occurrences of task progress measurements represented
+ * as [TaskState] instances passing through the channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @param context The context of the coroutine.
+ * @param n The amount of elements to interpolate between the actual elements in the channel.
+ */
+fun ReceiveChannel<TaskState>.interpolate(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<TaskState> =
+ interpolate(n, context) { f, a, b ->
+ a.copy(
+ id = 0,
+ time = lerp(a.time, b.time, f),
+ remaining = lerp(a.remaining, b.remaining, f)
+ )
+ }
diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt
index 4e88b3d4..70b9bd48 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt
@@ -25,20 +25,31 @@
package com.atlarge.opendc.model.odc.platform
import com.atlarge.opendc.model.odc.JpaBootstrap
+import com.atlarge.opendc.model.odc.JpaModel
+import com.atlarge.opendc.model.odc.integration.jpa.persist
import com.atlarge.opendc.model.odc.integration.jpa.schema.ExperimentState
+import com.atlarge.opendc.model.odc.integration.jpa.schema.interpolate
import com.atlarge.opendc.model.odc.integration.jpa.schema.MachineState
import com.atlarge.opendc.model.odc.integration.jpa.transaction
+import com.atlarge.opendc.model.odc.platform.workload.Task
import com.atlarge.opendc.model.odc.platform.workload.TaskState
import com.atlarge.opendc.model.odc.topology.container.Rack
import com.atlarge.opendc.model.odc.topology.container.Room
import com.atlarge.opendc.model.odc.topology.machine.Machine
import com.atlarge.opendc.model.topology.destinations
import com.atlarge.opendc.simulator.Duration
+import com.atlarge.opendc.simulator.instrumentation.*
import com.atlarge.opendc.simulator.kernel.Kernel
import com.atlarge.opendc.simulator.platform.Experiment
+import kotlinx.coroutines.experimental.channels.Channel
+import kotlinx.coroutines.experimental.channels.asReceiveChannel
+import kotlinx.coroutines.experimental.channels.map
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.runBlocking
import mu.KotlinLogging
import java.io.Closeable
import javax.persistence.EntityManager
+import kotlin.system.measureTimeMillis
import com.atlarge.opendc.model.odc.integration.jpa.schema.Experiment as InternalExperiment
import com.atlarge.opendc.model.odc.integration.jpa.schema.Task as InternalTask
import com.atlarge.opendc.model.odc.integration.jpa.schema.TaskState as InternalTaskState
@@ -70,6 +81,8 @@ class JpaExperiment(private val manager: EntityManager,
throw IllegalStateException("The experiment is in illegal state ${experiment.state}")
}
+ logger.info { "Initialising experiment ${experiment.id}" }
+
// Set the simulation state
manager.transaction {
experiment.state = ExperimentState.SIMULATING
@@ -77,66 +90,128 @@ class JpaExperiment(private val manager: EntityManager,
val bootstrap = JpaBootstrap(experiment)
val simulation = factory.create(bootstrap)
- val topology = simulation.model
val section = experiment.path.sections.first()
val trace = experiment.trace
val tasks = trace.jobs.flatMap { it.tasks }
+ // The port we use to install the instruments
+ val port = simulation.openPort()
+
// Find all machines in the datacenter
- val machines = topology.run {
+ val machines = simulation.model.run {
section.datacenter.outgoingEdges.destinations<Room>("room").asSequence()
.flatMap { it.outgoingEdges.destinations<Rack>("rack").asSequence() }
.flatMap { it.outgoingEdges.destinations<Machine>("machine").asSequence() }.toList()
}
- logger.info { "Starting simulation" }
+ // The instrument used for monitoring machines
+ fun machine(machine: Machine): Instrument<MachineState, JpaModel> = {
+ while (true) {
+ send(MachineState(
+ 0,
+ machine as com.atlarge.opendc.model.odc.integration.jpa.schema.Machine,
+ machine.state.task as InternalTask?,
+ experiment,
+ time,
+ machine.state.temperature,
+ machine.state.memory,
+ machine.state.load
+ ))
+ hold(10)
+ }
+ }
- while (trace.jobs.any { !it.finished }) {
- // If we have reached a timeout, return
- if (simulation.time >= timeout)
- return null
-
- // Collect data of simulation cycle
- manager.transaction {
- experiment.last = simulation.time
-
- machines.forEach { machine ->
- val state = simulation.run { machine.state }
- val wrapped = MachineState(0,
- machine as com.atlarge.opendc.model.odc.integration.jpa.schema.Machine,
- state.task as com.atlarge.opendc.model.odc.integration.jpa.schema.Task?,
- experiment,
- simulation.time,
- state.temperature,
- state.memory,
- state.load
- )
- manager.persist(wrapped)
+ // The stream of machine state measurements
+ val machineStates = machines
+ .asReceiveChannel()
+ .map { machine(it) }
+ .flatMapMerge { port.install(Channel.UNLIMITED, it).interpolate(9) }
+
+ // The instrument used for monitoring tasks
+ fun task(task: Task): Instrument<InternalTaskState, JpaModel> = {
+ while (true) {
+ send(InternalTaskState(
+ 0,
+ task as InternalTask,
+ experiment,
+ time,
+ task.remaining.toInt(),
+ 1
+ ))
+
+ hold(10)
+ }
+ }
+
+ // The stream of task state measurements
+ val taskStates = tasks
+ .asReceiveChannel()
+ .map { task(it) }
+ .flatMapMerge { port.install(it).interpolate(9) }
+
+ // A job which writes the data to database in a separate thread
+ val writer = launch {
+ taskStates.merge(coroutineContext, machineStates)
+ .persist(manager.entityManagerFactory)
+ }
+
+ // A method to flush the remaining measurements to the database
+ fun finalize() = runBlocking {
+ logger.info { "Flushing remaining measurements to database" }
+
+ // Stop gathering new measurements
+ port.close()
+
+ // Wait for writer thread to finish
+ writer.join()
+ }
+
+ logger.info { "Starting simulation" }
+ logger.info { "Scheduling total of ${trace.jobs.size} jobs and ${tasks.size} tasks" }
+
+ val measurement = measureTimeMillis {
+ while (true) {
+ // Have all jobs finished yet
+ if (trace.jobs.all { it.finished })
+ break
+
+ // If we have reached a timeout, return
+ if (simulation.time >= timeout) {
+ // Flush remaining data
+ finalize()
+
+ // Mark the experiment as aborted
+ manager.transaction {
+ experiment.last = simulation.time
+ experiment.state = ExperimentState.ABORTED
+ }
+
+ logger.warn { "Experiment aborted due to timeout" }
+ return null
}
- tasks.forEach { task ->
- val state = InternalTaskState(0,
- task as com.atlarge.opendc.model.odc.integration.jpa.schema.Task,
- experiment,
- simulation.time,
- task.remaining.toInt(),
- 1
- )
- manager.persist(state)
+ try {
+ // Run next simulation cycle
+ simulation.step()
+ } catch (e: Throwable) {
+ logger.error(e) { "An error occurred during execution of the experiment" }
}
}
-
- // Run next simulation cycle
- simulation.step()
}
- // Set the experiment state
+ logger.info { "Simulation done in $measurement milliseconds" }
+
+ // Flush remaining data to database
+ finalize()
+
+ // Mark experiment as finished
manager.transaction {
+ experiment.last = simulation.time
experiment.state = ExperimentState.FINISHED
}
- logger.info { "Simulation done" }
+ logger.info { "Computing statistics" }
val waiting: Long = tasks.fold(0.toLong()) { acc, task ->
val finished = task.state as TaskState.Finished
acc + (finished.previous.at - finished.previous.previous.at)