summaryrefslogtreecommitdiff
path: root/opendc/opendc-experiments-sc20/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt108
1 files changed, 57 insertions, 51 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt
index 5c5e6ceb..1b91e843 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt
@@ -33,17 +33,16 @@ import kotlinx.coroutines.flow.first
import mu.KotlinLogging
import java.sql.Connection
import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread
private val logger = KotlinLogging.logger {}
class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20Reporter {
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
- private val queue = ArrayBlockingQueue<Report>(2048)
- private val stop = AtomicBoolean(false)
+ private val queue = ArrayBlockingQueue<Action>(2048)
private val writerThread = thread(start = true, name = "sc20-writer") {
val stmt = try {
+ conn.autoCommit = false
conn.prepareStatement(
"""
INSERT INTO host_reports (experiment_id, time, duration, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, image_count, server, host_state, host_usage, power_draw, total_submitted_vms, total_queued_vms, total_running_vms, total_finished_vms)
@@ -59,36 +58,40 @@ class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20R
var batch = 0
try {
- while (!stop.get()) {
- val record = queue.take()
- stmt.setLong(1, experimentId)
- stmt.setLong(2, record.time)
- stmt.setLong(3, record.duration)
- stmt.setLong(4, record.requestedBurst)
- stmt.setLong(5, record.grantedBurst)
- stmt.setLong(6, record.overcommissionedBurst)
- stmt.setLong(7, record.interferedBurst)
- stmt.setDouble(8, record.cpuUsage)
- stmt.setDouble(9, record.cpuDemand)
- stmt.setInt(10, record.numberOfDeployedImages)
- stmt.setString(11, record.hostServer.uid.toString())
- stmt.setString(12, record.hostServer.state.name)
- stmt.setDouble(13, record.hostUsage)
- stmt.setDouble(14, record.powerDraw)
- stmt.setLong(15, record.submittedVms)
- stmt.setLong(16, record.queuedVms)
- stmt.setLong(17, record.runningVms)
- stmt.setLong(18, record.finishedVms)
- stmt.addBatch()
- batch++
-
- if (batch > batchSize) {
- stmt.executeBatch()
- batch = 0
+ loop@ while (true) {
+ when (val record = queue.take()) {
+ is Action.Stop -> break@loop
+ is Action.Write -> {
+ stmt.setLong(1, experimentId)
+ stmt.setLong(2, record.time)
+ stmt.setLong(3, record.duration)
+ stmt.setLong(4, record.requestedBurst)
+ stmt.setLong(5, record.grantedBurst)
+ stmt.setLong(6, record.overcommissionedBurst)
+ stmt.setLong(7, record.interferedBurst)
+ stmt.setDouble(8, record.cpuUsage)
+ stmt.setDouble(9, record.cpuDemand)
+ stmt.setInt(10, record.numberOfDeployedImages)
+ stmt.setString(11, record.hostServer.uid.toString())
+ stmt.setString(12, record.hostServer.state.name)
+ stmt.setDouble(13, record.hostUsage)
+ stmt.setDouble(14, record.powerDraw)
+ stmt.setLong(15, record.submittedVms)
+ stmt.setLong(16, record.queuedVms)
+ stmt.setLong(17, record.runningVms)
+ stmt.setLong(18, record.finishedVms)
+ stmt.addBatch()
+ batch++
+
+ if (batch % batchSize == 0) {
+ stmt.executeBatch()
+ conn.commit()
+ }
+ }
}
}
} finally {
- stmt.executeBatch()
+ conn.commit()
stmt.close()
conn.close()
}
@@ -152,7 +155,7 @@ class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20R
val powerDraw = driver.powerDraw.first()
queue.put(
- Report(
+ Action.Write(
time,
duration,
requestedBurst,
@@ -174,27 +177,30 @@ class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20R
}
override fun close() {
- // Busy loop to wait for writer thread to finish
- stop.set(true)
+ queue.put(Action.Stop)
writerThread.join()
}
- data class Report(
- val time: Long,
- val duration: Long,
- val requestedBurst: Long,
- val grantedBurst: Long,
- val overcommissionedBurst: Long,
- val interferedBurst: Long,
- val cpuUsage: Double,
- val cpuDemand: Double,
- val numberOfDeployedImages: Int,
- val hostServer: Server,
- val hostUsage: Double,
- val powerDraw: Double,
- val submittedVms: Long,
- val queuedVms: Long,
- val runningVms: Long,
- val finishedVms: Long
- )
+ private sealed class Action {
+ object Stop : Action()
+
+ data class Write(
+ val time: Long,
+ val duration: Long,
+ val requestedBurst: Long,
+ val grantedBurst: Long,
+ val overcommissionedBurst: Long,
+ val interferedBurst: Long,
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val numberOfDeployedImages: Int,
+ val hostServer: Server,
+ val hostUsage: Double,
+ val powerDraw: Double,
+ val submittedVms: Long,
+ val queuedVms: Long,
+ val runningVms: Long,
+ val finishedVms: Long
+ ) : Action()
+ }
}