summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-13 12:51:20 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-13 12:51:20 +0200
commit5c2270d058c312c94ee0970560009e8008042d10 (patch)
tree1aa99289f019a512f976c0e6de069285687abeff
parent553dba3630d44c31df5b8c312360af6ed1e6c387 (diff)
refactor: Share single database writer
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt6
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt17
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt24
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt21
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt13
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt44
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt106
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt57
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt135
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt39
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt43
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt4
12 files changed, 384 insertions, 125 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
index e8222eb0..61b5759d 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
@@ -40,7 +40,7 @@ import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.failure.FaultInjector
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter
-import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -106,8 +106,8 @@ fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): F
/**
* Create the trace reader from which the VM workloads are read.
*/
-fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader {
- return Sc20ParquetTraceReader(
+fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20StreamingParquetTraceReader {
+ return Sc20StreamingParquetTraceReader(
path,
performanceInterferenceModel,
vms,
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
index a9429367..a4b26ddb 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
@@ -27,7 +27,7 @@ package com.atlarge.opendc.experiments.sc20
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider
-import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
@@ -85,6 +85,10 @@ public class ExperimentRunner(
*/
private val scenarioIds = mutableMapOf<Scenario, Long>()
+ init {
+ reporterProvider.init(ds)
+ }
+
/**
* Create an execution plan
*/
@@ -123,7 +127,7 @@ public class ExperimentRunner(
performanceInterferenceModel: PerformanceInterferenceModel,
seed: Int
): TraceReader<VmWorkload> {
- return Sc20ParquetTraceReader(
+ return Sc20StreamingParquetTraceReader(
File(tracePath, name),
performanceInterferenceModel,
emptyList(),
@@ -142,10 +146,14 @@ public class ExperimentRunner(
* Run the specified run.
*/
private fun run(run: Run) {
- val reporter = reporterProvider.createReporter(ds, experimentId)
+ val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id)
val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run.seed)
val environmentReader = createEnvironmentReader(run.scenario.topology.name)
- run.scenario(run, reporter, environmentReader, traceReader)
+ try {
+ run.scenario(run, reporter, environmentReader, traceReader)
+ } finally {
+ reporter.close()
+ }
}
/**
@@ -197,6 +205,7 @@ public class ExperimentRunner(
}
override fun close() {
+ reporterProvider.close()
helper.close()
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
index 94a8d76e..9dfed03b 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
@@ -28,6 +28,7 @@ import com.atlarge.opendc.experiments.sc20.reporter.ExperimentParquetReporter
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider
+import com.atlarge.opendc.experiments.sc20.reporter.PostgresHostMetricsWriter
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
import com.github.ajalt.clikt.core.CliktCommand
@@ -122,11 +123,12 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream)
.construct()
+ val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel)
try {
- val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel)
runner.run()
} finally {
+ runner.close()
ds.close()
}
}
@@ -141,13 +143,25 @@ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentRepo
.file()
.defaultLazy { File("data") }
- override fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter =
- ExperimentParquetReporter(File(path, "results-${System.currentTimeMillis()}.parquet"))
+ override fun createReporter(scenario: Long, run: Int): ExperimentReporter =
+ ExperimentParquetReporter(File(path, "results-$scenario-$run.parquet"))
+
+ override fun close() {}
}
class Postgres : Reporter("Options for reporting using PostgreSQL") {
- override fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter =
- ExperimentPostgresReporter(ds.connection, experimentId)
+ lateinit var hostWriter: PostgresHostMetricsWriter
+
+ override fun init(ds: DataSource) {
+ hostWriter = PostgresHostMetricsWriter(ds, 4096)
+ }
+
+ override fun createReporter(scenario: Long, run: Int): ExperimentReporter =
+ ExperimentPostgresReporter(scenario, run, hostWriter)
+
+ override fun close() {
+ hostWriter.close()
+ }
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
index 0e612ae0..a2ae8100 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
@@ -57,15 +57,15 @@ abstract class AbstractSc20Portfolio(name: String) : Portfolio(name) {
object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") {
override val topologies = listOf(
- Topology("base"),
- Topology("rep-vol-hor-hom"),
- Topology("rep-vol-hor-het"),
- Topology("rep-vol-ver-hom"),
- Topology("rep-vol-ver-het"),
- Topology("exp-vol-hor-hom"),
- Topology("exp-vol-hor-het"),
- Topology("exp-vol-ver-hom"),
- Topology("exp-vol-ver-het")
+ Topology("base")
+ // Topology("rep-vol-hor-hom"),
+ // Topology("rep-vol-hor-het"),
+ // Topology("rep-vol-ver-hom"),
+ // Topology("rep-vol-ver-het"),
+ // Topology("exp-vol-hor-hom"),
+ // Topology("exp-vol-hor-het"),
+ // Topology("exp-vol-ver-hom"),
+ // Topology("exp-vol-ver-het")
)
override val workloads = listOf(
@@ -76,7 +76,8 @@ object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") {
)
override val operationalPhenomena = listOf(
- true to true
+ // true to true
+ false to true
)
override val allocationPolicies = listOf(
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt
index d0dfd2e8..8f42cdd4 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt
@@ -24,8 +24,17 @@
package com.atlarge.opendc.experiments.sc20.reporter
+import java.io.Closeable
import javax.sql.DataSource
-interface ExperimentReporterProvider {
- public fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter
+interface ExperimentReporterProvider : Closeable {
+ /**
+ * Initialize the provider with the specified data source.
+ */
+ public fun init(ds: DataSource) {}
+
+ /**
+ * Create a reporter for a single run.
+ */
+ public fun createReporter(scenario: Long, run: Int): ExperimentReporter
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt
new file mode 100644
index 00000000..061f6cce
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt
@@ -0,0 +1,44 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
+
+import com.atlarge.opendc.compute.core.Server
+
+/**
+ * A periodic report of the host machine metrics.
+ */
+data class HostMetrics(
+ val time: Long,
+ val duration: Long,
+ val host: Server,
+ val vmCount: Int,
+ val requestedBurst: Long,
+ val grantedBurst: Long,
+ val overcommissionedBurst: Long,
+ val interferedBurst: Long,
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val powerDraw: Double
+)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
index 18019aa5..532daa48 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
@@ -31,71 +31,11 @@ import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import kotlinx.coroutines.flow.first
import mu.KotlinLogging
-import java.sql.Connection
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
private val logger = KotlinLogging.logger {}
-class ExperimentPostgresReporter(val conn: Connection, val experimentId: Long) : ExperimentReporter {
+class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: PostgresHostMetricsWriter) : ExperimentReporter {
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
- private val queue = ArrayBlockingQueue<Action>(2048)
- private val writerThread = thread(start = true, name = "sc20-writer") {
- val stmt = try {
- conn.autoCommit = false
- conn.prepareStatement(
- """
- INSERT INTO host_reports (experiment_id, time, duration, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, image_count, server, host_state, host_usage, power_draw, total_submitted_vms, total_queued_vms, total_running_vms, total_finished_vms)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """.trimIndent()
- )
- } catch (e: Throwable) {
- conn.close()
- throw e
- }
-
- val batchSize = 4096
- var batch = 0
-
- try {
- loop@ while (true) {
- when (val record = queue.take()) {
- is Action.Stop -> break@loop
- is Action.Write -> {
- stmt.setLong(1, experimentId)
- stmt.setLong(2, record.time)
- stmt.setLong(3, record.duration)
- stmt.setLong(4, record.requestedBurst)
- stmt.setLong(5, record.grantedBurst)
- stmt.setLong(6, record.overcommissionedBurst)
- stmt.setLong(7, record.interferedBurst)
- stmt.setDouble(8, record.cpuUsage)
- stmt.setDouble(9, record.cpuDemand)
- stmt.setInt(10, record.numberOfDeployedImages)
- stmt.setString(11, record.hostServer.uid.toString())
- stmt.setString(12, record.hostServer.state.name)
- stmt.setDouble(13, record.hostUsage)
- stmt.setDouble(14, record.powerDraw)
- stmt.setLong(15, record.submittedVms)
- stmt.setLong(16, record.queuedVms)
- stmt.setLong(17, record.runningVms)
- stmt.setLong(18, record.finishedVms)
- stmt.addBatch()
- batch++
-
- if (batch % batchSize == 0) {
- stmt.executeBatch()
- conn.commit()
- }
- }
- }
- }
- } finally {
- conn.commit()
- stmt.close()
- conn.close()
- }
- }
override suspend fun reportVmStateChange(server: Server) {}
@@ -151,56 +91,24 @@ class ExperimentPostgresReporter(val conn: Connection, val experimentId: Long) :
) {
// Assume for now that the host is not virtualized and measure the current power draw
val driver = hostServer.services[BareMetalDriver.Key]
- val usage = driver.usage.first()
val powerDraw = driver.powerDraw.first()
- queue.put(
- Action.Write(
+ writer.write(
+ scenario, run, HostMetrics(
time,
duration,
+ hostServer,
+ numberOfDeployedImages,
requestedBurst,
grantedBurst,
overcommissionedBurst,
interferedBurst,
cpuUsage,
cpuDemand,
- numberOfDeployedImages,
- hostServer,
- usage,
- powerDraw,
- submittedVms,
- queuedVms,
- runningVms,
- finishedVms
+ powerDraw
)
)
}
- override fun close() {
- queue.put(Action.Stop)
- writerThread.join()
- }
-
- private sealed class Action {
- object Stop : Action()
-
- data class Write(
- val time: Long,
- val duration: Long,
- val requestedBurst: Long,
- val grantedBurst: Long,
- val overcommissionedBurst: Long,
- val interferedBurst: Long,
- val cpuUsage: Double,
- val cpuDemand: Double,
- val numberOfDeployedImages: Int,
- val hostServer: Server,
- val hostUsage: Double,
- val powerDraw: Double,
- val submittedVms: Long,
- val queuedVms: Long,
- val runningVms: Long,
- val finishedVms: Long
- ) : Action()
- }
+ override fun close() {}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt
new file mode 100644
index 00000000..55b80e4c
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt
@@ -0,0 +1,57 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
+
+import java.sql.Connection
+import java.sql.PreparedStatement
+import java.sql.Timestamp
+import javax.sql.DataSource
+
+/**
+ * A [PostgresMetricsWriter] for persisting [HostMetrics].
+ */
+public class PostgresHostMetricsWriter(ds: DataSource, batchSize: Int) :
+ PostgresMetricsWriter<HostMetrics>(ds, batchSize) {
+ override fun createStatement(conn: Connection): PreparedStatement {
+ return conn.prepareStatement("INSERT INTO host_metrics (scenario_id, run_id, host_id, state, timestamp, duration, vm_count, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, power_draw) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
+ }
+
+ override fun persist(action: Action.Write<HostMetrics>, stmt: PreparedStatement) {
+ stmt.setLong(1, action.scenario)
+ stmt.setInt(2, action.run)
+ stmt.setString(3, action.metrics.host.name)
+ stmt.setString(4, action.metrics.host.state.name)
+ stmt.setTimestamp(5, Timestamp(action.metrics.time))
+ stmt.setLong(6, action.metrics.duration)
+ stmt.setInt(7, action.metrics.vmCount)
+ stmt.setLong(8, action.metrics.requestedBurst)
+ stmt.setLong(9, action.metrics.grantedBurst)
+ stmt.setLong(10, action.metrics.overcommissionedBurst)
+ stmt.setLong(11, action.metrics.interferedBurst)
+ stmt.setDouble(12, action.metrics.cpuUsage)
+ stmt.setDouble(13, action.metrics.cpuDemand)
+ stmt.setDouble(14, action.metrics.powerDraw)
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
new file mode 100644
index 00000000..a30dee05
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
@@ -0,0 +1,135 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
+
+import java.io.Closeable
+import java.sql.Connection
+import java.sql.PreparedStatement
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import javax.sql.DataSource
+import kotlin.concurrent.thread
+
+/**
+ * The experiment writer is a separate thread that is responsible for writing the results to the
+ * database.
+ */
+public abstract class PostgresMetricsWriter<T>(
+ private val ds: DataSource,
+ private val batchSize: Int = 4096
+) : Runnable, Closeable {
+ /**
+ * The queue of commands to process.
+ */
+ private val queue: BlockingQueue<Action> = ArrayBlockingQueue(batchSize)
+
+ /**
+ * The thread for the actual writer.
+ */
+ private val writerThread: Thread = thread { run() }
+
+
+ /**
+ * Write the specified metrics to the database.
+ */
+ public fun write(scenario: Long, run: Int, metrics: T) {
+ queue.put(Action.Write(scenario, run, metrics))
+ }
+
+ /**
+ * Signal the writer to stop.
+ */
+ public override fun close() {
+ queue.put(Action.Stop)
+ writerThread.join()
+ }
+
+ /**
+ * Create a prepared statement to use.
+ */
+ public abstract fun createStatement(conn: Connection): PreparedStatement
+
+ /**
+ * Persist the specified metrics using the given [stmt].
+ */
+ public abstract fun persist(action: Action.Write<T>, stmt: PreparedStatement)
+
+ /**
+ * Start the writer thread.
+ */
+ override fun run() {
+ val conn = ds.connection
+ var batch = 0
+
+ conn.autoCommit = false
+ val stmt = createStatement(conn)
+
+ try {
+ val actions = mutableListOf<Action>()
+
+ loop@ while (true) {
+ actions.clear()
+
+ if (queue.isEmpty()) {
+ actions.add(queue.take())
+ } else {
+ queue.drainTo(actions)
+ }
+
+ for (action in actions) {
+ when (action) {
+ is Action.Stop -> break@loop
+ is Action.Write<*> -> {
+ @Suppress("UNCHECKED_CAST")
+ persist(action as Action.Write<T>, stmt)
+ stmt.addBatch()
+ batch++
+
+ if (batch % batchSize == 0) {
+ stmt.executeBatch()
+ conn.commit()
+ }
+ }
+ }
+ }
+ }
+ } finally {
+ conn.commit()
+ conn.close()
+ }
+ }
+
+ sealed class Action {
+ /**
+ * A poison pill that will stop the writer thread.
+ */
+ object Stop : Action()
+
+ /**
+ * Write the specified metrics to the database.
+ */
+ data class Write<out T>(val scenario: Long, val run: Int, val metrics: T) : Action()
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt
new file mode 100644
index 00000000..966662cd
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt
@@ -0,0 +1,39 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
+
+/**
+ * A periodic report of the provisioner's metrics.
+ */
+data class ProvisionerMetrics(
+ val time: Long,
+ val totalHostCount: Int,
+ val availableHostCount: Int,
+ val totalVmCount: Int,
+ val activeVmCount: Int,
+ val inactiveVmCount: Int,
+ val waitingVmCount: Int,
+ val failedVmCount: Int
+)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt
new file mode 100644
index 00000000..5f963206
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt
@@ -0,0 +1,43 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
+
+import com.atlarge.opendc.compute.core.Server
+
+/**
+ * A periodic report of a virtual machine's metrics.
+ */
+data class VmMetrics(
+ val time: Long,
+ val duration: Long,
+ val vm: Server,
+ val host: Server,
+ val requestedBurst: Long,
+ val grantedBurst: Long,
+ val overcommissionedBurst: Long,
+ val interferedBurst: Long,
+ val cpuUsage: Double,
+ val cpuDemand: Double
+)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
index 8a204ca3..aa06ce65 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
@@ -53,13 +53,13 @@ import kotlin.random.Random
private val logger = KotlinLogging.logger {}
/**
- * A [TraceReader] for the internal VM workload trace format.
+ * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
*
* @param traceFile The directory of the traces.
* @param performanceInterferenceModel The performance model covering the workload in the VM trace.
*/
@OptIn(ExperimentalStdlibApi::class)
-class Sc20ParquetTraceReader(
+class Sc20StreamingParquetTraceReader(
traceFile: File,
performanceInterferenceModel: PerformanceInterferenceModel,
selectedVms: List<String>,