From b8c3b07804475c8ca4c7187f3690d9dfacaf43fe Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 17 Sep 2020 16:28:41 +0200 Subject: Close ParquetEventWriter writer thread on failure This change will close the writer thread of the ParquetEventWriter class when a failure occurs. Previously, this would result in a sleeping thread keeping the system from terminating. --- .../com/atlarge/opendc/experiments/sc20/Main.kt | 4 +++- .../sc20/reporter/ConsoleExperimentReporter.kt | 6 +++++- .../sc20/telemetry/parquet/ParquetEventWriter.kt | 23 ++++++++++++++-------- 3 files changed, 23 insertions(+), 10 deletions(-) (limited to 'simulator') diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt index 4781f335..c5e4f90e 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -142,10 +142,12 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { logger.info { "Starting experiment runner [parallelism=$parallelism]" } val scheduler = ThreadPoolExperimentScheduler(parallelism) val runner = DefaultExperimentRunner(scheduler) + val reporter = ConsoleExperimentReporter() try { - runner.execute(descriptor, ConsoleExperimentReporter()) + runner.execute(descriptor, reporter) } finally { scheduler.close() + reporter.close() } } } diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt index b446abc8..4472def9 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt @@ -35,7 +35,7 @@ import mu.KotlinLogging /** * A reporter that reports the experiment progress to the console. */ -public class ConsoleExperimentReporter : ExperimentExecutionListener { +public class ConsoleExperimentReporter : ExperimentExecutionListener, AutoCloseable { /** * The active [Run]s. */ @@ -82,4 +82,8 @@ public class ConsoleExperimentReporter : ExperimentExecutionListener { } override fun executionStarted(descriptor: ExperimentDescriptor) {} + + override fun close() { + pb.close() + } } diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt index e42ac654..afa21f93 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt @@ -51,6 +51,16 @@ public open class ParquetEventWriter( private val converter: (T, GenericData.Record) -> Unit, private val bufferSize: Int = 4096 ) : Runnable, Closeable { + /** + * The writer to write the Parquet file. + */ + private val writer = AvroParquetWriter.builder(Path(path.absolutePath)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + /** * The queue of commands to process. */ @@ -59,7 +69,7 @@ public open class ParquetEventWriter( /** * The thread that is responsible for writing the Parquet records. */ - private val writerThread = thread(start = true, name = "parquet-writer") { run() } + private val writerThread = thread(start = false, name = "parquet-writer") { run() } /** * Write the specified metrics to the database. @@ -76,17 +86,14 @@ public open class ParquetEventWriter( writerThread.join() } + init { + writerThread.start() + } + /** * Start the writer thread. */ override fun run() { - val writer = AvroParquetWriter.builder(Path(path.absolutePath)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - try { loop@ while (true) { val action = queue.take() -- cgit v1.2.3 From 6b02680813d3bb00ba67d667df92682ab2592009 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 17 Sep 2020 16:49:16 +0200 Subject: Require specification of single portfolio This change adds the requirement for the command line interface to specify at least a single portfolio to run. Previously, the experiment runner would start and terminate silently without reporting that no portfolios were run. This behavior is confusing especially for users not familiar with the portfolios concept. --- .../main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'simulator') diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt index c5e4f90e..ec721ff0 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -53,7 +53,7 @@ private val logger = KotlinLogging.logger {} /** * Represents the command for running the experiment. */ -class ExperimentCli : CliktCommand(name = "sc20-experiment") { +class ExperimentCli : CliktCommand(name = "sc20-experiment", help = "Run experiments from the Capelin paper") { /** * The path to the directory where the topology descriptions are located. */ @@ -72,14 +72,14 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { * The path to the performance interference model. */ private val performanceInterferenceStream by option("--performance-interference-model", help = "path to the performance interference file") - .file() + .file(canBeDir = false) .convert { it.inputStream() as InputStream } /** * The path to the original VM placements file. */ private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file") - .file() + .file(canBeDir = false) .convert { Sc20VmPlacementReader(it.inputStream().buffered()).construct() } @@ -88,7 +88,7 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { /** * The selected portfolios to run. */ - private val portfolios by option("--portfolio") + private val portfolios by option("--portfolio", help = "portfolio of scenarios to explore") .choice( "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) } as (Experiment, Int) -> Portfolio, @@ -100,12 +100,12 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { "more-hpc" to { experiment, i -> MoreHpcPortfolio(experiment, i) }, ignoreCase = true ) - .multiple() + .multiple(required = true) /** * The maximum number of worker threads to use. */ - private val parallelism by option("--parallelism") + private val parallelism by option("--parallelism", help = "maximum number of concurrent simulation runs") .int() .default(Runtime.getRuntime().availableProcessors()) -- cgit v1.2.3