diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-14 14:44:15 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-14 14:44:15 +0200 |
| commit | 9a09573d1b039de999a7f225fa1b1021deb8f9b2 (patch) | |
| tree | b8985a3f356a6e215a8e36406ac359990882cd41 | |
| parent | 6c491d1f3e6c5c682cc250d1ffe3a501216b1929 (diff) | |
perf: Differentiate between host and provisioner writer parallelism
2 files changed, 54 insertions, 52 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt index 728fc62d..03995160 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt @@ -37,6 +37,7 @@ import me.tongfei.progressbar.ProgressBar import mu.KotlinLogging import java.io.Closeable import java.io.File +import java.util.concurrent.ExecutorCompletionService import java.util.concurrent.Executors import java.util.concurrent.Future import javax.sql.DataSource @@ -153,67 +154,59 @@ public class ExperimentRunner( val plan = createPlan() val total = plan.size - val executorService = Executors.newCachedThreadPool() - val planIterator = plan.iterator() - val futures = mutableListOf<Future<*>>() + val completionService = ExecutorCompletionService<Unit>(Executors.newCachedThreadPool()) val pb = ProgressBar("Experiment", total.toLong()) - while (planIterator.hasNext()) { - futures.clear() + var running = 0 - repeat(parallelism) { - if (!planIterator.hasNext()) { - return@repeat - } + for (run in plan) { + if (running >= parallelism) { + completionService.take() + running-- + } - val run = planIterator.next() - val scenarioId = scenarioIds[run.scenario]!! + val scenarioId = scenarioIds[run.scenario]!! - rawTraceReaders.computeIfAbsent(run.scenario.workload.name) { name -> - logger.info { "Loading trace $name" } - Sc20RawParquetTraceReader(File(tracePath, name)) - } + rawTraceReaders.computeIfAbsent(run.scenario.workload.name) { name -> + logger.info { "Loading trace $name" } + Sc20RawParquetTraceReader(File(tracePath, name)) + } - val future = executorService.submit { - pb.extraMessage = "($scenarioId, ${run.id}) START" + completionService.submit { + pb.extraMessage = "($scenarioId, ${run.id}) START" - var hasFailed = false - synchronized(helper) { - helper.startRun(scenarioId, run.id) - } + var hasFailed = false + synchronized(helper) { + helper.startRun(scenarioId, run.id) + } + + try { + val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id) + val traceReader = + createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run) + val environmentReader = createEnvironmentReader(run.scenario.topology.name) try { - val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id) - val traceReader = - createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run) - val environmentReader = createEnvironmentReader(run.scenario.topology.name) - - try { - run.scenario(run, reporter, environmentReader, traceReader) - } finally { - reporter.close() - } - - pb.extraMessage = "($scenarioId, ${run.id}) OK" - } catch (e: Throwable) { - logger.error("A run has failed", e) - hasFailed = true - pb.extraMessage = "($scenarioId, ${run.id}) FAIL" + run.scenario(run, reporter, environmentReader, traceReader) } finally { - synchronized(helper) { - helper.finishRun(scenarioId, run.id, hasFailed = hasFailed) - } + reporter.close() + } - pb.step() + pb.extraMessage = "($scenarioId, ${run.id}) OK" + } catch (e: Throwable) { + logger.error("A run has failed", e) + hasFailed = true + pb.extraMessage = "($scenarioId, ${run.id}) FAIL" + } finally { + synchronized(helper) { + helper.finishRun(scenarioId, run.id, hasFailed = hasFailed) } - } - futures += future + pb.step() + } } - for (future in futures) { - future.get() - } + running++ } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt index cf805286..0d60ce8c 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt @@ -125,13 +125,20 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { .default(Runtime.getRuntime().availableProcessors()) /** - * The maximum number of writer threads to use. + * The maximum number of host writer threads to use. */ - private val writerParallelism by option("--writer-parallelism") + private val hostWriterParallelism by option("--host-writer-parallelism") .int() .default(8) /** + * The maximum number of provisioner writer threads to use. + */ + private val provisionerWriterParallelism by option("--provisioner-writer-parallelism") + .int() + .default(1) + + /** * The buffer size for writing results. */ private val bufferSize by option("--buffer-size") @@ -145,7 +152,8 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { ds.addDataSourceProperty("reWriteBatchedInserts", "true") reporter.bufferSize = bufferSize - reporter.parallelism = writerParallelism + reporter.hostParallelism = hostWriterParallelism + reporter.provisionerParallelism = provisionerWriterParallelism val performanceInterferenceModel = performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it).construct() } @@ -166,7 +174,8 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { */ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentReporterProvider { var bufferSize = 4096 - var parallelism = 8 + var hostParallelism = 8 + var provisionerParallelism = 1 class Parquet : Reporter("Options for reporting using Parquet") { private val path by option("--parquet-directory", help = "path to where the output should be stored") @@ -184,8 +193,8 @@ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentRepo lateinit var provisionerWriter: PostgresProvisionerMetricsWriter override fun init(ds: DataSource) { - hostWriter = PostgresHostMetricsWriter(ds, parallelism, bufferSize) - provisionerWriter = PostgresProvisionerMetricsWriter(ds, parallelism, bufferSize) + hostWriter = PostgresHostMetricsWriter(ds, hostParallelism, bufferSize) + provisionerWriter = PostgresProvisionerMetricsWriter(ds, provisionerParallelism, bufferSize) } override fun createReporter(scenario: Long, run: Int): ExperimentReporter { |
