| field | value | date |
|---|---|---|
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-09-17 17:14:06 +0200 |
| committer | GitHub <noreply@github.com> | 2020-09-17 17:14:06 +0200 |
| commit | b717004a0c37e0f6282ec40a667c14d18e87b528 (patch) | |
| tree | 5d628d5b35098282a20d2c748a8a05a6c4badf4c | |
| parent | 27879cb9c05cd0900b8047c852a43dcd293ba172 (diff) | |
| parent | 6b02680813d3bb00ba67d667df92682ab2592009 (diff) | |
Merge pull request #39 from atlarge-research/bug/error-reporting
Fix confusing behavior in Capelin experiment runner
3 files changed, 29 insertions, 16 deletions
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
index 4781f335..ec721ff0 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
@@ -53,7 +53,7 @@ private val logger = KotlinLogging.logger {}
 /**
  * Represents the command for running the experiment.
  */
-class ExperimentCli : CliktCommand(name = "sc20-experiment") {
+class ExperimentCli : CliktCommand(name = "sc20-experiment", help = "Run experiments from the Capelin paper") {
     /**
      * The path to the directory where the topology descriptions are located.
      */
@@ -72,14 +72,14 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
      * The path to the performance interference model.
      */
     private val performanceInterferenceStream by option("--performance-interference-model", help = "path to the performance interference file")
-        .file()
+        .file(canBeDir = false)
         .convert { it.inputStream() as InputStream }

     /**
      * The path to the original VM placements file.
      */
     private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file")
-        .file()
+        .file(canBeDir = false)
         .convert { Sc20VmPlacementReader(it.inputStream().buffered()).construct() }
@@ -88,7 +88,7 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
     /**
      * The selected portfolios to run.
      */
-    private val portfolios by option("--portfolio")
+    private val portfolios by option("--portfolio", help = "portfolio of scenarios to explore")
         .choice(
             "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) } as (Experiment, Int) -> Portfolio,
@@ -100,12 +100,12 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
             "more-hpc" to { experiment, i -> MoreHpcPortfolio(experiment, i) },
             ignoreCase = true
         )
-        .multiple()
+        .multiple(required = true)

     /**
      * The maximum number of worker threads to use.
      */
-    private val parallelism by option("--parallelism")
+    private val parallelism by option("--parallelism", help = "maximum number of concurrent simulation runs")
         .int()
         .default(Runtime.getRuntime().availableProcessors())
@@ -142,10 +142,12 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
         logger.info { "Starting experiment runner [parallelism=$parallelism]" }
         val scheduler = ThreadPoolExperimentScheduler(parallelism)
         val runner = DefaultExperimentRunner(scheduler)
+        val reporter = ConsoleExperimentReporter()
         try {
-            runner.execute(descriptor, ConsoleExperimentReporter())
+            runner.execute(descriptor, reporter)
         } finally {
             scheduler.close()
+            reporter.close()
         }
     }
 }
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
index b446abc8..4472def9 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
@@ -35,7 +35,7 @@ import mu.KotlinLogging
 /**
  * A reporter that reports the experiment progress to the console.
  */
-public class ConsoleExperimentReporter : ExperimentExecutionListener {
+public class ConsoleExperimentReporter : ExperimentExecutionListener, AutoCloseable {
     /**
      * The active [Run]s.
      */
@@ -82,4 +82,8 @@ public class ConsoleExperimentReporter : ExperimentExecutionListener {
     }

     override fun executionStarted(descriptor: ExperimentDescriptor) {}
+
+    override fun close() {
+        pb.close()
+    }
 }
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
index e42ac654..afa21f93 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
@@ -52,6 +52,16 @@ public open class ParquetEventWriter<in T : Event>(
     private val bufferSize: Int = 4096
 ) : Runnable, Closeable {
     /**
+     * The writer to write the Parquet file.
+     */
+    private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
+        .withSchema(schema)
+        .withCompressionCodec(CompressionCodecName.SNAPPY)
+        .withPageSize(4 * 1024 * 1024) // For compression
+        .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+        .build()
+
+    /**
      * The queue of commands to process.
      */
     private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
@@ -59,7 +69,7 @@ public open class ParquetEventWriter<in T : Event>(
     /**
      * The thread that is responsible for writing the Parquet records.
      */
-    private val writerThread = thread(start = true, name = "parquet-writer") { run() }
+    private val writerThread = thread(start = false, name = "parquet-writer") { run() }

     /**
      * Write the specified metrics to the database.
@@ -76,17 +86,14 @@ public open class ParquetEventWriter<in T : Event>(
         writerThread.join()
     }

+    init {
+        writerThread.start()
+    }
+
     /**
      * Start the writer thread.
      */
     override fun run() {
-        val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
-            .withSchema(schema)
-            .withCompressionCodec(CompressionCodecName.SNAPPY)
-            .withPageSize(4 * 1024 * 1024) // For compression
-            .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
-            .build()
-
         try {
             loop@ while (true) {
                 val action = queue.take()
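The Main.kt hunks tighten the command-line interface: every option now carries help text, the file-valued options reject directories, and at least one `--portfolio` must be supplied, so an invocation without a portfolio aborts with a usage error instead of silently doing nothing. Below is a minimal sketch of the same Clikt pattern, assuming a Clikt version that supports the `canBeDir` and `required` parameters used in the patch; the command name, option values, and output are illustrative, not the Capelin runner itself.

```kotlin
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.multiple
import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.types.choice
import com.github.ajalt.clikt.parameters.types.file

// Illustrative command (not the OpenDC code): shows how `multiple(required = true)`
// and `file(canBeDir = false)` turn silent misconfiguration into a usage error.
class DemoCli : CliktCommand(name = "demo-experiment", help = "Run a demo experiment") {
    private val portfolios by option("--portfolio", help = "portfolio of scenarios to explore")
        .choice("hor-ver", "more-velocity", "more-hpc", ignoreCase = true)
        .multiple(required = true) // omitting --portfolio now fails during parsing

    private val placements by option("--vm-placements-file", help = "path to the VM placement file")
        .file(canBeDir = false) // passing a directory is rejected during parsing

    override fun run() {
        echo("portfolios=$portfolios placements=$placements")
    }
}

fun main(args: Array<String>) = DemoCli().main(args)
```

Invoking such a command with no arguments would print a usage error naming the missing `--portfolio` option, which is the fail-fast behaviour the patch adds to the experiment runner.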
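The ConsoleExperimentReporter hunks make the reporter `AutoCloseable` and have Main.kt close it in the same `finally` block as the scheduler, so the console progress bar (the `pb` field) is terminated even when a run fails. A small sketch of that ownership pattern follows, with assumed interface and class names rather than the OpenDC API.

```kotlin
// Illustrative sketch (assumed names): a reporter that owns terminal state implements
// AutoCloseable, and the caller releases it in a try/finally so a failed run does not
// leave a dangling progress bar on the console.
interface Reporter : AutoCloseable {
    fun runFinished(name: String)
}

class ConsoleReporter : Reporter {
    override fun runFinished(name: String) = println("finished: $name")
    override fun close() = println("[progress bar closed]")
}

fun execute(runs: List<String>) {
    val reporter = ConsoleReporter()
    try {
        runs.forEach(reporter::runFinished)
    } finally {
        reporter.close() // mirrors the reporter.close() added to Main.kt
    }
}
```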
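The ParquetEventWriter hunks move construction of the Parquet writer out of the body of `run()` into a field initializer and start the worker thread from an `init` block instead of the property initializer. A bad output path or schema now fails on the calling thread at construction time with a clear stack trace, rather than silently killing the background writer thread, and because Kotlin executes property initializers and `init` blocks in declaration order, the thread is only started once the writer and queue exist. The following is a hypothetical sketch of the same pattern using a plain text file instead of Parquet; `BackgroundWriter`, the `STOP` sentinel, and the method names are illustrative, not the OpenDC classes.

```kotlin
import java.io.File
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
import kotlin.concurrent.thread

// Illustrative sketch: open the output resource in a field initializer so a bad path fails
// on the calling thread, create the worker with start = false, and start it from an init
// block that runs only after the property initializers above it.
class BackgroundWriter(path: File, bufferSize: Int = 4096) : AutoCloseable {
    // Fails fast here (e.g. unwritable path) instead of inside the background thread.
    private val writer = path.bufferedWriter()

    private val queue: BlockingQueue<String> = ArrayBlockingQueue(bufferSize)

    // Created but not started; the explicit init block below keeps the start point
    // after all field initializers, so the thread never sees a half-built instance.
    private val worker = thread(start = false, name = "bg-writer") {
        while (true) {
            val line = queue.take()
            if (line == STOP) break
            writer.append(line).append('\n')
        }
        writer.close()
    }

    init {
        // Every property above is initialized by the time this runs.
        worker.start()
    }

    fun write(line: String) = queue.put(line)

    override fun close() {
        queue.put(STOP) // poison pill: lets the worker drain the queue and exit
        worker.join()
    }

    private companion object {
        const val STOP = "\u0000STOP"
    }
}
```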
