diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-07 23:31:07 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-07 23:32:56 +0200 |
| commit | cbdbf818004040b60aa122dc6cb98ef635fa5ac1 (patch) | |
| tree | cffff3e75926084edd742e28d2cb4c84d8372604 /opendc/opendc-experiments-sc20/src | |
| parent | f4fe224194c0bcabda2e17005077e76ea9e7098c (diff) | |
feat: Add initial version of Postgres reporter
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
3 files changed, 242 insertions, 29 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt index 4264ad3f..51448c9e 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt @@ -51,6 +51,7 @@ import com.github.ajalt.clikt.parameters.options.required import com.github.ajalt.clikt.parameters.types.choice import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.int +import com.github.ajalt.clikt.parameters.types.long import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch @@ -59,6 +60,7 @@ import mu.KotlinLogging import java.io.File import java.io.FileReader import java.io.InputStream +import java.sql.DriverManager import java.util.ServiceLoader import kotlin.random.Random @@ -114,7 +116,8 @@ class ExperimentCommand : CliktCommand(name = "sc20-experiment") { .required() private val reporter by option().groupChoice( - "parquet" to Parquet() + "parquet" to Parquet(), + "postgres" to Postgres() ).required() private fun parseVMs(string: String): List<String> { @@ -216,6 +219,16 @@ class Parquet : Reporter("Options for reporting using Parquet") { override fun createReporter(): Sc20Reporter = Sc20ParquetReporter(path) } +class Postgres : Reporter("Options for reporting using PostgreSQL") { + private val url by option(help = "JDBC connection url").required() + private val experimentId by option(help = "Experiment ID").long().required() + + override fun createReporter(): Sc20Reporter { + val conn = DriverManager.getConnection(url) + return Sc20PostgresReporter(conn, experimentId) + } +} + /** * Main entry point of the experiment. */ diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt index eaac912a..f2139144 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt @@ -26,21 +26,21 @@ class Sc20ParquetReporter(destination: File) : Sc20Reporter { .fields() .name("time").type().longType().noDefault() .name("duration").type().longType().noDefault() - .name("requestedBurst").type().longType().noDefault() - .name("grantedBurst").type().longType().noDefault() - .name("overcommissionedBurst").type().longType().noDefault() - .name("interferedBurst").type().longType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("cpuDemand").type().doubleType().noDefault() - .name("numberOfDeployedImages").type().intType().noDefault() + .name("requested_burst").type().longType().noDefault() + .name("granted_burst").type().longType().noDefault() + .name("overcommissioned_burst").type().longType().noDefault() + .name("interfered_burst").type().longType().noDefault() + .name("cpu_usage").type().doubleType().noDefault() + .name("cpu_demand").type().doubleType().noDefault() + .name("image_count").type().intType().noDefault() .name("server").type().stringType().noDefault() - .name("hostState").type().stringType().noDefault() - .name("hostUsage").type().doubleType().noDefault() - .name("powerDraw").type().doubleType().noDefault() - .name("totalSubmittedVms").type().longType().noDefault() - .name("totalQueuedVms").type().longType().noDefault() - .name("totalRunningVms").type().longType().noDefault() - .name("totalFinishedVms").type().longType().noDefault() + .name("host_state").type().stringType().noDefault() + .name("host_usage").type().doubleType().noDefault() + .name("power_draw").type().doubleType().noDefault() + .name("total_submitted_vms").type().longType().noDefault() + .name("total_queued_vms").type().longType().noDefault() + .name("total_running_vms").type().longType().noDefault() + .name("total_finished_vms").type().longType().noDefault() .endRecord() private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination.absolutePath)) .withSchema(schema) @@ -122,21 +122,21 @@ class Sc20ParquetReporter(destination: File) : Sc20Reporter { val record = GenericData.Record(schema) record.put("time", time) record.put("duration", duration) - record.put("requestedBurst", requestedBurst) - record.put("grantedBurst", grantedBurst) - record.put("overcommissionedBurst", overcommissionedBurst) - record.put("interferedBurst", interferedBurst) - record.put("cpuUsage", cpuUsage) - record.put("cpuDemand", cpuDemand) - record.put("numberOfDeployedImages", numberOfDeployedImages) + record.put("requested_burst", requestedBurst) + record.put("granted_burst", grantedBurst) + record.put("overcommissioned_burst", overcommissionedBurst) + record.put("interfered_burst", interferedBurst) + record.put("cpu_usage", cpuUsage) + record.put("cpu_demand", cpuDemand) + record.put("image_count", numberOfDeployedImages) record.put("server", hostServer.uid) - record.put("hostState", hostServer.state) - record.put("hostUsage", usage) - record.put("powerDraw", powerDraw) - record.put("totalSubmittedVms", submittedVms) - record.put("totalQueuedVms", queuedVms) - record.put("totalRunningVms", runningVms) - record.put("totalFinishedVms", finishedVms) + record.put("host_state", hostServer.state) + record.put("host_usage", usage) + record.put("power_draw", powerDraw) + record.put("total_submitted_vms", submittedVms) + record.put("total_queued_vms", queuedVms) + record.put("total_running_vms", runningVms) + record.put("total_finished_vms", finishedVms) queue.put(record) } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt new file mode 100644 index 00000000..5c5e6ceb --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt @@ -0,0 +1,200 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import kotlinx.coroutines.flow.first +import mu.KotlinLogging +import java.sql.Connection +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.concurrent.thread + +private val logger = KotlinLogging.logger {} + +class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20Reporter { + private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() + private val queue = ArrayBlockingQueue<Report>(2048) + private val stop = AtomicBoolean(false) + private val writerThread = thread(start = true, name = "sc20-writer") { + val stmt = try { + conn.prepareStatement( + """ + INSERT INTO host_reports (experiment_id, time, duration, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, image_count, server, host_state, host_usage, power_draw, total_submitted_vms, total_queued_vms, total_running_vms, total_finished_vms) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """.trimIndent() + ) + } catch (e: Throwable) { + conn.close() + throw e + } + + val batchSize = 4096 + var batch = 0 + + try { + while (!stop.get()) { + val record = queue.take() + stmt.setLong(1, experimentId) + stmt.setLong(2, record.time) + stmt.setLong(3, record.duration) + stmt.setLong(4, record.requestedBurst) + stmt.setLong(5, record.grantedBurst) + stmt.setLong(6, record.overcommissionedBurst) + stmt.setLong(7, record.interferedBurst) + stmt.setDouble(8, record.cpuUsage) + stmt.setDouble(9, record.cpuDemand) + stmt.setInt(10, record.numberOfDeployedImages) + stmt.setString(11, record.hostServer.uid.toString()) + stmt.setString(12, record.hostServer.state.name) + stmt.setDouble(13, record.hostUsage) + stmt.setDouble(14, record.powerDraw) + stmt.setLong(15, record.submittedVms) + stmt.setLong(16, record.queuedVms) + stmt.setLong(17, record.runningVms) + stmt.setLong(18, record.finishedVms) + stmt.addBatch() + batch++ + + if (batch > batchSize) { + stmt.executeBatch() + batch = 0 + } + } + } finally { + stmt.executeBatch() + stmt.close() + conn.close() + } + } + + override suspend fun reportVmStateChange(server: Server) {} + + override suspend fun reportHostStateChange( + driver: VirtDriver, + server: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long + ) { + val lastServerState = lastServerStates[server] + if (server.state == ServerState.SHUTOFF && lastServerState != null) { + val duration = simulationContext.clock.millis() - lastServerState.second + reportHostSlice( + simulationContext.clock.millis(), + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server, + submittedVms, + queuedVms, + runningVms, + finishedVms, + duration + ) + } + + logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]") + + lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) + } + + override suspend fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, + duration: Long + ) { + // Assume for now that the host is not virtualized and measure the current power draw + val driver = hostServer.services[BareMetalDriver.Key] + val usage = driver.usage.first() + val powerDraw = driver.powerDraw.first() + + queue.put( + Report( + time, + duration, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + numberOfDeployedImages, + hostServer, + usage, + powerDraw, + submittedVms, + queuedVms, + runningVms, + finishedVms + ) + ) + } + + override fun close() { + // Busy loop to wait for writer thread to finish + stop.set(true) + writerThread.join() + } + + data class Report( + val time: Long, + val duration: Long, + val requestedBurst: Long, + val grantedBurst: Long, + val overcommissionedBurst: Long, + val interferedBurst: Long, + val cpuUsage: Double, + val cpuDemand: Double, + val numberOfDeployedImages: Int, + val hostServer: Server, + val hostUsage: Double, + val powerDraw: Double, + val submittedVms: Long, + val queuedVms: Long, + val runningVms: Long, + val finishedVms: Long + ) +} |
