From 6c51f02c38053a8aa395ebeb5b29e2b0a4f30c84 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 14 May 2020 14:00:33 +0200 Subject: perf: Use PostgreSQL bulk data inserter --- .../virt/service/SimpleVirtProvisioningService.kt | 1 - .../compute/virt/service/VirtProvisioningEvent.kt | 3 - opendc/opendc-experiments-sc20/build.gradle.kts | 3 + .../opendc/experiments/sc20/ExperimentHelpers.kt | 1 - .../opendc/experiments/sc20/ExperimentRunner.kt | 45 ++++++------- .../opendc/experiments/sc20/ExperimentRunnerCli.kt | 47 +++++++------ .../opendc/experiments/sc20/WorkloadSampler.kt | 1 - .../sc20/reporter/PostgresExperimentReporter.kt | 2 - .../sc20/reporter/PostgresHostMetricsWriter.kt | 62 ++++++++++------- .../sc20/reporter/PostgresMetricsWriter.kt | 77 ++++++++++------------ .../reporter/PostgresProvisionerMetricsWriter.kt | 50 ++++++++------ .../sc20/trace/Sc20ParquetTraceReader.kt | 1 - 12 files changed, 154 insertions(+), 139 deletions(-) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index c25834a7..c3d9c745 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -257,7 +257,6 @@ class SimpleVirtProvisioningService( if (server in hypervisors) { // Corner case for when the hypervisor already exists availableHypervisors += hypervisors.getValue(server) - } else { val hv = HypervisorView( server.uid, diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt index 39f75913..c3fb99f9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt @@ -24,9 +24,6 @@ package com.atlarge.opendc.compute.virt.service -import com.atlarge.opendc.compute.virt.driver.VirtDriver - - /** * An event that is emitted by the [VirtProvisioningService]. */ diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index b7440792..2ba07554 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -47,6 +47,9 @@ dependencies { exclude(group = "log4j") } implementation("com.zaxxer:HikariCP:3.4.5") + implementation("de.bytefish.pgbulkinsert:pgbulkinsert-core:5.1.0") + implementation("de.bytefish.pgbulkinsert:pgbulkinsert-rowwriter:5.1.0") + implementation("me.tongfei:progressbar:0.8.1") runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") runtimeOnly("org.postgresql:postgresql:42.2.12") runtimeOnly(project(":odcsim:odcsim-engine-omega")) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt index 548400d6..2c41dd7b 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt @@ -192,7 +192,6 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex is VirtProvisioningEvent.MetricsAvailable -> reporter.reportProvisionerMetrics(clock.millis(), event) } - } .launchIn(domain) } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt index 7d65930c..728fc62d 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt @@ -33,14 +33,13 @@ import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.TraceReader +import me.tongfei.progressbar.ProgressBar import mu.KotlinLogging import java.io.Closeable import java.io.File import java.util.concurrent.Executors import java.util.concurrent.Future -import java.util.concurrent.atomic.AtomicInteger import javax.sql.DataSource -import kotlin.system.measureTimeMillis /** * The logger for the experiment runner. @@ -154,10 +153,10 @@ public class ExperimentRunner( val plan = createPlan() val total = plan.size - val finished = AtomicInteger() val executorService = Executors.newCachedThreadPool() val planIterator = plan.iterator() val futures = mutableListOf>() + val pb = ProgressBar("Experiment", total.toLong()) while (planIterator.hasNext()) { futures.clear() @@ -176,38 +175,36 @@ public class ExperimentRunner( } val future = executorService.submit { + pb.extraMessage = "($scenarioId, ${run.id}) START" + + var hasFailed = false synchronized(helper) { helper.startRun(scenarioId, run.id) } - logger.info { "[${finished.get()}/$total] Starting run ($scenarioId, ${run.id})" } - try { - val duration = measureTimeMillis { - val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id) - val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run) - val environmentReader = createEnvironmentReader(run.scenario.topology.name) - - try { - run.scenario(run, reporter, environmentReader, traceReader) - logger.info { "Done" } - } finally { - reporter.close() - } + val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id) + val traceReader = + createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run) + val environmentReader = createEnvironmentReader(run.scenario.topology.name) + + try { + run.scenario(run, reporter, environmentReader, traceReader) + } finally { + reporter.close() } - finished.incrementAndGet() - logger.info { "[${finished.get()}/$total] Finished run ($scenarioId, ${run.id}) in $duration milliseconds" } - - synchronized(helper) { - helper.finishRun(scenarioId, run.id, hasFailed = false) - } + pb.extraMessage = "($scenarioId, ${run.id}) OK" } catch (e: Throwable) { logger.error("A run has failed", e) - finished.incrementAndGet() + hasFailed = true + pb.extraMessage = "($scenarioId, ${run.id}) FAIL" + } finally { synchronized(helper) { - helper.finishRun(scenarioId, run.id, hasFailed = true) + helper.finishRun(scenarioId, run.id, hasFailed = hasFailed) } + + pb.step() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt index 9cbfcdc1..cf805286 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt @@ -118,16 +118,23 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { .multiple() /** - * The maximum number of threads to use. + * The maximum number of worker threads to use. */ - private val parallelism by option("--parallelism") + private val workerParallelism by option("--worker-parallelism") .int() .default(Runtime.getRuntime().availableProcessors()) /** - * The batch size for writing results. + * The maximum number of writer threads to use. */ - private val batchSize by option("--batch-size") + private val writerParallelism by option("--writer-parallelism") + .int() + .default(8) + + /** + * The buffer size for writing results. + */ + private val bufferSize by option("--buffer-size") .int() .default(4096) @@ -137,13 +144,13 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { ds.jdbcUrl = jdbcUrl ds.addDataSourceProperty("reWriteBatchedInserts", "true") - - reporter.batchSize = batchSize + reporter.bufferSize = bufferSize + reporter.parallelism = writerParallelism val performanceInterferenceModel = - performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it).construct() } + performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it).construct() } - val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel, parallelism) + val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel, workerParallelism) try { runner.run() @@ -158,7 +165,8 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { * An option for specifying the type of reporter to use. */ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentReporterProvider { - var batchSize = 4096 + var bufferSize = 4096 + var parallelism = 8 class Parquet : Reporter("Options for reporting using Parquet") { private val path by option("--parquet-directory", help = "path to where the output should be stored") @@ -172,25 +180,22 @@ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentRepo } class Postgres : Reporter("Options for reporting using PostgreSQL") { - lateinit var ds: DataSource + lateinit var hostWriter: PostgresHostMetricsWriter + lateinit var provisionerWriter: PostgresProvisionerMetricsWriter override fun init(ds: DataSource) { - this.ds = ds + hostWriter = PostgresHostMetricsWriter(ds, parallelism, bufferSize) + provisionerWriter = PostgresProvisionerMetricsWriter(ds, parallelism, bufferSize) } override fun createReporter(scenario: Long, run: Int): ExperimentReporter { - val hostWriter = PostgresHostMetricsWriter(ds, batchSize) - val provisionerWriter = PostgresProvisionerMetricsWriter(ds, batchSize) - val delegate = ExperimentPostgresReporter(scenario, run, hostWriter, provisionerWriter) - return object : ExperimentReporter by delegate { - override fun close() { - hostWriter.close() - provisionerWriter.close() - } - } + return ExperimentPostgresReporter(scenario, run, hostWriter, provisionerWriter) } - override fun close() {} + override fun close() { + hostWriter.close() + provisionerWriter.close() + } } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt index bb3466ba..99634e1b 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt @@ -64,6 +64,5 @@ fun sampleRegularWorkload(trace: List>, run: Run): List() override fun reportPowerConsumption(host: Server, draw: Double) { lastPowerConsumption[host] = draw } - override fun reportHostSlice( time: Long, requestedBurst: Long, diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt index 5eb55f20..43e4a7a6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt @@ -24,35 +24,51 @@ package com.atlarge.opendc.experiments.sc20.reporter -import java.sql.Connection -import java.sql.PreparedStatement -import java.sql.Timestamp +import de.bytefish.pgbulkinsert.row.SimpleRow +import de.bytefish.pgbulkinsert.row.SimpleRowWriter import javax.sql.DataSource /** * A [PostgresMetricsWriter] for persisting [HostMetrics]. */ -public class PostgresHostMetricsWriter(ds: DataSource, batchSize: Int) : - PostgresMetricsWriter(ds, batchSize) { - override fun createStatement(conn: Connection): PreparedStatement { - return conn.prepareStatement("INSERT INTO host_metrics (scenario_id, run_id, host_id, state, timestamp, duration, vm_count, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, power_draw) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") - } +public class PostgresHostMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) : + PostgresMetricsWriter(ds, parallelism, batchSize) { + + override val table: SimpleRowWriter.Table = SimpleRowWriter.Table( + "host_metrics", + *arrayOf( + "scenario_id", + "run_id", + "host_id", + "state", + "timestamp", + "duration", + "vm_count", + "requested_burst", + "granted_burst", + "overcommissioned_burst", + "interfered_burst", + "cpu_usage", + "cpu_demand", + "power_draw" + ) + ) - override fun persist(action: Action.Write, stmt: PreparedStatement) { - stmt.setLong(1, action.scenario) - stmt.setInt(2, action.run) - stmt.setString(3, action.metrics.host.name) - stmt.setString(4, action.metrics.host.state.name) - stmt.setTimestamp(5, Timestamp(action.metrics.time)) - stmt.setLong(6, action.metrics.duration) - stmt.setInt(7, action.metrics.vmCount) - stmt.setLong(8, action.metrics.requestedBurst) - stmt.setLong(9, action.metrics.grantedBurst) - stmt.setLong(10, action.metrics.overcommissionedBurst) - stmt.setLong(11, action.metrics.interferedBurst) - stmt.setDouble(12, action.metrics.cpuUsage) - stmt.setDouble(13, action.metrics.cpuDemand) - stmt.setDouble(14, action.metrics.powerDraw) + override fun persist(action: Action.Write, row: SimpleRow) { + row.setLong("scenario_id", action.scenario) + row.setInteger("run_id", action.run) + row.setText("host_id", action.metrics.host.name) + row.setText("state", action.metrics.host.state.name) + row.setLong("timestamp", action.metrics.time) + row.setLong("duration", action.metrics.duration) + row.setInteger("vm_count", action.metrics.vmCount) + row.setLong("requested_burst", action.metrics.requestedBurst) + row.setLong("granted_burst", action.metrics.grantedBurst) + row.setLong("overcommissioned_burst", action.metrics.overcommissionedBurst) + row.setLong("interfered_burst", action.metrics.interferedBurst) + row.setDouble("cpu_usage", action.metrics.cpuUsage) + row.setDouble("cpu_demand", action.metrics.cpuDemand) + row.setDouble("power_draw", action.metrics.powerDraw) } override fun toString(): String = "host-writer" diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt index 33c2d40e..715800a3 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt @@ -24,13 +24,15 @@ package com.atlarge.opendc.experiments.sc20.reporter +import de.bytefish.pgbulkinsert.row.SimpleRow +import de.bytefish.pgbulkinsert.row.SimpleRowWriter +import org.postgresql.PGConnection import java.io.Closeable -import java.sql.Connection -import java.sql.PreparedStatement import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.BlockingQueue +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit import javax.sql.DataSource -import kotlin.concurrent.thread /** * The experiment writer is a separate thread that is responsible for writing the results to the @@ -38,17 +40,18 @@ import kotlin.concurrent.thread */ public abstract class PostgresMetricsWriter( private val ds: DataSource, - private val batchSize: Int = 4096 + private val parallelism: Int = 8, + private val bufferSize: Int = 4096 ) : Runnable, Closeable { /** * The queue of commands to process. */ - private val queue: BlockingQueue = ArrayBlockingQueue(4 * batchSize) + private val queue: BlockingQueue = ArrayBlockingQueue(parallelism * bufferSize) /** - * The thread for the actual writer. + * The executor service to use. */ - private val writerThread: Thread = thread(name = "metrics-writer") { run() } + private val executorService = Executors.newFixedThreadPool(parallelism) /** * Write the specified metrics to the database. @@ -61,64 +64,52 @@ public abstract class PostgresMetricsWriter( * Signal the writer to stop. */ public override fun close() { - queue.put(Action.Stop) - writerThread.join() + repeat(parallelism) { + queue.put(Action.Stop) + } + executorService.shutdown() + executorService.awaitTermination(5, TimeUnit.MINUTES) } /** - * Create a prepared statement to use. + * Create the table to which we write. */ - public abstract fun createStatement(conn: Connection): PreparedStatement + public abstract val table: SimpleRowWriter.Table /** - * Persist the specified metrics using the given [stmt]. + * Persist the specified metrics to the given [row]. */ - public abstract fun persist(action: Action.Write, stmt: PreparedStatement) + public abstract fun persist(action: Action.Write, row: SimpleRow) + + init { + repeat(parallelism) { + executorService.submit { run() } + } + } /** * Start the writer thread. */ override fun run() { - writerThread.name = toString() val conn = ds.connection - var batch = 0 - conn.autoCommit = false - val stmt = createStatement(conn) + val writer = SimpleRowWriter(table) + writer.open(conn.unwrap(PGConnection::class.java)) try { - val actions = mutableListOf() loop@ while (true) { - actions.clear() - - if (queue.isEmpty()) { - actions.add(queue.take()) - } - queue.drainTo(actions) - - for (action in actions) { - when (action) { - is Action.Stop -> break@loop - is Action.Write<*> -> { - @Suppress("UNCHECKED_CAST") - persist(action as Action.Write, stmt) - stmt.addBatch() - batch++ - - if (batch % batchSize == 0) { - stmt.executeBatch() - conn.commit() - } - - } + val action = queue.take() + when (action) { + is Action.Stop -> break@loop + is Action.Write<*> -> writer.startRow { + @Suppress("UNCHECKED_CAST") + persist(action as Action.Write, it) } } } - } finally { - stmt.executeBatch() - conn.commit() + writer.close() conn.close() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt index 7bc88959..a7a86206 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt @@ -24,31 +24,43 @@ package com.atlarge.opendc.experiments.sc20.reporter -import java.sql.Connection -import java.sql.PreparedStatement -import java.sql.Timestamp +import de.bytefish.pgbulkinsert.row.SimpleRow +import de.bytefish.pgbulkinsert.row.SimpleRowWriter import javax.sql.DataSource /** * A [PostgresMetricsWriter] for persisting [ProvisionerMetrics]. */ -public class PostgresProvisionerMetricsWriter(ds: DataSource, batchSize: Int) : - PostgresMetricsWriter(ds, batchSize) { - override fun createStatement(conn: Connection): PreparedStatement { - return conn.prepareStatement("INSERT INTO provisioner_metrics (scenario_id, run_id, timestamp, host_total_count, host_available_count, vm_total_count, vm_active_count, vm_inactive_count, vm_waiting_count, vm_failed_count) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") - } +public class PostgresProvisionerMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) : + PostgresMetricsWriter(ds, parallelism, batchSize) { + + override val table: SimpleRowWriter.Table = SimpleRowWriter.Table( + "provisioner_metrics", + *arrayOf( + "scenario_id", + "run_id", + "timestamp", + "host_total_count", + "host_available_count", + "vm_total_count", + "vm_active_count", + "vm_inactive_count", + "vm_waiting_count", + "vm_failed_count" + ) + ) - override fun persist(action: Action.Write, stmt: PreparedStatement) { - stmt.setLong(1, action.scenario) - stmt.setInt(2, action.run) - stmt.setTimestamp(3, Timestamp(action.metrics.time)) - stmt.setInt(4, action.metrics.totalHostCount) - stmt.setInt(5, action.metrics.availableHostCount) - stmt.setInt(6, action.metrics.totalVmCount) - stmt.setInt(7, action.metrics.activeVmCount) - stmt.setInt(8, action.metrics.inactiveVmCount) - stmt.setInt(9, action.metrics.waitingVmCount) - stmt.setInt(10, action.metrics.failedVmCount) + override fun persist(action: Action.Write, row: SimpleRow) { + row.setLong("scenario_id", action.scenario) + row.setInteger("run_id", action.run) + row.setLong("timestamp", action.metrics.time) + row.setInteger("host_total_count", action.metrics.totalHostCount) + row.setInteger("host_available_count", action.metrics.availableHostCount) + row.setInteger("vm_total_count", action.metrics.totalVmCount) + row.setInteger("vm_active_count", action.metrics.activeVmCount) + row.setInteger("vm_inactive_count", action.metrics.inactiveVmCount) + row.setInteger("vm_waiting_count", action.metrics.waitingVmCount) + row.setInteger("vm_failed_count", action.metrics.failedVmCount) } override fun toString(): String = "provisioner-writer" diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt index 7cc713bc..17b42f3d 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt @@ -83,7 +83,6 @@ class Sc20ParquetTraceReader( } .iterator() - override fun hasNext(): Boolean = iterator.hasNext() override fun next(): TraceEntry = iterator.next() -- cgit v1.2.3