From b8c3b07804475c8ca4c7187f3690d9dfacaf43fe Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 17 Sep 2020 16:28:41 +0200 Subject: Close ParquetEventWriter writer thread on failure This change will close the writer thread of the ParquetEventWriter class when a failure occurs. Previously, this would result in a sleeping thread keeping the system from terminating. --- .../com/atlarge/opendc/experiments/sc20/Main.kt | 4 +++- .../sc20/reporter/ConsoleExperimentReporter.kt | 6 +++++- .../sc20/telemetry/parquet/ParquetEventWriter.kt | 23 ++++++++++++++-------- 3 files changed, 23 insertions(+), 10 deletions(-) (limited to 'simulator/opendc/opendc-experiments-sc20/src') diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt index 4781f335..c5e4f90e 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -142,10 +142,12 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { logger.info { "Starting experiment runner [parallelism=$parallelism]" } val scheduler = ThreadPoolExperimentScheduler(parallelism) val runner = DefaultExperimentRunner(scheduler) + val reporter = ConsoleExperimentReporter() try { - runner.execute(descriptor, ConsoleExperimentReporter()) + runner.execute(descriptor, reporter) } finally { scheduler.close() + reporter.close() } } } diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt index b446abc8..4472def9 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt @@ -35,7 +35,7 @@ import mu.KotlinLogging /** * A reporter that reports the experiment progress to the console. */ -public class ConsoleExperimentReporter : ExperimentExecutionListener { +public class ConsoleExperimentReporter : ExperimentExecutionListener, AutoCloseable { /** * The active [Run]s. */ @@ -82,4 +82,8 @@ public class ConsoleExperimentReporter : ExperimentExecutionListener { } override fun executionStarted(descriptor: ExperimentDescriptor) {} + + override fun close() { + pb.close() + } } diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt index e42ac654..afa21f93 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt @@ -51,6 +51,16 @@ public open class ParquetEventWriter( private val converter: (T, GenericData.Record) -> Unit, private val bufferSize: Int = 4096 ) : Runnable, Closeable { + /** + * The writer to write the Parquet file. + */ + private val writer = AvroParquetWriter.builder(Path(path.absolutePath)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + /** * The queue of commands to process. */ @@ -59,7 +69,7 @@ public open class ParquetEventWriter( /** * The thread that is responsible for writing the Parquet records. */ - private val writerThread = thread(start = true, name = "parquet-writer") { run() } + private val writerThread = thread(start = false, name = "parquet-writer") { run() } /** * Write the specified metrics to the database. @@ -76,17 +86,14 @@ public open class ParquetEventWriter( writerThread.join() } + init { + writerThread.start() + } + /** * Start the writer thread. */ override fun run() { - val writer = AvroParquetWriter.builder(Path(path.absolutePath)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - try { loop@ while (true) { val action = queue.take() -- cgit v1.2.3