author    Fabian Mastenbroek <mail.fabianm@gmail.com>  2020-05-14 14:44:15 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com>  2020-05-14 14:44:15 +0200
commit    9a09573d1b039de999a7f225fa1b1021deb8f9b2 (patch)
tree      b8985a3f356a6e215a8e36406ac359990882cd41 /opendc
parent    6c491d1f3e6c5c682cc250d1ffe3a501216b1929 (diff)
perf: Differentiate between host and provisioner writer parallelism
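The runner previously used a single --writer-parallelism value for both the host metrics writer and the provisioner metrics writer. This change splits the option into --host-writer-parallelism (default 8) and --provisioner-writer-parallelism (default 1) and passes each value to the corresponding Postgres writer. The experiment runner also replaces its batch-and-wait loop over futures with an ExecutorCompletionService, so that the number of in-flight runs never exceeds the configured parallelism.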
Diffstat (limited to 'opendc')
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt    | 85
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt | 21
2 files changed, 54 insertions(+), 52 deletions(-)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
index 728fc62d..03995160 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
@@ -37,6 +37,7 @@ import me.tongfei.progressbar.ProgressBar
import mu.KotlinLogging
import java.io.Closeable
import java.io.File
+import java.util.concurrent.ExecutorCompletionService
import java.util.concurrent.Executors
import java.util.concurrent.Future
import javax.sql.DataSource
@@ -153,67 +154,59 @@ public class ExperimentRunner(
val plan = createPlan()
val total = plan.size
- val executorService = Executors.newCachedThreadPool()
- val planIterator = plan.iterator()
- val futures = mutableListOf<Future<*>>()
+ val completionService = ExecutorCompletionService<Unit>(Executors.newCachedThreadPool())
val pb = ProgressBar("Experiment", total.toLong())
- while (planIterator.hasNext()) {
- futures.clear()
+ var running = 0
- repeat(parallelism) {
- if (!planIterator.hasNext()) {
- return@repeat
- }
+ for (run in plan) {
+ if (running >= parallelism) {
+ completionService.take()
+ running--
+ }
- val run = planIterator.next()
- val scenarioId = scenarioIds[run.scenario]!!
+ val scenarioId = scenarioIds[run.scenario]!!
- rawTraceReaders.computeIfAbsent(run.scenario.workload.name) { name ->
- logger.info { "Loading trace $name" }
- Sc20RawParquetTraceReader(File(tracePath, name))
- }
+ rawTraceReaders.computeIfAbsent(run.scenario.workload.name) { name ->
+ logger.info { "Loading trace $name" }
+ Sc20RawParquetTraceReader(File(tracePath, name))
+ }
- val future = executorService.submit {
- pb.extraMessage = "($scenarioId, ${run.id}) START"
+ completionService.submit {
+ pb.extraMessage = "($scenarioId, ${run.id}) START"
- var hasFailed = false
- synchronized(helper) {
- helper.startRun(scenarioId, run.id)
- }
+ var hasFailed = false
+ synchronized(helper) {
+ helper.startRun(scenarioId, run.id)
+ }
+
+ try {
+ val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id)
+ val traceReader =
+ createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run)
+ val environmentReader = createEnvironmentReader(run.scenario.topology.name)
try {
- val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id)
- val traceReader =
- createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run)
- val environmentReader = createEnvironmentReader(run.scenario.topology.name)
-
- try {
- run.scenario(run, reporter, environmentReader, traceReader)
- } finally {
- reporter.close()
- }
-
- pb.extraMessage = "($scenarioId, ${run.id}) OK"
- } catch (e: Throwable) {
- logger.error("A run has failed", e)
- hasFailed = true
- pb.extraMessage = "($scenarioId, ${run.id}) FAIL"
+ run.scenario(run, reporter, environmentReader, traceReader)
} finally {
- synchronized(helper) {
- helper.finishRun(scenarioId, run.id, hasFailed = hasFailed)
- }
+ reporter.close()
+ }
- pb.step()
+ pb.extraMessage = "($scenarioId, ${run.id}) OK"
+ } catch (e: Throwable) {
+ logger.error("A run has failed", e)
+ hasFailed = true
+ pb.extraMessage = "($scenarioId, ${run.id}) FAIL"
+ } finally {
+ synchronized(helper) {
+ helper.finishRun(scenarioId, run.id, hasFailed = hasFailed)
}
- }
- futures += future
+ pb.step()
+ }
}
- for (future in futures) {
- future.get()
- }
+ running++
}
}
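For reference, the bounded-concurrency pattern introduced above can be sketched in isolation. The sketch below is illustrative only; runAll and its task list are hypothetical names, not code from this repository.

import java.util.concurrent.ExecutorCompletionService
import java.util.concurrent.Executors

// Illustrative only: keep at most `parallelism` tasks in flight by taking one
// completed task from the completion service before submitting the next one.
fun runAll(tasks: List<() -> Unit>, parallelism: Int) {
    val executor = Executors.newCachedThreadPool()
    val completionService = ExecutorCompletionService<Unit>(executor)
    var running = 0

    for (task in tasks) {
        if (running >= parallelism) {
            completionService.take() // blocks until one in-flight task completes
            running--
        }
        completionService.submit { task() }
        running++
    }

    // Drain whatever is still in flight so the sketch completes all work.
    repeat(running) { completionService.take() }
    executor.shutdown()
}

Unlike the earlier batch-and-wait loop, a slow run here only delays the slot it occupies rather than holding up an entire batch of `parallelism` runs.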
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
index cf805286..0d60ce8c 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
@@ -125,13 +125,20 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
.default(Runtime.getRuntime().availableProcessors())
/**
- * The maximum number of writer threads to use.
+ * The maximum number of host writer threads to use.
*/
- private val writerParallelism by option("--writer-parallelism")
+ private val hostWriterParallelism by option("--host-writer-parallelism")
.int()
.default(8)
/**
+ * The maximum number of provisioner writer threads to use.
+ */
+ private val provisionerWriterParallelism by option("--provisioner-writer-parallelism")
+ .int()
+ .default(1)
+
+ /**
* The buffer size for writing results.
*/
private val bufferSize by option("--buffer-size")
@@ -145,7 +152,8 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
ds.addDataSourceProperty("reWriteBatchedInserts", "true")
reporter.bufferSize = bufferSize
- reporter.parallelism = writerParallelism
+ reporter.hostParallelism = hostWriterParallelism
+ reporter.provisionerParallelism = provisionerWriterParallelism
val performanceInterferenceModel =
performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it).construct() }
@@ -166,7 +174,8 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
*/
internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentReporterProvider {
var bufferSize = 4096
- var parallelism = 8
+ var hostParallelism = 8
+ var provisionerParallelism = 1
class Parquet : Reporter("Options for reporting using Parquet") {
private val path by option("--parquet-directory", help = "path to where the output should be stored")
@@ -184,8 +193,8 @@ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentRepo
lateinit var provisionerWriter: PostgresProvisionerMetricsWriter
override fun init(ds: DataSource) {
- hostWriter = PostgresHostMetricsWriter(ds, parallelism, bufferSize)
- provisionerWriter = PostgresProvisionerMetricsWriter(ds, parallelism, bufferSize)
+ hostWriter = PostgresHostMetricsWriter(ds, hostParallelism, bufferSize)
+ provisionerWriter = PostgresProvisionerMetricsWriter(ds, provisionerParallelism, bufferSize)
}
override fun createReporter(scenario: Long, run: Int): ExperimentReporter {
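As an illustration only, the new options would be passed on the command line alongside the existing ones; the values below are arbitrary, and the remaining required options (reporter selection, trace and environment paths) are omitted here:

sc20-experiment \
    --host-writer-parallelism 16 \
    --provisioner-writer-parallelism 2 \
    --buffer-size 4096 \
    ...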