diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-09-17 16:28:41 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-09-17 16:39:41 +0200 |
| commit | b8c3b07804475c8ca4c7187f3690d9dfacaf43fe (patch) | |
| tree | 6af4155f2fbbf4b4ec652859b6c4c448cd3ca485 /simulator/opendc | |
| parent | 27879cb9c05cd0900b8047c852a43dcd293ba172 (diff) | |
Close ParquetEventWriter writer thread on failure
This change will close the writer thread of the ParquetEventWriter class
when a failure occurs. Previously, this would result in a sleeping
thread keeping the system from terminating.
Diffstat (limited to 'simulator/opendc')
3 files changed, 23 insertions, 10 deletions
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt index 4781f335..c5e4f90e 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -142,10 +142,12 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { logger.info { "Starting experiment runner [parallelism=$parallelism]" } val scheduler = ThreadPoolExperimentScheduler(parallelism) val runner = DefaultExperimentRunner(scheduler) + val reporter = ConsoleExperimentReporter() try { - runner.execute(descriptor, ConsoleExperimentReporter()) + runner.execute(descriptor, reporter) } finally { scheduler.close() + reporter.close() } } } diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt index b446abc8..4472def9 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt @@ -35,7 +35,7 @@ import mu.KotlinLogging /** * A reporter that reports the experiment progress to the console. */ -public class ConsoleExperimentReporter : ExperimentExecutionListener { +public class ConsoleExperimentReporter : ExperimentExecutionListener, AutoCloseable { /** * The active [Run]s. */ @@ -82,4 +82,8 @@ public class ConsoleExperimentReporter : ExperimentExecutionListener { } override fun executionStarted(descriptor: ExperimentDescriptor) {} + + override fun close() { + pb.close() + } } diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt index e42ac654..afa21f93 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt @@ -52,6 +52,16 @@ public open class ParquetEventWriter<in T : Event>( private val bufferSize: Int = 4096 ) : Runnable, Closeable { /** + * The writer to write the Parquet file. + */ + private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + /** * The queue of commands to process. */ private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize) @@ -59,7 +69,7 @@ public open class ParquetEventWriter<in T : Event>( /** * The thread that is responsible for writing the Parquet records. */ - private val writerThread = thread(start = true, name = "parquet-writer") { run() } + private val writerThread = thread(start = false, name = "parquet-writer") { run() } /** * Write the specified metrics to the database. @@ -76,17 +86,14 @@ public open class ParquetEventWriter<in T : Event>( writerThread.join() } + init { + writerThread.start() + } + /** * Start the writer thread. */ override fun run() { - val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - try { loop@ while (true) { val action = queue.take() |
