author     Fabian Mastenbroek <mail.fabianm@gmail.com>    2020-05-14 15:51:19 +0200
committer  Fabian Mastenbroek <mail.fabianm@gmail.com>    2020-05-14 15:51:19 +0200
commit     bdf5982ec9977fd949efe7947a4ca36bba4dda85 (patch)
tree       a75006e93a38d986447939bef9466bb798ffba0f /opendc/opendc-experiments-sc20/src
parent     9a09573d1b039de999a7f225fa1b1021deb8f9b2 (diff)
feat: Add parquet reporter
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt                                 |   1
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt                              |   9
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt               | 116
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt                |  73
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt                    | 114
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt         |  65
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt               |  42
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt                   |   6
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt        |  34
9 files changed, 339 insertions(+), 121 deletions(-)
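
In short, the commit replaces the single monolithic Parquet output file with one file per metric table, written asynchronously by a shared abstract writer. A minimal wiring sketch, assembled from the hunks below (scenario, run, path and bufferSize stand in for values the CLI provides):

import java.io.File

// Sketch only: one Parquet file per metric table, multiplexed by the reporter.
fun createReporter(path: File, scenario: Long, run: Int, bufferSize: Int): ExperimentReporter {
    val hostWriter = ParquetHostMetricsWriter(File(path, "$scenario-$run-host.parquet"), bufferSize)
    val provisionerWriter = ParquetProvisionerMetricsWriter(File(path, "$scenario-$run-provisioner.parquet"), bufferSize)
    return ExperimentParquetReporter(scenario, run, hostWriter, provisionerWriter)
}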
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
index 03995160..e6fd504e 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
@@ -39,7 +39,6 @@ import java.io.Closeable
import java.io.File
import java.util.concurrent.ExecutorCompletionService
import java.util.concurrent.Executors
-import java.util.concurrent.Future
import javax.sql.DataSource
/**
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
index 0d60ce8c..631c1085 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
@@ -28,6 +28,8 @@ import com.atlarge.opendc.experiments.sc20.reporter.ExperimentParquetReporter
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider
+import com.atlarge.opendc.experiments.sc20.reporter.ParquetHostMetricsWriter
+import com.atlarge.opendc.experiments.sc20.reporter.ParquetProvisionerMetricsWriter
import com.atlarge.opendc.experiments.sc20.reporter.PostgresHostMetricsWriter
import com.atlarge.opendc.experiments.sc20.reporter.PostgresProvisionerMetricsWriter
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
@@ -182,8 +184,11 @@ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentRepo
.file()
.defaultLazy { File("data") }
- override fun createReporter(scenario: Long, run: Int): ExperimentReporter =
- ExperimentParquetReporter(File(path, "results-$scenario-$run.parquet"))
+ override fun createReporter(scenario: Long, run: Int): ExperimentReporter {
+ val hostWriter = ParquetHostMetricsWriter(File(path, "$scenario-$run-host.parquet"), bufferSize)
+ val provisionerWriter = ParquetProvisionerMetricsWriter(File(path, "$scenario-$run-provisioner.parquet"), bufferSize)
+ return ExperimentParquetReporter(scenario, run, hostWriter, provisionerWriter)
+ }
override fun close() {}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt
index 87bf524f..9426933e 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt
@@ -27,62 +27,19 @@ package com.atlarge.opendc.experiments.sc20.reporter
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
import mu.KotlinLogging
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import java.io.File
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
private val logger = KotlinLogging.logger {}
-class ExperimentParquetReporter(destination: File) :
+class ExperimentParquetReporter(
+ val scenario: Long,
+ val run: Int,
+ val hostWriter: ParquetHostMetricsWriter,
+ val provisionerWriter: ParquetProvisionerMetricsWriter
+) :
ExperimentReporter {
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
- private val schema = SchemaBuilder
- .record("slice")
- .namespace("com.atlarge.opendc.experiments.sc20")
- .fields()
- .name("time").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("requested_burst").type().longType().noDefault()
- .name("granted_burst").type().longType().noDefault()
- .name("overcommissioned_burst").type().longType().noDefault()
- .name("interfered_burst").type().longType().noDefault()
- .name("cpu_usage").type().doubleType().noDefault()
- .name("cpu_demand").type().doubleType().noDefault()
- .name("image_count").type().intType().noDefault()
- .name("server").type().stringType().noDefault()
- .name("host_state").type().stringType().noDefault()
- .name("host_usage").type().doubleType().noDefault()
- .name("power_draw").type().doubleType().noDefault()
- .name("total_submitted_vms").type().longType().noDefault()
- .name("total_queued_vms").type().longType().noDefault()
- .name("total_running_vms").type().longType().noDefault()
- .name("total_finished_vms").type().longType().noDefault()
- .endRecord()
- private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination.absolutePath))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
- private val queue = ArrayBlockingQueue<GenericData.Record>(2048)
- private val writerThread = thread(start = true, name = "sc20-writer") {
- try {
- while (true) {
- val record = queue.take()
- writer.write(record)
- }
- } catch (e: InterruptedException) {
- // Do not rethrow this
- } finally {
- writer.close()
- }
- }
override fun reportVmStateChange(time: Long, server: Server) {}
@@ -91,7 +48,7 @@ class ExperimentParquetReporter(destination: File) :
driver: VirtDriver,
server: Server
) {
- logger.info("Host ${server.uid} changed state ${server.state} [$time]")
+ logger.debug("Host ${server.uid} changed state ${server.state} [$time]")
val lastServerState = lastServerStates[server]
if (server.state == ServerState.SHUTOFF && lastServerState != null) {
@@ -134,33 +91,42 @@ class ExperimentParquetReporter(destination: File) :
hostServer: Server,
duration: Long
) {
- val record = GenericData.Record(schema)
- record.put("time", time)
- record.put("duration", duration)
- record.put("requested_burst", requestedBurst)
- record.put("granted_burst", grantedBurst)
- record.put("overcommissioned_burst", overcommissionedBurst)
- record.put("interfered_burst", interferedBurst)
- record.put("cpu_usage", cpuUsage)
- record.put("cpu_demand", cpuDemand)
- record.put("image_count", numberOfDeployedImages)
- record.put("server", hostServer.uid)
- record.put("host_state", hostServer.state)
- record.put("host_usage", cpuUsage)
- record.put("power_draw", lastPowerConsumption[hostServer] ?: 200.0)
- record.put("total_submitted_vms", -1)
- record.put("total_queued_vms", -1)
- record.put("total_running_vms", -1)
- record.put("total_finished_vms", -1)
+ hostWriter.write(
+ scenario, run, HostMetrics(
+ time,
+ duration,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0
+ )
+ )
+ }
- queue.put(record)
+ override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
+ provisionerWriter.write(
+ scenario,
+ run,
+ ProvisionerMetrics(
+ time,
+ event.totalHostCount,
+ event.availableHostCount,
+ event.totalVmCount,
+ event.activeVmCount,
+ event.inactiveVmCount,
+ event.waitingVmCount,
+ event.failedVmCount
+ )
+ )
}
override fun close() {
- // Busy loop to wait for writer thread to finish
- while (queue.isNotEmpty()) {
- Thread.sleep(500)
- }
- writerThread.interrupt()
+ hostWriter.close()
+ provisionerWriter.close()
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt
new file mode 100644
index 00000000..33839191
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt
@@ -0,0 +1,73 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import java.io.File
+
+private val schema: Schema = SchemaBuilder
+ .record("host_metrics")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ .name("scenario_id").type().longType().noDefault()
+ .name("run_id").type().intType().noDefault()
+ .name("timestamp").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("host_id").type().stringType().noDefault()
+ .name("state").type().stringType().noDefault()
+ .name("vm_count").type().intType().noDefault()
+ .name("requested_burst").type().longType().noDefault()
+ .name("granted_burst").type().longType().noDefault()
+ .name("overcommissioned_burst").type().longType().noDefault()
+ .name("interfered_burst").type().longType().noDefault()
+ .name("cpu_usage").type().doubleType().noDefault()
+ .name("cpu_demand").type().doubleType().noDefault()
+ .name("power_draw").type().doubleType().noDefault()
+ .endRecord()
+
+public class ParquetHostMetricsWriter(path: File, batchSize: Int) :
+ ParquetMetricsWriter<HostMetrics>(path, schema, batchSize) {
+
+ override fun persist(action: Action.Write<HostMetrics>, row: GenericData.Record) {
+ row.put("scenario_id", action.scenario)
+ row.put("run_id", action.run)
+ row.put("host_id", action.metrics.host.name)
+ row.put("state", action.metrics.host.state.name)
+ row.put("timestamp", action.metrics.time)
+ row.put("duration", action.metrics.duration)
+ row.put("vm_count", action.metrics.vmCount)
+ row.put("requested_burst", action.metrics.requestedBurst)
+ row.put("granted_burst", action.metrics.grantedBurst)
+ row.put("overcommissioned_burst", action.metrics.overcommissionedBurst)
+ row.put("interfered_burst", action.metrics.interferedBurst)
+ row.put("cpu_usage", action.metrics.cpuUsage)
+ row.put("cpu_demand", action.metrics.cpuDemand)
+ row.put("power_draw", action.metrics.powerDraw)
+ }
+
+ override fun toString(): String = "host-writer"
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt
new file mode 100644
index 00000000..e82e9e47
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt
@@ -0,0 +1,114 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
+
+import mu.KotlinLogging
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.Closeable
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
+
+private val logger = KotlinLogging.logger {}
+
+public abstract class ParquetMetricsWriter<T>(
+ private val path: File,
+ private val schema: Schema,
+ private val bufferSize: Int = 4096
+) : Runnable, Closeable {
+ /**
+ * The queue of commands to process.
+ */
+ private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
+ private val writerThread = thread(start = true, name = "parquet-writer") { run() }
+
+ /**
+ * Write the specified metrics to the Parquet file.
+ */
+ public fun write(scenario: Long, run: Int, metrics: T) {
+ queue.put(Action.Write(scenario, run, metrics))
+ }
+
+ /**
+ * Signal the writer to stop.
+ */
+ public override fun close() {
+ queue.put(Action.Stop)
+ writerThread.join()
+ }
+
+ /**
+ * Persist the specified metrics to the given [row].
+ */
+ public abstract fun persist(action: Action.Write<T>, row: GenericData.Record)
+
+ /**
+ * The write loop that runs on the writer thread.
+ */
+ override fun run() {
+ val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // Page size (granularity of compression)
+ .withRowGroupSize(16 * 1024 * 1024) // Row group size (in-memory write buffer)
+ .build()
+
+ try {
+ loop@ while (true) {
+ val action = queue.take()
+ when (action) {
+ is Action.Stop -> break@loop
+ is Action.Write<*> -> {
+ val record = GenericData.Record(schema)
+ @Suppress("UNCHECKED_CAST")
+ persist(action as Action.Write<T>, record)
+ writer.write(record)
+ }
+ }
+ }
+ } catch (e: Throwable) {
+ logger.error("Writer failed", e)
+ } finally {
+ writer.close()
+ }
+ }
+
+ sealed class Action {
+ /**
+ * A poison pill that will stop the writer thread.
+ */
+ object Stop : Action()
+
+ /**
+ * Write the specified metrics to the Parquet file.
+ */
+ data class Write<out T>(val scenario: Long, val run: Int, val metrics: T) : Action()
+ }
+}
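
Adding a writer for a new metric table only requires an Avro schema and a persist implementation against the abstract class above. A sketch, assuming a hypothetical QueueMetrics type (queue_metrics and its fields are illustrative, not part of this commit):

package com.atlarge.opendc.experiments.sc20.reporter

import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import java.io.File

// Hypothetical metric type, for illustration only.
data class QueueMetrics(val time: Long, val queueLength: Int)

private val queueSchema: Schema = SchemaBuilder
    .record("queue_metrics")
    .namespace("com.atlarge.opendc.experiments.sc20")
    .fields()
    .name("scenario_id").type().longType().noDefault()
    .name("run_id").type().intType().noDefault()
    .name("timestamp").type().longType().noDefault()
    .name("queue_length").type().intType().noDefault()
    .endRecord()

public class ParquetQueueMetricsWriter(path: File, batchSize: Int) :
    ParquetMetricsWriter<QueueMetrics>(path, queueSchema, batchSize) {

    override fun persist(action: Action.Write<QueueMetrics>, row: GenericData.Record) {
        row.put("scenario_id", action.scenario)
        row.put("run_id", action.run)
        row.put("timestamp", action.metrics.time)
        row.put("queue_length", action.metrics.queueLength)
    }
}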
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt
new file mode 100644
index 00000000..0c74b23e
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt
@@ -0,0 +1,65 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import java.io.File
+
+private val schema: Schema = SchemaBuilder
+ .record("provisioner_metrics")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ .name("scenario_id").type().longType().noDefault()
+ .name("run_id").type().intType().noDefault()
+ .name("timestamp").type().longType().noDefault()
+ .name("host_total_count").type().intType().noDefault()
+ .name("host_available_count").type().intType().noDefault()
+ .name("vm_total_count").type().intType().noDefault()
+ .name("vm_active_count").type().intType().noDefault()
+ .name("vm_inactive_count").type().intType().noDefault()
+ .name("vm_waiting_count").type().intType().noDefault()
+ .name("vm_failed_count").type().intType().noDefault()
+ .endRecord()
+
+public class ParquetProvisionerMetricsWriter(path: File, batchSize: Int) :
+ ParquetMetricsWriter<ProvisionerMetrics>(path, schema, batchSize) {
+
+ override fun persist(action: Action.Write<ProvisionerMetrics>, row: GenericData.Record) {
+ row.put("scenario_id", action.scenario)
+ row.put("run_id", action.run)
+ row.put("timestamp", action.metrics.time)
+ row.put("host_total_count", action.metrics.totalHostCount)
+ row.put("host_available_count", action.metrics.availableHostCount)
+ row.put("vm_total_count", action.metrics.totalVmCount)
+ row.put("vm_active_count", action.metrics.activeVmCount)
+ row.put("vm_inactive_count", action.metrics.inactiveVmCount)
+ row.put("vm_waiting_count", action.metrics.waitingVmCount)
+ row.put("vm_failed_count", action.metrics.failedVmCount)
+ }
+
+ override fun toString(): String = "provisioner-writer"
+}
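
For inspection, files produced by these writers can be read back with the stock parquet-avro reader. A small sketch, assuming the same dependencies already on the classpath (the path argument is illustrative):

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader

// Print every record in a Parquet file written by the writers above.
fun dump(path: String) {
    AvroParquetReader.builder<GenericRecord>(Path(path)).build().use { reader ->
        generateSequence { reader.read() }.forEach(::println)
    }
}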
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt
index 43e4a7a6..57e665ae 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt
@@ -28,31 +28,31 @@ import de.bytefish.pgbulkinsert.row.SimpleRow
import de.bytefish.pgbulkinsert.row.SimpleRowWriter
import javax.sql.DataSource
+private val table: SimpleRowWriter.Table = SimpleRowWriter.Table(
+ "host_metrics",
+ *arrayOf(
+ "scenario_id",
+ "run_id",
+ "host_id",
+ "state",
+ "timestamp",
+ "duration",
+ "vm_count",
+ "requested_burst",
+ "granted_burst",
+ "overcommissioned_burst",
+ "interfered_burst",
+ "cpu_usage",
+ "cpu_demand",
+ "power_draw"
+ )
+)
+
/**
* A [PostgresMetricsWriter] for persisting [HostMetrics].
*/
public class PostgresHostMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) :
- PostgresMetricsWriter<HostMetrics>(ds, parallelism, batchSize) {
-
- override val table: SimpleRowWriter.Table = SimpleRowWriter.Table(
- "host_metrics",
- *arrayOf(
- "scenario_id",
- "run_id",
- "host_id",
- "state",
- "timestamp",
- "duration",
- "vm_count",
- "requested_burst",
- "granted_burst",
- "overcommissioned_burst",
- "interfered_burst",
- "cpu_usage",
- "cpu_demand",
- "power_draw"
- )
- )
+ PostgresMetricsWriter<HostMetrics>(ds, table, parallelism, batchSize) {
override fun persist(action: Action.Write<HostMetrics>, row: SimpleRow) {
row.setLong("scenario_id", action.scenario)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
index 715800a3..bee01e51 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
@@ -40,6 +40,7 @@ import javax.sql.DataSource
*/
public abstract class PostgresMetricsWriter<T>(
private val ds: DataSource,
+ private val table: SimpleRowWriter.Table,
private val parallelism: Int = 8,
private val bufferSize: Int = 4096
) : Runnable, Closeable {
@@ -72,11 +73,6 @@ public abstract class PostgresMetricsWriter<T>(
}
/**
- * Create the table to which we write.
- */
- public abstract val table: SimpleRowWriter.Table
-
- /**
* Persist the specified metrics to the given [row].
*/
public abstract fun persist(action: Action.Write<T>, row: SimpleRow)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt
index a7a86206..17788112 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt
@@ -28,27 +28,27 @@ import de.bytefish.pgbulkinsert.row.SimpleRow
import de.bytefish.pgbulkinsert.row.SimpleRowWriter
import javax.sql.DataSource
+private val table: SimpleRowWriter.Table = SimpleRowWriter.Table(
+ "provisioner_metrics",
+ *arrayOf(
+ "scenario_id",
+ "run_id",
+ "timestamp",
+ "host_total_count",
+ "host_available_count",
+ "vm_total_count",
+ "vm_active_count",
+ "vm_inactive_count",
+ "vm_waiting_count",
+ "vm_failed_count"
+ )
+)
+
/**
* A [PostgresMetricsWriter] for persisting [ProvisionerMetrics].
*/
public class PostgresProvisionerMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) :
- PostgresMetricsWriter<ProvisionerMetrics>(ds, parallelism, batchSize) {
-
- override val table: SimpleRowWriter.Table = SimpleRowWriter.Table(
- "provisioner_metrics",
- *arrayOf(
- "scenario_id",
- "run_id",
- "timestamp",
- "host_total_count",
- "host_available_count",
- "vm_total_count",
- "vm_active_count",
- "vm_inactive_count",
- "vm_waiting_count",
- "vm_failed_count"
- )
- )
+ PostgresMetricsWriter<ProvisionerMetrics>(ds, table, parallelism, batchSize) {
override fun persist(action: Action.Write<ProvisionerMetrics>, row: SimpleRow) {
row.setLong("scenario_id", action.scenario)