diff options
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt | 108 |
1 files changed, 57 insertions, 51 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt index 5c5e6ceb..1b91e843 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt @@ -33,17 +33,16 @@ import kotlinx.coroutines.flow.first import mu.KotlinLogging import java.sql.Connection import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.atomic.AtomicBoolean import kotlin.concurrent.thread private val logger = KotlinLogging.logger {} class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20Reporter { private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() - private val queue = ArrayBlockingQueue<Report>(2048) - private val stop = AtomicBoolean(false) + private val queue = ArrayBlockingQueue<Action>(2048) private val writerThread = thread(start = true, name = "sc20-writer") { val stmt = try { + conn.autoCommit = false conn.prepareStatement( """ INSERT INTO host_reports (experiment_id, time, duration, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, image_count, server, host_state, host_usage, power_draw, total_submitted_vms, total_queued_vms, total_running_vms, total_finished_vms) @@ -59,36 +58,40 @@ class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20R var batch = 0 try { - while (!stop.get()) { - val record = queue.take() - stmt.setLong(1, experimentId) - stmt.setLong(2, record.time) - stmt.setLong(3, record.duration) - stmt.setLong(4, record.requestedBurst) - stmt.setLong(5, record.grantedBurst) - stmt.setLong(6, record.overcommissionedBurst) - stmt.setLong(7, record.interferedBurst) - stmt.setDouble(8, record.cpuUsage) - stmt.setDouble(9, record.cpuDemand) - stmt.setInt(10, record.numberOfDeployedImages) - stmt.setString(11, record.hostServer.uid.toString()) - stmt.setString(12, record.hostServer.state.name) - stmt.setDouble(13, record.hostUsage) - stmt.setDouble(14, record.powerDraw) - stmt.setLong(15, record.submittedVms) - stmt.setLong(16, record.queuedVms) - stmt.setLong(17, record.runningVms) - stmt.setLong(18, record.finishedVms) - stmt.addBatch() - batch++ - - if (batch > batchSize) { - stmt.executeBatch() - batch = 0 + loop@ while (true) { + when (val record = queue.take()) { + is Action.Stop -> break@loop + is Action.Write -> { + stmt.setLong(1, experimentId) + stmt.setLong(2, record.time) + stmt.setLong(3, record.duration) + stmt.setLong(4, record.requestedBurst) + stmt.setLong(5, record.grantedBurst) + stmt.setLong(6, record.overcommissionedBurst) + stmt.setLong(7, record.interferedBurst) + stmt.setDouble(8, record.cpuUsage) + stmt.setDouble(9, record.cpuDemand) + stmt.setInt(10, record.numberOfDeployedImages) + stmt.setString(11, record.hostServer.uid.toString()) + stmt.setString(12, record.hostServer.state.name) + stmt.setDouble(13, record.hostUsage) + stmt.setDouble(14, record.powerDraw) + stmt.setLong(15, record.submittedVms) + stmt.setLong(16, record.queuedVms) + stmt.setLong(17, record.runningVms) + stmt.setLong(18, record.finishedVms) + stmt.addBatch() + batch++ + + if (batch % batchSize == 0) { + stmt.executeBatch() + conn.commit() + } + } } } } finally { - stmt.executeBatch() + conn.commit() stmt.close() conn.close() } @@ -152,7 +155,7 @@ class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20R val powerDraw = driver.powerDraw.first() queue.put( - Report( + Action.Write( time, duration, requestedBurst, @@ -174,27 +177,30 @@ class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20R } override fun close() { - // Busy loop to wait for writer thread to finish - stop.set(true) + queue.put(Action.Stop) writerThread.join() } - data class Report( - val time: Long, - val duration: Long, - val requestedBurst: Long, - val grantedBurst: Long, - val overcommissionedBurst: Long, - val interferedBurst: Long, - val cpuUsage: Double, - val cpuDemand: Double, - val numberOfDeployedImages: Int, - val hostServer: Server, - val hostUsage: Double, - val powerDraw: Double, - val submittedVms: Long, - val queuedVms: Long, - val runningVms: Long, - val finishedVms: Long - ) + private sealed class Action { + object Stop : Action() + + data class Write( + val time: Long, + val duration: Long, + val requestedBurst: Long, + val grantedBurst: Long, + val overcommissionedBurst: Long, + val interferedBurst: Long, + val cpuUsage: Double, + val cpuDemand: Double, + val numberOfDeployedImages: Int, + val hostServer: Server, + val hostUsage: Double, + val powerDraw: Double, + val submittedVms: Long, + val queuedVms: Long, + val runningVms: Long, + val finishedVms: Long + ) : Action() + } } |
