summaryrefslogtreecommitdiff
path: root/opendc/opendc-experiments-sc20/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-07 23:31:07 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-07 23:32:56 +0200
commitcbdbf818004040b60aa122dc6cb98ef635fa5ac1 (patch)
treecffff3e75926084edd742e28d2cb4c84d8372604 /opendc/opendc-experiments-sc20/src
parentf4fe224194c0bcabda2e17005077e76ea9e7098c (diff)
feat: Add initial version of Postgres reporter
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt15
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt56
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt200
3 files changed, 242 insertions, 29 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
index 4264ad3f..51448c9e 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
@@ -51,6 +51,7 @@ import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.types.choice
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.int
+import com.github.ajalt.clikt.parameters.types.long
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
@@ -59,6 +60,7 @@ import mu.KotlinLogging
import java.io.File
import java.io.FileReader
import java.io.InputStream
+import java.sql.DriverManager
import java.util.ServiceLoader
import kotlin.random.Random
@@ -114,7 +116,8 @@ class ExperimentCommand : CliktCommand(name = "sc20-experiment") {
.required()
private val reporter by option().groupChoice(
- "parquet" to Parquet()
+ "parquet" to Parquet(),
+ "postgres" to Postgres()
).required()
private fun parseVMs(string: String): List<String> {
@@ -216,6 +219,16 @@ class Parquet : Reporter("Options for reporting using Parquet") {
override fun createReporter(): Sc20Reporter = Sc20ParquetReporter(path)
}
+class Postgres : Reporter("Options for reporting using PostgreSQL") {
+ private val url by option(help = "JDBC connection url").required()
+ private val experimentId by option(help = "Experiment ID").long().required()
+
+ override fun createReporter(): Sc20Reporter {
+ val conn = DriverManager.getConnection(url)
+ return Sc20PostgresReporter(conn, experimentId)
+ }
+}
+
/**
* Main entry point of the experiment.
*/
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt
index eaac912a..f2139144 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt
@@ -26,21 +26,21 @@ class Sc20ParquetReporter(destination: File) : Sc20Reporter {
.fields()
.name("time").type().longType().noDefault()
.name("duration").type().longType().noDefault()
- .name("requestedBurst").type().longType().noDefault()
- .name("grantedBurst").type().longType().noDefault()
- .name("overcommissionedBurst").type().longType().noDefault()
- .name("interferedBurst").type().longType().noDefault()
- .name("cpuUsage").type().doubleType().noDefault()
- .name("cpuDemand").type().doubleType().noDefault()
- .name("numberOfDeployedImages").type().intType().noDefault()
+ .name("requested_burst").type().longType().noDefault()
+ .name("granted_burst").type().longType().noDefault()
+ .name("overcommissioned_burst").type().longType().noDefault()
+ .name("interfered_burst").type().longType().noDefault()
+ .name("cpu_usage").type().doubleType().noDefault()
+ .name("cpu_demand").type().doubleType().noDefault()
+ .name("image_count").type().intType().noDefault()
.name("server").type().stringType().noDefault()
- .name("hostState").type().stringType().noDefault()
- .name("hostUsage").type().doubleType().noDefault()
- .name("powerDraw").type().doubleType().noDefault()
- .name("totalSubmittedVms").type().longType().noDefault()
- .name("totalQueuedVms").type().longType().noDefault()
- .name("totalRunningVms").type().longType().noDefault()
- .name("totalFinishedVms").type().longType().noDefault()
+ .name("host_state").type().stringType().noDefault()
+ .name("host_usage").type().doubleType().noDefault()
+ .name("power_draw").type().doubleType().noDefault()
+ .name("total_submitted_vms").type().longType().noDefault()
+ .name("total_queued_vms").type().longType().noDefault()
+ .name("total_running_vms").type().longType().noDefault()
+ .name("total_finished_vms").type().longType().noDefault()
.endRecord()
private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination.absolutePath))
.withSchema(schema)
@@ -122,21 +122,21 @@ class Sc20ParquetReporter(destination: File) : Sc20Reporter {
val record = GenericData.Record(schema)
record.put("time", time)
record.put("duration", duration)
- record.put("requestedBurst", requestedBurst)
- record.put("grantedBurst", grantedBurst)
- record.put("overcommissionedBurst", overcommissionedBurst)
- record.put("interferedBurst", interferedBurst)
- record.put("cpuUsage", cpuUsage)
- record.put("cpuDemand", cpuDemand)
- record.put("numberOfDeployedImages", numberOfDeployedImages)
+ record.put("requested_burst", requestedBurst)
+ record.put("granted_burst", grantedBurst)
+ record.put("overcommissioned_burst", overcommissionedBurst)
+ record.put("interfered_burst", interferedBurst)
+ record.put("cpu_usage", cpuUsage)
+ record.put("cpu_demand", cpuDemand)
+ record.put("image_count", numberOfDeployedImages)
record.put("server", hostServer.uid)
- record.put("hostState", hostServer.state)
- record.put("hostUsage", usage)
- record.put("powerDraw", powerDraw)
- record.put("totalSubmittedVms", submittedVms)
- record.put("totalQueuedVms", queuedVms)
- record.put("totalRunningVms", runningVms)
- record.put("totalFinishedVms", finishedVms)
+ record.put("host_state", hostServer.state)
+ record.put("host_usage", usage)
+ record.put("power_draw", powerDraw)
+ record.put("total_submitted_vms", submittedVms)
+ record.put("total_queued_vms", queuedVms)
+ record.put("total_running_vms", runningVms)
+ record.put("total_finished_vms", finishedVms)
queue.put(record)
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt
new file mode 100644
index 00000000..5c5e6ceb
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt
@@ -0,0 +1,200 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.odcsim.simulationContext
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerState
+import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import kotlinx.coroutines.flow.first
+import mu.KotlinLogging
+import java.sql.Connection
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.atomic.AtomicBoolean
+import kotlin.concurrent.thread
+
+private val logger = KotlinLogging.logger {}
+
+class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20Reporter {
+ private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
+ private val queue = ArrayBlockingQueue<Report>(2048)
+ private val stop = AtomicBoolean(false)
+ private val writerThread = thread(start = true, name = "sc20-writer") {
+ val stmt = try {
+ conn.prepareStatement(
+ """
+ INSERT INTO host_reports (experiment_id, time, duration, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, image_count, server, host_state, host_usage, power_draw, total_submitted_vms, total_queued_vms, total_running_vms, total_finished_vms)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """.trimIndent()
+ )
+ } catch (e: Throwable) {
+ conn.close()
+ throw e
+ }
+
+ val batchSize = 4096
+ var batch = 0
+
+ try {
+ while (!stop.get()) {
+ val record = queue.take()
+ stmt.setLong(1, experimentId)
+ stmt.setLong(2, record.time)
+ stmt.setLong(3, record.duration)
+ stmt.setLong(4, record.requestedBurst)
+ stmt.setLong(5, record.grantedBurst)
+ stmt.setLong(6, record.overcommissionedBurst)
+ stmt.setLong(7, record.interferedBurst)
+ stmt.setDouble(8, record.cpuUsage)
+ stmt.setDouble(9, record.cpuDemand)
+ stmt.setInt(10, record.numberOfDeployedImages)
+ stmt.setString(11, record.hostServer.uid.toString())
+ stmt.setString(12, record.hostServer.state.name)
+ stmt.setDouble(13, record.hostUsage)
+ stmt.setDouble(14, record.powerDraw)
+ stmt.setLong(15, record.submittedVms)
+ stmt.setLong(16, record.queuedVms)
+ stmt.setLong(17, record.runningVms)
+ stmt.setLong(18, record.finishedVms)
+ stmt.addBatch()
+ batch++
+
+ if (batch > batchSize) {
+ stmt.executeBatch()
+ batch = 0
+ }
+ }
+ } finally {
+ stmt.executeBatch()
+ stmt.close()
+ conn.close()
+ }
+ }
+
+ override suspend fun reportVmStateChange(server: Server) {}
+
+ override suspend fun reportHostStateChange(
+ driver: VirtDriver,
+ server: Server,
+ submittedVms: Long,
+ queuedVms: Long,
+ runningVms: Long,
+ finishedVms: Long
+ ) {
+ val lastServerState = lastServerStates[server]
+ if (server.state == ServerState.SHUTOFF && lastServerState != null) {
+ val duration = simulationContext.clock.millis() - lastServerState.second
+ reportHostSlice(
+ simulationContext.clock.millis(),
+ 0,
+ 0,
+ 0,
+ 0,
+ 0.0,
+ 0.0,
+ 0,
+ server,
+ submittedVms,
+ queuedVms,
+ runningVms,
+ finishedVms,
+ duration
+ )
+ }
+
+ logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]")
+
+ lastServerStates[server] = Pair(server.state, simulationContext.clock.millis())
+ }
+
+ override suspend fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ numberOfDeployedImages: Int,
+ hostServer: Server,
+ submittedVms: Long,
+ queuedVms: Long,
+ runningVms: Long,
+ finishedVms: Long,
+ duration: Long
+ ) {
+ // Assume for now that the host is not virtualized and measure the current power draw
+ val driver = hostServer.services[BareMetalDriver.Key]
+ val usage = driver.usage.first()
+ val powerDraw = driver.powerDraw.first()
+
+ queue.put(
+ Report(
+ time,
+ duration,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ numberOfDeployedImages,
+ hostServer,
+ usage,
+ powerDraw,
+ submittedVms,
+ queuedVms,
+ runningVms,
+ finishedVms
+ )
+ )
+ }
+
+ override fun close() {
+ // Busy loop to wait for writer thread to finish
+ stop.set(true)
+ writerThread.join()
+ }
+
+ data class Report(
+ val time: Long,
+ val duration: Long,
+ val requestedBurst: Long,
+ val grantedBurst: Long,
+ val overcommissionedBurst: Long,
+ val interferedBurst: Long,
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val numberOfDeployedImages: Int,
+ val hostServer: Server,
+ val hostUsage: Double,
+ val powerDraw: Double,
+ val submittedVms: Long,
+ val queuedVms: Long,
+ val runningVms: Long,
+ val finishedVms: Long
+ )
+}