summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-17 17:14:06 +0200
committerGitHub <noreply@github.com>2020-09-17 17:14:06 +0200
commitb717004a0c37e0f6282ec40a667c14d18e87b528 (patch)
tree5d628d5b35098282a20d2c748a8a05a6c4badf4c
parent27879cb9c05cd0900b8047c852a43dcd293ba172 (diff)
parent6b02680813d3bb00ba67d667df92682ab2592009 (diff)
Merge pull request #39 from atlarge-research/bug/error-reporting
Fix confusing behavior in Capelin experiment runner
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt16
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt6
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt23
3 files changed, 29 insertions, 16 deletions
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
index 4781f335..ec721ff0 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
@@ -53,7 +53,7 @@ private val logger = KotlinLogging.logger {}
/**
* Represents the command for running the experiment.
*/
-class ExperimentCli : CliktCommand(name = "sc20-experiment") {
+class ExperimentCli : CliktCommand(name = "sc20-experiment", help = "Run experiments from the Capelin paper") {
/**
* The path to the directory where the topology descriptions are located.
*/
@@ -72,14 +72,14 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
* The path to the performance interference model.
*/
private val performanceInterferenceStream by option("--performance-interference-model", help = "path to the performance interference file")
- .file()
+ .file(canBeDir = false)
.convert { it.inputStream() as InputStream }
/**
* The path to the original VM placements file.
*/
private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file")
- .file()
+ .file(canBeDir = false)
.convert {
Sc20VmPlacementReader(it.inputStream().buffered()).construct()
}
@@ -88,7 +88,7 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
/**
* The selected portfolios to run.
*/
- private val portfolios by option("--portfolio")
+ private val portfolios by option("--portfolio", help = "portfolio of scenarios to explore")
.choice(
"hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) }
as (Experiment, Int) -> Portfolio,
@@ -100,12 +100,12 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
"more-hpc" to { experiment, i -> MoreHpcPortfolio(experiment, i) },
ignoreCase = true
)
- .multiple()
+ .multiple(required = true)
/**
* The maximum number of worker threads to use.
*/
- private val parallelism by option("--parallelism")
+ private val parallelism by option("--parallelism", help = "maximum number of concurrent simulation runs")
.int()
.default(Runtime.getRuntime().availableProcessors())
@@ -142,10 +142,12 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
logger.info { "Starting experiment runner [parallelism=$parallelism]" }
val scheduler = ThreadPoolExperimentScheduler(parallelism)
val runner = DefaultExperimentRunner(scheduler)
+ val reporter = ConsoleExperimentReporter()
try {
- runner.execute(descriptor, ConsoleExperimentReporter())
+ runner.execute(descriptor, reporter)
} finally {
scheduler.close()
+ reporter.close()
}
}
}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
index b446abc8..4472def9 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
@@ -35,7 +35,7 @@ import mu.KotlinLogging
/**
* A reporter that reports the experiment progress to the console.
*/
-public class ConsoleExperimentReporter : ExperimentExecutionListener {
+public class ConsoleExperimentReporter : ExperimentExecutionListener, AutoCloseable {
/**
* The active [Run]s.
*/
@@ -82,4 +82,8 @@ public class ConsoleExperimentReporter : ExperimentExecutionListener {
}
override fun executionStarted(descriptor: ExperimentDescriptor) {}
+
+ override fun close() {
+ pb.close()
+ }
}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
index e42ac654..afa21f93 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
@@ -52,6 +52,16 @@ public open class ParquetEventWriter<in T : Event>(
private val bufferSize: Int = 4096
) : Runnable, Closeable {
/**
+ * The writer to write the Parquet file.
+ */
+ private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ /**
* The queue of commands to process.
*/
private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
@@ -59,7 +69,7 @@ public open class ParquetEventWriter<in T : Event>(
/**
* The thread that is responsible for writing the Parquet records.
*/
- private val writerThread = thread(start = true, name = "parquet-writer") { run() }
+ private val writerThread = thread(start = false, name = "parquet-writer") { run() }
/**
* Write the specified metrics to the database.
@@ -76,17 +86,14 @@ public open class ParquetEventWriter<in T : Event>(
writerThread.join()
}
+ init {
+ writerThread.start()
+ }
+
/**
* Start the writer thread.
*/
override fun run() {
- val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
-
try {
loop@ while (true) {
val action = queue.take()