diff options
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
9 files changed, 339 insertions, 121 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt index 03995160..e6fd504e 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt @@ -39,7 +39,6 @@ import java.io.Closeable import java.io.File import java.util.concurrent.ExecutorCompletionService import java.util.concurrent.Executors -import java.util.concurrent.Future import javax.sql.DataSource /** diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt index 0d60ce8c..631c1085 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt @@ -28,6 +28,8 @@ import com.atlarge.opendc.experiments.sc20.reporter.ExperimentParquetReporter import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider +import com.atlarge.opendc.experiments.sc20.reporter.ParquetHostMetricsWriter +import com.atlarge.opendc.experiments.sc20.reporter.ParquetProvisionerMetricsWriter import com.atlarge.opendc.experiments.sc20.reporter.PostgresHostMetricsWriter import com.atlarge.opendc.experiments.sc20.reporter.PostgresProvisionerMetricsWriter import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader @@ -182,8 +184,11 @@ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentRepo .file() .defaultLazy { File("data") } - override fun createReporter(scenario: Long, run: Int): ExperimentReporter = - ExperimentParquetReporter(File(path, "results-$scenario-$run.parquet")) + override fun createReporter(scenario: Long, run: Int): ExperimentReporter { + val hostWriter = ParquetHostMetricsWriter(File(path, "$scenario-$run-host.parquet"), bufferSize) + val provisionerWriter = ParquetProvisionerMetricsWriter(File(path, "$scenario-$run-provisioner.parquet"), bufferSize) + return ExperimentParquetReporter(scenario, run, hostWriter, provisionerWriter) + } override fun close() {} } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt index 87bf524f..9426933e 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt @@ -27,62 +27,19 @@ package com.atlarge.opendc.experiments.sc20.reporter import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent import mu.KotlinLogging -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread private val logger = KotlinLogging.logger {} -class ExperimentParquetReporter(destination: File) : +class ExperimentParquetReporter( + val scenario: Long, + val run: Int, + val hostWriter: ParquetHostMetricsWriter, + val provisionerWriter: ParquetProvisionerMetricsWriter +) : ExperimentReporter { private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() - private val schema = SchemaBuilder - .record("slice") - .namespace("com.atlarge.opendc.experiments.sc20") - .fields() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("requested_burst").type().longType().noDefault() - .name("granted_burst").type().longType().noDefault() - .name("overcommissioned_burst").type().longType().noDefault() - .name("interfered_burst").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("image_count").type().intType().noDefault() - .name("server").type().stringType().noDefault() - .name("host_state").type().stringType().noDefault() - .name("host_usage").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .name("total_submitted_vms").type().longType().noDefault() - .name("total_queued_vms").type().longType().noDefault() - .name("total_running_vms").type().longType().noDefault() - .name("total_finished_vms").type().longType().noDefault() - .endRecord() - private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination.absolutePath)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - private val queue = ArrayBlockingQueue<GenericData.Record>(2048) - private val writerThread = thread(start = true, name = "sc20-writer") { - try { - while (true) { - val record = queue.take() - writer.write(record) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - writer.close() - } - } override fun reportVmStateChange(time: Long, server: Server) {} @@ -91,7 +48,7 @@ class ExperimentParquetReporter(destination: File) : driver: VirtDriver, server: Server ) { - logger.info("Host ${server.uid} changed state ${server.state} [$time]") + logger.debug("Host ${server.uid} changed state ${server.state} [$time]") val lastServerState = lastServerStates[server] if (server.state == ServerState.SHUTOFF && lastServerState != null) { @@ -134,33 +91,42 @@ class ExperimentParquetReporter(destination: File) : hostServer: Server, duration: Long ) { - val record = GenericData.Record(schema) - record.put("time", time) - record.put("duration", duration) - record.put("requested_burst", requestedBurst) - record.put("granted_burst", grantedBurst) - record.put("overcommissioned_burst", overcommissionedBurst) - record.put("interfered_burst", interferedBurst) - record.put("cpu_usage", cpuUsage) - record.put("cpu_demand", cpuDemand) - record.put("image_count", numberOfDeployedImages) - record.put("server", hostServer.uid) - record.put("host_state", hostServer.state) - record.put("host_usage", cpuUsage) - record.put("power_draw", lastPowerConsumption[hostServer] ?: 200.0) - record.put("total_submitted_vms", -1) - record.put("total_queued_vms", -1) - record.put("total_running_vms", -1) - record.put("total_finished_vms", -1) + hostWriter.write( + scenario, run, HostMetrics( + time, + duration, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0 + ) + ) + } - queue.put(record) + override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + provisionerWriter.write( + scenario, + run, + ProvisionerMetrics( + time, + event.totalHostCount, + event.availableHostCount, + event.totalVmCount, + event.activeVmCount, + event.inactiveVmCount, + event.waitingVmCount, + event.failedVmCount + ) + ) } override fun close() { - // Busy loop to wait for writer thread to finish - while (queue.isNotEmpty()) { - Thread.sleep(500) - } - writerThread.interrupt() + hostWriter.close() + provisionerWriter.close() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt new file mode 100644 index 00000000..33839191 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt @@ -0,0 +1,73 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import java.io.File + +private val schema: Schema = SchemaBuilder + .record("host_metrics") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + .name("scenario_id").type().longType().noDefault() + .name("run_id").type().intType().noDefault() + .name("timestamp").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("host_id").type().stringType().noDefault() + .name("state").type().stringType().noDefault() + .name("vm_count").type().intType().noDefault() + .name("requested_burst").type().longType().noDefault() + .name("granted_burst").type().longType().noDefault() + .name("overcommissioned_burst").type().longType().noDefault() + .name("interfered_burst").type().longType().noDefault() + .name("cpu_usage").type().doubleType().noDefault() + .name("cpu_demand").type().doubleType().noDefault() + .name("power_draw").type().doubleType().noDefault() + .endRecord() + +public class ParquetHostMetricsWriter(path: File, batchSize: Int) : + ParquetMetricsWriter<HostMetrics>(path, schema, batchSize) { + + override fun persist(action: Action.Write<HostMetrics>, row: GenericData.Record) { + row.put("scenario_id", action.scenario) + row.put("run_id", action.run) + row.put("host_id", action.metrics.host.name) + row.put("state", action.metrics.host.state.name) + row.put("timestamp", action.metrics.time) + row.put("duration", action.metrics.duration) + row.put("vm_count", action.metrics.vmCount) + row.put("requested_burst", action.metrics.requestedBurst) + row.put("granted_burst", action.metrics.grantedBurst) + row.put("overcommissioned_burst", action.metrics.overcommissionedBurst) + row.put("interfered_burst", action.metrics.interferedBurst) + row.put("cpu_usage", action.metrics.cpuUsage) + row.put("cpu_demand", action.metrics.cpuDemand) + row.put("power_draw", action.metrics.powerDraw) + } + + override fun toString(): String = "host-writer" +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt new file mode 100644 index 00000000..e82e9e47 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt @@ -0,0 +1,114 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +import mu.KotlinLogging +import org.apache.avro.Schema +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.Closeable +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread + +private val logger = KotlinLogging.logger {} + +public abstract class ParquetMetricsWriter<T>( + private val path: File, + private val schema: Schema, + private val bufferSize: Int = 4096 +) : Runnable, Closeable { + /** + * The queue of commands to process. + */ + private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize) + private val writerThread = thread(start = true, name = "parquet-writer") { run() } + + /** + * Write the specified metrics to the database. + */ + public fun write(scenario: Long, run: Int, metrics: T) { + queue.put(Action.Write(scenario, run, metrics)) + } + + /** + * Signal the writer to stop. + */ + public override fun close() { + queue.put(Action.Stop) + writerThread.join() + } + + /** + * Persist the specified metrics to the given [row]. + */ + public abstract fun persist(action: Action.Write<T>, row: GenericData.Record) + + /** + * Start the writer thread. + */ + override fun run() { + val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + try { + loop@ while (true) { + val action = queue.take() + when (action) { + is Action.Stop -> break@loop + is Action.Write<*> -> { + val record = GenericData.Record(schema) + @Suppress("UNCHECKED_CAST") + persist(action as Action.Write<T>, record) + writer.write(record) + } + } + } + } catch (e: Throwable) { + logger.error("Writer failed", e) + } finally { + writer.close() + } + } + + sealed class Action { + /** + * A poison pill that will stop the writer thread. + */ + object Stop : Action() + + /** + * Write the specified metrics to the database. + */ + data class Write<out T>(val scenario: Long, val run: Int, val metrics: T) : Action() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt new file mode 100644 index 00000000..0c74b23e --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt @@ -0,0 +1,65 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import java.io.File + +private val schema: Schema = SchemaBuilder + .record("host_metrics") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + .name("scenario_id").type().longType().noDefault() + .name("run_id").type().intType().noDefault() + .name("timestamp").type().longType().noDefault() + .name("host_total_count").type().intType().noDefault() + .name("host_available_count").type().intType().noDefault() + .name("vm_total_count").type().intType().noDefault() + .name("vm_active_count").type().intType().noDefault() + .name("vm_inactive_count").type().intType().noDefault() + .name("vm_waiting_count").type().intType().noDefault() + .name("vm_failed_count").type().intType().noDefault() + .endRecord() + +public class ParquetProvisionerMetricsWriter(path: File, batchSize: Int) : + ParquetMetricsWriter<ProvisionerMetrics>(path, schema, batchSize) { + + override fun persist(action: Action.Write<ProvisionerMetrics>, row: GenericData.Record) { + row.put("scenario_id", action.scenario) + row.put("run_id", action.run) + row.put("timestamp", action.metrics.time) + row.put("host_total_count", action.metrics.totalHostCount) + row.put("host_available_count", action.metrics.availableHostCount) + row.put("vm_total_count", action.metrics.totalVmCount) + row.put("vm_active_count", action.metrics.activeVmCount) + row.put("vm_inactive_count", action.metrics.inactiveVmCount) + row.put("vm_waiting_count", action.metrics.waitingVmCount) + row.put("vm_failed_count", action.metrics.failedVmCount) + } + + override fun toString(): String = "host-writer" +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt index 43e4a7a6..57e665ae 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt @@ -28,31 +28,31 @@ import de.bytefish.pgbulkinsert.row.SimpleRow import de.bytefish.pgbulkinsert.row.SimpleRowWriter import javax.sql.DataSource +private val table: SimpleRowWriter.Table = SimpleRowWriter.Table( + "host_metrics", + *arrayOf( + "scenario_id", + "run_id", + "host_id", + "state", + "timestamp", + "duration", + "vm_count", + "requested_burst", + "granted_burst", + "overcommissioned_burst", + "interfered_burst", + "cpu_usage", + "cpu_demand", + "power_draw" + ) +) + /** * A [PostgresMetricsWriter] for persisting [HostMetrics]. */ public class PostgresHostMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) : - PostgresMetricsWriter<HostMetrics>(ds, parallelism, batchSize) { - - override val table: SimpleRowWriter.Table = SimpleRowWriter.Table( - "host_metrics", - *arrayOf( - "scenario_id", - "run_id", - "host_id", - "state", - "timestamp", - "duration", - "vm_count", - "requested_burst", - "granted_burst", - "overcommissioned_burst", - "interfered_burst", - "cpu_usage", - "cpu_demand", - "power_draw" - ) - ) + PostgresMetricsWriter<HostMetrics>(ds, table, parallelism, batchSize) { override fun persist(action: Action.Write<HostMetrics>, row: SimpleRow) { row.setLong("scenario_id", action.scenario) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt index 715800a3..bee01e51 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt @@ -40,6 +40,7 @@ import javax.sql.DataSource */ public abstract class PostgresMetricsWriter<T>( private val ds: DataSource, + private val table: SimpleRowWriter.Table, private val parallelism: Int = 8, private val bufferSize: Int = 4096 ) : Runnable, Closeable { @@ -72,11 +73,6 @@ public abstract class PostgresMetricsWriter<T>( } /** - * Create the table to which we write. - */ - public abstract val table: SimpleRowWriter.Table - - /** * Persist the specified metrics to the given [row]. */ public abstract fun persist(action: Action.Write<T>, row: SimpleRow) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt index a7a86206..17788112 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt @@ -28,27 +28,27 @@ import de.bytefish.pgbulkinsert.row.SimpleRow import de.bytefish.pgbulkinsert.row.SimpleRowWriter import javax.sql.DataSource +private val table: SimpleRowWriter.Table = SimpleRowWriter.Table( + "provisioner_metrics", + *arrayOf( + "scenario_id", + "run_id", + "timestamp", + "host_total_count", + "host_available_count", + "vm_total_count", + "vm_active_count", + "vm_inactive_count", + "vm_waiting_count", + "vm_failed_count" + ) +) + /** * A [PostgresMetricsWriter] for persisting [ProvisionerMetrics]. */ public class PostgresProvisionerMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) : - PostgresMetricsWriter<ProvisionerMetrics>(ds, parallelism, batchSize) { - - override val table: SimpleRowWriter.Table = SimpleRowWriter.Table( - "provisioner_metrics", - *arrayOf( - "scenario_id", - "run_id", - "timestamp", - "host_total_count", - "host_available_count", - "vm_total_count", - "vm_active_count", - "vm_inactive_count", - "vm_waiting_count", - "vm_failed_count" - ) - ) + PostgresMetricsWriter<ProvisionerMetrics>(ds, table, parallelism, batchSize) { override fun persist(action: Action.Write<ProvisionerMetrics>, row: SimpleRow) { row.setLong("scenario_id", action.scenario) |
