diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-13 12:51:20 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-13 12:51:20 +0200 |
| commit | 5c2270d058c312c94ee0970560009e8008042d10 (patch) | |
| tree | 1aa99289f019a512f976c0e6de069285687abeff | |
| parent | 553dba3630d44c31df5b8c312360af6ed1e6c387 (diff) | |
refactor: Share single database writer
12 files changed, 384 insertions, 125 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt index e8222eb0..61b5759d 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt @@ -40,7 +40,7 @@ import com.atlarge.opendc.core.failure.CorrelatedFaultInjector import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.failure.FaultInjector import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter -import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader +import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.trace.TraceReader import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -106,8 +106,8 @@ fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): F /** * Create the trace reader from which the VM workloads are read. */ -fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader { - return Sc20ParquetTraceReader( +fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20StreamingParquetTraceReader { + return Sc20StreamingParquetTraceReader( path, performanceInterferenceModel, vms, diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt index a9429367..a4b26ddb 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt @@ -27,7 +27,7 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider -import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader +import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader @@ -85,6 +85,10 @@ public class ExperimentRunner( */ private val scenarioIds = mutableMapOf<Scenario, Long>() + init { + reporterProvider.init(ds) + } + /** * Create an execution plan */ @@ -123,7 +127,7 @@ public class ExperimentRunner( performanceInterferenceModel: PerformanceInterferenceModel, seed: Int ): TraceReader<VmWorkload> { - return Sc20ParquetTraceReader( + return Sc20StreamingParquetTraceReader( File(tracePath, name), performanceInterferenceModel, emptyList(), @@ -142,10 +146,14 @@ public class ExperimentRunner( * Run the specified run. */ private fun run(run: Run) { - val reporter = reporterProvider.createReporter(ds, experimentId) + val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id) val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run.seed) val environmentReader = createEnvironmentReader(run.scenario.topology.name) - run.scenario(run, reporter, environmentReader, traceReader) + try { + run.scenario(run, reporter, environmentReader, traceReader) + } finally { + reporter.close() + } } /** @@ -197,6 +205,7 @@ public class ExperimentRunner( } override fun close() { + reporterProvider.close() helper.close() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt index 94a8d76e..9dfed03b 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt @@ -28,6 +28,7 @@ import com.atlarge.opendc.experiments.sc20.reporter.ExperimentParquetReporter import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider +import com.atlarge.opendc.experiments.sc20.reporter.PostgresHostMetricsWriter import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader import com.github.ajalt.clikt.core.CliktCommand @@ -122,11 +123,12 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) .construct() + val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel) try { - val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel) runner.run() } finally { + runner.close() ds.close() } } @@ -141,13 +143,25 @@ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentRepo .file() .defaultLazy { File("data") } - override fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter = - ExperimentParquetReporter(File(path, "results-${System.currentTimeMillis()}.parquet")) + override fun createReporter(scenario: Long, run: Int): ExperimentReporter = + ExperimentParquetReporter(File(path, "results-$scenario-$run.parquet")) + + override fun close() {} } class Postgres : Reporter("Options for reporting using PostgreSQL") { - override fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter = - ExperimentPostgresReporter(ds.connection, experimentId) + lateinit var hostWriter: PostgresHostMetricsWriter + + override fun init(ds: DataSource) { + hostWriter = PostgresHostMetricsWriter(ds, 4096) + } + + override fun createReporter(scenario: Long, run: Int): ExperimentReporter = + ExperimentPostgresReporter(scenario, run, hostWriter) + + override fun close() { + hostWriter.close() + } } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt index 0e612ae0..a2ae8100 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt @@ -57,15 +57,15 @@ abstract class AbstractSc20Portfolio(name: String) : Portfolio(name) { object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") { override val topologies = listOf( - Topology("base"), - Topology("rep-vol-hor-hom"), - Topology("rep-vol-hor-het"), - Topology("rep-vol-ver-hom"), - Topology("rep-vol-ver-het"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-hor-het"), - Topology("exp-vol-ver-hom"), - Topology("exp-vol-ver-het") + Topology("base") + // Topology("rep-vol-hor-hom"), + // Topology("rep-vol-hor-het"), + // Topology("rep-vol-ver-hom"), + // Topology("rep-vol-ver-het"), + // Topology("exp-vol-hor-hom"), + // Topology("exp-vol-hor-het"), + // Topology("exp-vol-ver-hom"), + // Topology("exp-vol-ver-het") ) override val workloads = listOf( @@ -76,7 +76,8 @@ object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") { ) override val operationalPhenomena = listOf( - true to true + // true to true + false to true ) override val allocationPolicies = listOf( diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt index d0dfd2e8..8f42cdd4 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt @@ -24,8 +24,17 @@ package com.atlarge.opendc.experiments.sc20.reporter +import java.io.Closeable import javax.sql.DataSource -interface ExperimentReporterProvider { - public fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter +interface ExperimentReporterProvider : Closeable { + /** + * Initialize the provider with the specified data source. + */ + public fun init(ds: DataSource) {} + + /** + * Create a reporter for a single run. + */ + public fun createReporter(scenario: Long, run: Int): ExperimentReporter } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt new file mode 100644 index 00000000..061f6cce --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +import com.atlarge.opendc.compute.core.Server + +/** + * A periodic report of the host machine metrics. + */ +data class HostMetrics( + val time: Long, + val duration: Long, + val host: Server, + val vmCount: Int, + val requestedBurst: Long, + val grantedBurst: Long, + val overcommissionedBurst: Long, + val interferedBurst: Long, + val cpuUsage: Double, + val cpuDemand: Double, + val powerDraw: Double +) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt index 18019aa5..532daa48 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt @@ -31,71 +31,11 @@ import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import kotlinx.coroutines.flow.first import mu.KotlinLogging -import java.sql.Connection -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread private val logger = KotlinLogging.logger {} -class ExperimentPostgresReporter(val conn: Connection, val experimentId: Long) : ExperimentReporter { +class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: PostgresHostMetricsWriter) : ExperimentReporter { private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() - private val queue = ArrayBlockingQueue<Action>(2048) - private val writerThread = thread(start = true, name = "sc20-writer") { - val stmt = try { - conn.autoCommit = false - conn.prepareStatement( - """ - INSERT INTO host_reports (experiment_id, time, duration, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, image_count, server, host_state, host_usage, power_draw, total_submitted_vms, total_queued_vms, total_running_vms, total_finished_vms) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """.trimIndent() - ) - } catch (e: Throwable) { - conn.close() - throw e - } - - val batchSize = 4096 - var batch = 0 - - try { - loop@ while (true) { - when (val record = queue.take()) { - is Action.Stop -> break@loop - is Action.Write -> { - stmt.setLong(1, experimentId) - stmt.setLong(2, record.time) - stmt.setLong(3, record.duration) - stmt.setLong(4, record.requestedBurst) - stmt.setLong(5, record.grantedBurst) - stmt.setLong(6, record.overcommissionedBurst) - stmt.setLong(7, record.interferedBurst) - stmt.setDouble(8, record.cpuUsage) - stmt.setDouble(9, record.cpuDemand) - stmt.setInt(10, record.numberOfDeployedImages) - stmt.setString(11, record.hostServer.uid.toString()) - stmt.setString(12, record.hostServer.state.name) - stmt.setDouble(13, record.hostUsage) - stmt.setDouble(14, record.powerDraw) - stmt.setLong(15, record.submittedVms) - stmt.setLong(16, record.queuedVms) - stmt.setLong(17, record.runningVms) - stmt.setLong(18, record.finishedVms) - stmt.addBatch() - batch++ - - if (batch % batchSize == 0) { - stmt.executeBatch() - conn.commit() - } - } - } - } - } finally { - conn.commit() - stmt.close() - conn.close() - } - } override suspend fun reportVmStateChange(server: Server) {} @@ -151,56 +91,24 @@ class ExperimentPostgresReporter(val conn: Connection, val experimentId: Long) : ) { // Assume for now that the host is not virtualized and measure the current power draw val driver = hostServer.services[BareMetalDriver.Key] - val usage = driver.usage.first() val powerDraw = driver.powerDraw.first() - queue.put( - Action.Write( + writer.write( + scenario, run, HostMetrics( time, duration, + hostServer, + numberOfDeployedImages, requestedBurst, grantedBurst, overcommissionedBurst, interferedBurst, cpuUsage, cpuDemand, - numberOfDeployedImages, - hostServer, - usage, - powerDraw, - submittedVms, - queuedVms, - runningVms, - finishedVms + powerDraw ) ) } - override fun close() { - queue.put(Action.Stop) - writerThread.join() - } - - private sealed class Action { - object Stop : Action() - - data class Write( - val time: Long, - val duration: Long, - val requestedBurst: Long, - val grantedBurst: Long, - val overcommissionedBurst: Long, - val interferedBurst: Long, - val cpuUsage: Double, - val cpuDemand: Double, - val numberOfDeployedImages: Int, - val hostServer: Server, - val hostUsage: Double, - val powerDraw: Double, - val submittedVms: Long, - val queuedVms: Long, - val runningVms: Long, - val finishedVms: Long - ) : Action() - } + override fun close() {} } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt new file mode 100644 index 00000000..55b80e4c --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt @@ -0,0 +1,57 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +import java.sql.Connection +import java.sql.PreparedStatement +import java.sql.Timestamp +import javax.sql.DataSource + +/** + * A [PostgresMetricsWriter] for persisting [HostMetrics]. + */ +public class PostgresHostMetricsWriter(ds: DataSource, batchSize: Int) : + PostgresMetricsWriter<HostMetrics>(ds, batchSize) { + override fun createStatement(conn: Connection): PreparedStatement { + return conn.prepareStatement("INSERT INTO host_metrics (scenario_id, run_id, host_id, state, timestamp, duration, vm_count, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, power_draw) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + } + + override fun persist(action: Action.Write<HostMetrics>, stmt: PreparedStatement) { + stmt.setLong(1, action.scenario) + stmt.setInt(2, action.run) + stmt.setString(3, action.metrics.host.name) + stmt.setString(4, action.metrics.host.state.name) + stmt.setTimestamp(5, Timestamp(action.metrics.time)) + stmt.setLong(6, action.metrics.duration) + stmt.setInt(7, action.metrics.vmCount) + stmt.setLong(8, action.metrics.requestedBurst) + stmt.setLong(9, action.metrics.grantedBurst) + stmt.setLong(10, action.metrics.overcommissionedBurst) + stmt.setLong(11, action.metrics.interferedBurst) + stmt.setDouble(12, action.metrics.cpuUsage) + stmt.setDouble(13, action.metrics.cpuDemand) + stmt.setDouble(14, action.metrics.powerDraw) + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt new file mode 100644 index 00000000..a30dee05 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt @@ -0,0 +1,135 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +import java.io.Closeable +import java.sql.Connection +import java.sql.PreparedStatement +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import javax.sql.DataSource +import kotlin.concurrent.thread + +/** + * The experiment writer is a separate thread that is responsible for writing the results to the + * database. + */ +public abstract class PostgresMetricsWriter<T>( + private val ds: DataSource, + private val batchSize: Int = 4096 +) : Runnable, Closeable { + /** + * The queue of commands to process. + */ + private val queue: BlockingQueue<Action> = ArrayBlockingQueue(batchSize) + + /** + * The thread for the actual writer. + */ + private val writerThread: Thread = thread { run() } + + + /** + * Write the specified metrics to the database. + */ + public fun write(scenario: Long, run: Int, metrics: T) { + queue.put(Action.Write(scenario, run, metrics)) + } + + /** + * Signal the writer to stop. + */ + public override fun close() { + queue.put(Action.Stop) + writerThread.join() + } + + /** + * Create a prepared statement to use. + */ + public abstract fun createStatement(conn: Connection): PreparedStatement + + /** + * Persist the specified metrics using the given [stmt]. + */ + public abstract fun persist(action: Action.Write<T>, stmt: PreparedStatement) + + /** + * Start the writer thread. + */ + override fun run() { + val conn = ds.connection + var batch = 0 + + conn.autoCommit = false + val stmt = createStatement(conn) + + try { + val actions = mutableListOf<Action>() + + loop@ while (true) { + actions.clear() + + if (queue.isEmpty()) { + actions.add(queue.take()) + } else { + queue.drainTo(actions) + } + + for (action in actions) { + when (action) { + is Action.Stop -> break@loop + is Action.Write<*> -> { + @Suppress("UNCHECKED_CAST") + persist(action as Action.Write<T>, stmt) + stmt.addBatch() + batch++ + + if (batch % batchSize == 0) { + stmt.executeBatch() + conn.commit() + } + } + } + } + } + } finally { + conn.commit() + conn.close() + } + } + + sealed class Action { + /** + * A poison pill that will stop the writer thread. + */ + object Stop : Action() + + /** + * Write the specified metrics to the database. + */ + data class Write<out T>(val scenario: Long, val run: Int, val metrics: T) : Action() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt new file mode 100644 index 00000000..966662cd --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +/** + * A periodic report of the provisioner's metrics. + */ +data class ProvisionerMetrics( + val time: Long, + val totalHostCount: Int, + val availableHostCount: Int, + val totalVmCount: Int, + val activeVmCount: Int, + val inactiveVmCount: Int, + val waitingVmCount: Int, + val failedVmCount: Int +) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt new file mode 100644 index 00000000..5f963206 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +import com.atlarge.opendc.compute.core.Server + +/** + * A periodic report of a virtual machine's metrics. + */ +data class VmMetrics( + val time: Long, + val duration: Long, + val vm: Server, + val host: Server, + val requestedBurst: Long, + val grantedBurst: Long, + val overcommissionedBurst: Long, + val interferedBurst: Long, + val cpuUsage: Double, + val cpuDemand: Double +) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt index 8a204ca3..aa06ce65 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt @@ -53,13 +53,13 @@ import kotlin.random.Random private val logger = KotlinLogging.logger {} /** - * A [TraceReader] for the internal VM workload trace format. + * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. * * @param traceFile The directory of the traces. * @param performanceInterferenceModel The performance model covering the workload in the VM trace. */ @OptIn(ExperimentalStdlibApi::class) -class Sc20ParquetTraceReader( +class Sc20StreamingParquetTraceReader( traceFile: File, performanceInterferenceModel: PerformanceInterferenceModel, selectedVms: List<String>, |
