summaryrefslogtreecommitdiff
path: root/simulator/opendc
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-17 16:28:41 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-17 16:39:41 +0200
commitb8c3b07804475c8ca4c7187f3690d9dfacaf43fe (patch)
tree6af4155f2fbbf4b4ec652859b6c4c448cd3ca485 /simulator/opendc
parent27879cb9c05cd0900b8047c852a43dcd293ba172 (diff)
Close ParquetEventWriter writer thread on failure
This change will close the writer thread of the ParquetEventWriter class when a failure occurs. Previously, this would result in a sleeping thread keeping the system from terminating.
Diffstat (limited to 'simulator/opendc')
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt4
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt6
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt23
3 files changed, 23 insertions, 10 deletions
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
index 4781f335..c5e4f90e 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
@@ -142,10 +142,12 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
logger.info { "Starting experiment runner [parallelism=$parallelism]" }
val scheduler = ThreadPoolExperimentScheduler(parallelism)
val runner = DefaultExperimentRunner(scheduler)
+ val reporter = ConsoleExperimentReporter()
try {
- runner.execute(descriptor, ConsoleExperimentReporter())
+ runner.execute(descriptor, reporter)
} finally {
scheduler.close()
+ reporter.close()
}
}
}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
index b446abc8..4472def9 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
@@ -35,7 +35,7 @@ import mu.KotlinLogging
/**
* A reporter that reports the experiment progress to the console.
*/
-public class ConsoleExperimentReporter : ExperimentExecutionListener {
+public class ConsoleExperimentReporter : ExperimentExecutionListener, AutoCloseable {
/**
* The active [Run]s.
*/
@@ -82,4 +82,8 @@ public class ConsoleExperimentReporter : ExperimentExecutionListener {
}
override fun executionStarted(descriptor: ExperimentDescriptor) {}
+
+ override fun close() {
+ pb.close()
+ }
}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
index e42ac654..afa21f93 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
@@ -52,6 +52,16 @@ public open class ParquetEventWriter<in T : Event>(
private val bufferSize: Int = 4096
) : Runnable, Closeable {
/**
+ * The writer to write the Parquet file.
+ */
+ private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ /**
* The queue of commands to process.
*/
private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
@@ -59,7 +69,7 @@ public open class ParquetEventWriter<in T : Event>(
/**
* The thread that is responsible for writing the Parquet records.
*/
- private val writerThread = thread(start = true, name = "parquet-writer") { run() }
+ private val writerThread = thread(start = false, name = "parquet-writer") { run() }
/**
* Write the specified metrics to the database.
@@ -76,17 +86,14 @@ public open class ParquetEventWriter<in T : Event>(
writerThread.join()
}
+ init {
+ writerThread.start()
+ }
+
/**
* Start the writer thread.
*/
override fun run() {
- val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
-
try {
loop@ while (true) {
val action = queue.take()