summaryrefslogtreecommitdiff
path: root/opendc
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-14 14:00:33 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-14 14:18:58 +0200
commit6c51f02c38053a8aa395ebeb5b29e2b0a4f30c84 (patch)
tree4d4eff97aa6c460b0eed1ec91059a7efbcca2f45 /opendc
parent189001983350cbcc7f3524ea5983df48c873709b (diff)
perf: Use PostgreSQL bulk data inserter
Diffstat (limited to 'opendc')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt1
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt3
-rw-r--r--opendc/opendc-experiments-sc20/build.gradle.kts3
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt1
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt45
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt47
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt1
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt2
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt62
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt77
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt50
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt1
12 files changed, 154 insertions, 139 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index c25834a7..c3d9c745 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -257,7 +257,6 @@ class SimpleVirtProvisioningService(
if (server in hypervisors) {
// Corner case for when the hypervisor already exists
availableHypervisors += hypervisors.getValue(server)
-
} else {
val hv = HypervisorView(
server.uid,
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt
index 39f75913..c3fb99f9 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt
@@ -24,9 +24,6 @@
package com.atlarge.opendc.compute.virt.service
-import com.atlarge.opendc.compute.virt.driver.VirtDriver
-
-
/**
* An event that is emitted by the [VirtProvisioningService].
*/
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index b7440792..2ba07554 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -47,6 +47,9 @@ dependencies {
exclude(group = "log4j")
}
implementation("com.zaxxer:HikariCP:3.4.5")
+ implementation("de.bytefish.pgbulkinsert:pgbulkinsert-core:5.1.0")
+ implementation("de.bytefish.pgbulkinsert:pgbulkinsert-rowwriter:5.1.0")
+ implementation("me.tongfei:progressbar:0.8.1")
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1")
runtimeOnly("org.postgresql:postgresql:42.2.12")
runtimeOnly(project(":odcsim:odcsim-engine-omega"))
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
index 548400d6..2c41dd7b 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
@@ -192,7 +192,6 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex
is VirtProvisioningEvent.MetricsAvailable ->
reporter.reportProvisionerMetrics(clock.millis(), event)
}
-
}
.launchIn(domain)
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
index 7d65930c..728fc62d 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
@@ -33,14 +33,13 @@ import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
+import me.tongfei.progressbar.ProgressBar
import mu.KotlinLogging
import java.io.Closeable
import java.io.File
import java.util.concurrent.Executors
import java.util.concurrent.Future
-import java.util.concurrent.atomic.AtomicInteger
import javax.sql.DataSource
-import kotlin.system.measureTimeMillis
/**
* The logger for the experiment runner.
@@ -154,10 +153,10 @@ public class ExperimentRunner(
val plan = createPlan()
val total = plan.size
- val finished = AtomicInteger()
val executorService = Executors.newCachedThreadPool()
val planIterator = plan.iterator()
val futures = mutableListOf<Future<*>>()
+ val pb = ProgressBar("Experiment", total.toLong())
while (planIterator.hasNext()) {
futures.clear()
@@ -176,38 +175,36 @@ public class ExperimentRunner(
}
val future = executorService.submit {
+ pb.extraMessage = "($scenarioId, ${run.id}) START"
+
+ var hasFailed = false
synchronized(helper) {
helper.startRun(scenarioId, run.id)
}
- logger.info { "[${finished.get()}/$total] Starting run ($scenarioId, ${run.id})" }
-
try {
- val duration = measureTimeMillis {
- val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id)
- val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run)
- val environmentReader = createEnvironmentReader(run.scenario.topology.name)
-
- try {
- run.scenario(run, reporter, environmentReader, traceReader)
- logger.info { "Done" }
- } finally {
- reporter.close()
- }
+ val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id)
+ val traceReader =
+ createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run)
+ val environmentReader = createEnvironmentReader(run.scenario.topology.name)
+
+ try {
+ run.scenario(run, reporter, environmentReader, traceReader)
+ } finally {
+ reporter.close()
}
- finished.incrementAndGet()
- logger.info { "[${finished.get()}/$total] Finished run ($scenarioId, ${run.id}) in $duration milliseconds" }
-
- synchronized(helper) {
- helper.finishRun(scenarioId, run.id, hasFailed = false)
- }
+ pb.extraMessage = "($scenarioId, ${run.id}) OK"
} catch (e: Throwable) {
logger.error("A run has failed", e)
- finished.incrementAndGet()
+ hasFailed = true
+ pb.extraMessage = "($scenarioId, ${run.id}) FAIL"
+ } finally {
synchronized(helper) {
- helper.finishRun(scenarioId, run.id, hasFailed = true)
+ helper.finishRun(scenarioId, run.id, hasFailed = hasFailed)
}
+
+ pb.step()
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
index 9cbfcdc1..cf805286 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
@@ -118,16 +118,23 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
.multiple()
/**
- * The maximum number of threads to use.
+ * The maximum number of worker threads to use.
*/
- private val parallelism by option("--parallelism")
+ private val workerParallelism by option("--worker-parallelism")
.int()
.default(Runtime.getRuntime().availableProcessors())
/**
- * The batch size for writing results.
+ * The maximum number of writer threads to use.
*/
- private val batchSize by option("--batch-size")
+ private val writerParallelism by option("--writer-parallelism")
+ .int()
+ .default(8)
+
+ /**
+ * The buffer size for writing results.
+ */
+ private val bufferSize by option("--buffer-size")
.int()
.default(4096)
@@ -137,13 +144,13 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
ds.jdbcUrl = jdbcUrl
ds.addDataSourceProperty("reWriteBatchedInserts", "true")
-
- reporter.batchSize = batchSize
+ reporter.bufferSize = bufferSize
+ reporter.parallelism = writerParallelism
val performanceInterferenceModel =
- performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it).construct() }
+ performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it).construct() }
- val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel, parallelism)
+ val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel, workerParallelism)
try {
runner.run()
@@ -158,7 +165,8 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
* An option for specifying the type of reporter to use.
*/
internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentReporterProvider {
- var batchSize = 4096
+ var bufferSize = 4096
+ var parallelism = 8
class Parquet : Reporter("Options for reporting using Parquet") {
private val path by option("--parquet-directory", help = "path to where the output should be stored")
@@ -172,25 +180,22 @@ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentRepo
}
class Postgres : Reporter("Options for reporting using PostgreSQL") {
- lateinit var ds: DataSource
+ lateinit var hostWriter: PostgresHostMetricsWriter
+ lateinit var provisionerWriter: PostgresProvisionerMetricsWriter
override fun init(ds: DataSource) {
- this.ds = ds
+ hostWriter = PostgresHostMetricsWriter(ds, parallelism, bufferSize)
+ provisionerWriter = PostgresProvisionerMetricsWriter(ds, parallelism, bufferSize)
}
override fun createReporter(scenario: Long, run: Int): ExperimentReporter {
- val hostWriter = PostgresHostMetricsWriter(ds, batchSize)
- val provisionerWriter = PostgresProvisionerMetricsWriter(ds, batchSize)
- val delegate = ExperimentPostgresReporter(scenario, run, hostWriter, provisionerWriter)
- return object : ExperimentReporter by delegate {
- override fun close() {
- hostWriter.close()
- provisionerWriter.close()
- }
- }
+ return ExperimentPostgresReporter(scenario, run, hostWriter, provisionerWriter)
}
- override fun close() {}
+ override fun close() {
+ hostWriter.close()
+ provisionerWriter.close()
+ }
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt
index bb3466ba..99634e1b 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt
@@ -64,6 +64,5 @@ fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<T
logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
return res
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
index 6f220640..595b9777 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
@@ -72,14 +72,12 @@ class ExperimentPostgresReporter(
}
}
-
private val lastPowerConsumption = mutableMapOf<Server, Double>()
override fun reportPowerConsumption(host: Server, draw: Double) {
lastPowerConsumption[host] = draw
}
-
override fun reportHostSlice(
time: Long,
requestedBurst: Long,
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt
index 5eb55f20..43e4a7a6 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt
@@ -24,35 +24,51 @@
package com.atlarge.opendc.experiments.sc20.reporter
-import java.sql.Connection
-import java.sql.PreparedStatement
-import java.sql.Timestamp
+import de.bytefish.pgbulkinsert.row.SimpleRow
+import de.bytefish.pgbulkinsert.row.SimpleRowWriter
import javax.sql.DataSource
/**
* A [PostgresMetricsWriter] for persisting [HostMetrics].
*/
-public class PostgresHostMetricsWriter(ds: DataSource, batchSize: Int) :
- PostgresMetricsWriter<HostMetrics>(ds, batchSize) {
- override fun createStatement(conn: Connection): PreparedStatement {
- return conn.prepareStatement("INSERT INTO host_metrics (scenario_id, run_id, host_id, state, timestamp, duration, vm_count, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, power_draw) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
- }
+public class PostgresHostMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) :
+ PostgresMetricsWriter<HostMetrics>(ds, parallelism, batchSize) {
+
+ override val table: SimpleRowWriter.Table = SimpleRowWriter.Table(
+ "host_metrics",
+ *arrayOf(
+ "scenario_id",
+ "run_id",
+ "host_id",
+ "state",
+ "timestamp",
+ "duration",
+ "vm_count",
+ "requested_burst",
+ "granted_burst",
+ "overcommissioned_burst",
+ "interfered_burst",
+ "cpu_usage",
+ "cpu_demand",
+ "power_draw"
+ )
+ )
- override fun persist(action: Action.Write<HostMetrics>, stmt: PreparedStatement) {
- stmt.setLong(1, action.scenario)
- stmt.setInt(2, action.run)
- stmt.setString(3, action.metrics.host.name)
- stmt.setString(4, action.metrics.host.state.name)
- stmt.setTimestamp(5, Timestamp(action.metrics.time))
- stmt.setLong(6, action.metrics.duration)
- stmt.setInt(7, action.metrics.vmCount)
- stmt.setLong(8, action.metrics.requestedBurst)
- stmt.setLong(9, action.metrics.grantedBurst)
- stmt.setLong(10, action.metrics.overcommissionedBurst)
- stmt.setLong(11, action.metrics.interferedBurst)
- stmt.setDouble(12, action.metrics.cpuUsage)
- stmt.setDouble(13, action.metrics.cpuDemand)
- stmt.setDouble(14, action.metrics.powerDraw)
+ override fun persist(action: Action.Write<HostMetrics>, row: SimpleRow) {
+ row.setLong("scenario_id", action.scenario)
+ row.setInteger("run_id", action.run)
+ row.setText("host_id", action.metrics.host.name)
+ row.setText("state", action.metrics.host.state.name)
+ row.setLong("timestamp", action.metrics.time)
+ row.setLong("duration", action.metrics.duration)
+ row.setInteger("vm_count", action.metrics.vmCount)
+ row.setLong("requested_burst", action.metrics.requestedBurst)
+ row.setLong("granted_burst", action.metrics.grantedBurst)
+ row.setLong("overcommissioned_burst", action.metrics.overcommissionedBurst)
+ row.setLong("interfered_burst", action.metrics.interferedBurst)
+ row.setDouble("cpu_usage", action.metrics.cpuUsage)
+ row.setDouble("cpu_demand", action.metrics.cpuDemand)
+ row.setDouble("power_draw", action.metrics.powerDraw)
}
override fun toString(): String = "host-writer"
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
index 33c2d40e..715800a3 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
@@ -24,13 +24,15 @@
package com.atlarge.opendc.experiments.sc20.reporter
+import de.bytefish.pgbulkinsert.row.SimpleRow
+import de.bytefish.pgbulkinsert.row.SimpleRowWriter
+import org.postgresql.PGConnection
import java.io.Closeable
-import java.sql.Connection
-import java.sql.PreparedStatement
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
import javax.sql.DataSource
-import kotlin.concurrent.thread
/**
* The experiment writer is a separate thread that is responsible for writing the results to the
@@ -38,17 +40,18 @@ import kotlin.concurrent.thread
*/
public abstract class PostgresMetricsWriter<T>(
private val ds: DataSource,
- private val batchSize: Int = 4096
+ private val parallelism: Int = 8,
+ private val bufferSize: Int = 4096
) : Runnable, Closeable {
/**
* The queue of commands to process.
*/
- private val queue: BlockingQueue<Action> = ArrayBlockingQueue(4 * batchSize)
+ private val queue: BlockingQueue<Action> = ArrayBlockingQueue(parallelism * bufferSize)
/**
- * The thread for the actual writer.
+ * The executor service to use.
*/
- private val writerThread: Thread = thread(name = "metrics-writer") { run() }
+ private val executorService = Executors.newFixedThreadPool(parallelism)
/**
* Write the specified metrics to the database.
@@ -61,64 +64,52 @@ public abstract class PostgresMetricsWriter<T>(
* Signal the writer to stop.
*/
public override fun close() {
- queue.put(Action.Stop)
- writerThread.join()
+ repeat(parallelism) {
+ queue.put(Action.Stop)
+ }
+ executorService.shutdown()
+ executorService.awaitTermination(5, TimeUnit.MINUTES)
}
/**
- * Create a prepared statement to use.
+ * Create the table to which we write.
*/
- public abstract fun createStatement(conn: Connection): PreparedStatement
+ public abstract val table: SimpleRowWriter.Table
/**
- * Persist the specified metrics using the given [stmt].
+ * Persist the specified metrics to the given [row].
*/
- public abstract fun persist(action: Action.Write<T>, stmt: PreparedStatement)
+ public abstract fun persist(action: Action.Write<T>, row: SimpleRow)
+
+ init {
+ repeat(parallelism) {
+ executorService.submit { run() }
+ }
+ }
/**
* Start the writer thread.
*/
override fun run() {
- writerThread.name = toString()
val conn = ds.connection
- var batch = 0
- conn.autoCommit = false
- val stmt = createStatement(conn)
+ val writer = SimpleRowWriter(table)
+ writer.open(conn.unwrap(PGConnection::class.java))
try {
- val actions = mutableListOf<Action>()
loop@ while (true) {
- actions.clear()
-
- if (queue.isEmpty()) {
- actions.add(queue.take())
- }
- queue.drainTo(actions)
-
- for (action in actions) {
- when (action) {
- is Action.Stop -> break@loop
- is Action.Write<*> -> {
- @Suppress("UNCHECKED_CAST")
- persist(action as Action.Write<T>, stmt)
- stmt.addBatch()
- batch++
-
- if (batch % batchSize == 0) {
- stmt.executeBatch()
- conn.commit()
- }
-
- }
+ val action = queue.take()
+ when (action) {
+ is Action.Stop -> break@loop
+ is Action.Write<*> -> writer.startRow {
+ @Suppress("UNCHECKED_CAST")
+ persist(action as Action.Write<T>, it)
}
}
}
-
} finally {
- stmt.executeBatch()
- conn.commit()
+ writer.close()
conn.close()
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt
index 7bc88959..a7a86206 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt
@@ -24,31 +24,43 @@
package com.atlarge.opendc.experiments.sc20.reporter
-import java.sql.Connection
-import java.sql.PreparedStatement
-import java.sql.Timestamp
+import de.bytefish.pgbulkinsert.row.SimpleRow
+import de.bytefish.pgbulkinsert.row.SimpleRowWriter
import javax.sql.DataSource
/**
* A [PostgresMetricsWriter] for persisting [ProvisionerMetrics].
*/
-public class PostgresProvisionerMetricsWriter(ds: DataSource, batchSize: Int) :
- PostgresMetricsWriter<ProvisionerMetrics>(ds, batchSize) {
- override fun createStatement(conn: Connection): PreparedStatement {
- return conn.prepareStatement("INSERT INTO provisioner_metrics (scenario_id, run_id, timestamp, host_total_count, host_available_count, vm_total_count, vm_active_count, vm_inactive_count, vm_waiting_count, vm_failed_count) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
- }
+public class PostgresProvisionerMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) :
+ PostgresMetricsWriter<ProvisionerMetrics>(ds, parallelism, batchSize) {
+
+ override val table: SimpleRowWriter.Table = SimpleRowWriter.Table(
+ "provisioner_metrics",
+ *arrayOf(
+ "scenario_id",
+ "run_id",
+ "timestamp",
+ "host_total_count",
+ "host_available_count",
+ "vm_total_count",
+ "vm_active_count",
+ "vm_inactive_count",
+ "vm_waiting_count",
+ "vm_failed_count"
+ )
+ )
- override fun persist(action: Action.Write<ProvisionerMetrics>, stmt: PreparedStatement) {
- stmt.setLong(1, action.scenario)
- stmt.setInt(2, action.run)
- stmt.setTimestamp(3, Timestamp(action.metrics.time))
- stmt.setInt(4, action.metrics.totalHostCount)
- stmt.setInt(5, action.metrics.availableHostCount)
- stmt.setInt(6, action.metrics.totalVmCount)
- stmt.setInt(7, action.metrics.activeVmCount)
- stmt.setInt(8, action.metrics.inactiveVmCount)
- stmt.setInt(9, action.metrics.waitingVmCount)
- stmt.setInt(10, action.metrics.failedVmCount)
+ override fun persist(action: Action.Write<ProvisionerMetrics>, row: SimpleRow) {
+ row.setLong("scenario_id", action.scenario)
+ row.setInteger("run_id", action.run)
+ row.setLong("timestamp", action.metrics.time)
+ row.setInteger("host_total_count", action.metrics.totalHostCount)
+ row.setInteger("host_available_count", action.metrics.availableHostCount)
+ row.setInteger("vm_total_count", action.metrics.totalVmCount)
+ row.setInteger("vm_active_count", action.metrics.activeVmCount)
+ row.setInteger("vm_inactive_count", action.metrics.inactiveVmCount)
+ row.setInteger("vm_waiting_count", action.metrics.waitingVmCount)
+ row.setInteger("vm_failed_count", action.metrics.failedVmCount)
}
override fun toString(): String = "provisioner-writer"
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
index 7cc713bc..17b42f3d 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
@@ -83,7 +83,6 @@ class Sc20ParquetTraceReader(
}
.iterator()
-
override fun hasNext(): Boolean = iterator.hasNext()
override fun next(): TraceEntry<VmWorkload> = iterator.next()