| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-16 15:30:34 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-16 15:30:34 +0200 |
| commit | 6cd93b57945b289b2e14556f7ceaa193326eff78 | |
| tree | 9e2980e74a898a01cb9cd4a37fe0d37ca8d98f9f /opendc/opendc-experiments-sc20/src/main | |
| parent | e097aeb16d77c260126b65c7f13330076d800d52 | |
bug: Fix issues related to early termination
Diffstat (limited to 'opendc/opendc-experiments-sc20/src/main')
4 files changed, 122 insertions(+), 56 deletions(-)
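The heart of the patch is a producer/consumer split: the simulation thread enqueues records on a bounded `ArrayBlockingQueue`, and a dedicated thread drains the queue and performs the blocking Parquet I/O. The sketch below shows that pattern in isolation; `AsyncWriter` and its `write` callback are illustrative names rather than part of the patch, while the 2048-slot queue and the drain-then-interrupt shutdown mirror what `Sc20Monitor` does in the diff below.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Minimal sketch of the writer-thread pattern: producers block on a
// bounded queue while a single worker thread performs the actual writes.
class AsyncWriter<T : Any>(private val write: (T) -> Unit) : AutoCloseable {
    private val queue = ArrayBlockingQueue<T>(2048)

    private val worker = thread(start = true, name = "async-writer") {
        try {
            while (true) {
                write(queue.take()) // blocks until a record is available
            }
        } catch (e: InterruptedException) {
            // Interrupted by close(): fall through and let the thread exit.
        }
    }

    fun put(record: T) = queue.put(record) // blocks while the queue is full

    override fun close() {
        // Drain-then-interrupt shutdown, as in the patch: spin until the
        // queue is empty, then stop the worker.
        while (queue.isNotEmpty()) {
            Thread.sleep(500)
        }
        worker.interrupt()
        worker.join() // not in the patch; makes the shutdown synchronous
    }
}

fun main() {
    val writer = AsyncWriter<String> { println(it) }
    repeat(5) { writer.put("record $it") }
    writer.close()
}
```

One caveat: an empty queue does not prove that the worker has finished writing the record it last took, so `interrupt()` can arrive while a write is still in flight; the `join()` added in the sketch at least guarantees that `close()` returns only after the worker has shut down.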
```diff
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 7c198e56..bac0de21 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -12,6 +12,8 @@ import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.AvroParquetWriter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import java.io.Closeable
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
 
 class Sc20Monitor(
     destination: String
@@ -45,6 +47,19 @@ class Sc20Monitor(
         .withPageSize(4 * 1024 * 1024) // For compression
         .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
         .build()
+    private val queue = ArrayBlockingQueue<GenericData.Record>(2048)
+    private val writerThread = thread(start = true, name = "sc20-writer") {
+        try {
+            while (true) {
+                val record = queue.take()
+                writer.write(record)
+            }
+        } catch (e: InterruptedException) {
+            // Do not rethrow this
+        } finally {
+            writer.close()
+        }
+    }
 
     suspend fun onVmStateChanged(server: Server) {}
 
@@ -122,10 +137,14 @@ class Sc20Monitor(
         record.put("totalRunningVms", runningVms)
         record.put("totalFinishedVms", finishedVms)
 
-        writer.write(record)
+        queue.put(record)
     }
 
     override fun close() {
-        writer.close()
+        // Busy loop to wait for writer thread to finish
+        while (queue.isNotEmpty()) {
+            Thread.sleep(500)
+        }
+        writerThread.interrupt()
     }
 }
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
index 30456204..24ff9eed 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
@@ -39,7 +39,6 @@ import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.filter2.predicate.Statistics
 import org.apache.parquet.filter2.predicate.UserDefinedPredicate
-import org.apache.parquet.hadoop.ParquetReader
 import org.apache.parquet.io.api.Binary
 import java.io.File
 import java.io.Serializable
@@ -47,7 +46,9 @@ import java.util.Deque
 import java.util.SortedSet
 import java.util.TreeSet
 import java.util.UUID
+import java.util.concurrent.ArrayBlockingQueue
 import java.util.concurrent.LinkedBlockingDeque
+import kotlin.concurrent.thread
 import kotlin.random.Random
 
 /**
@@ -69,37 +70,82 @@ class Sc20ParquetTraceReader(
     private val iterator: Iterator<TraceEntry<VmWorkload>>
 
     /**
-     * Fill the buffers of the VMs
+     * The intermediate buffer to store the read records in.
      */
-    private fun pull(reader: ParquetReader<GenericData.Record>, buffers: Map<String, Deque<FlopsHistoryFragment>>) {
+    private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(128)
+
+    /**
+     * An optional filter for filtering the selected VMs
+     */
+    private val filter =
+        if (selectedVms.isEmpty())
+            null
+        else
+            FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms))))
+
+    /**
+     * A poisonous fragment.
+     */
+    private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0))
+
+    /**
+     * The thread to read the records in.
+     */
+    private val readerThread = thread(start = true, name = "sc20-reader") {
+        val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
+            .disableCompatibility()
+            .run { if (filter != null) withFilter(filter) else this }
+            .build()
+
+        try {
+            while (true) {
+                val record = reader.read()
+
+                if (record == null) {
+                    queue.put(poison)
+                    break
+                }
+
+                val id = record["id"].toString()
+                val tick = record["time"] as Long
+                val duration = record["duration"] as Long
+                val cores = record["cores"] as Int
+                val cpuUsage = record["cpuUsage"] as Double
+                val flops = record["flops"] as Long
+
+                val fragment = FlopsHistoryFragment(
+                    tick,
+                    flops,
+                    duration,
+                    cpuUsage,
+                    cores
+                )
+
+                queue.put(id to fragment)
+            }
+        } catch (e: InterruptedException) {
+            // Do not rethrow this
+        } finally {
+            reader.close()
+        }
+    }
+
+    /**
+     * Fill the buffers with the VMs
+     */
+    private fun pull(buffers: Map<String, Deque<FlopsHistoryFragment>>) {
         if (!hasNext) {
             return
         }
 
-        repeat(buffers.size) {
-            val record = reader.read()
+        repeat(16) {
+            val (id, fragment) = queue.take()
 
-            if (record == null) {
+            if (id == poison.first) {
                 hasNext = false
-                reader.close()
                 return
             }
 
-            val id = record["id"].toString()
-            val tick = record["time"] as Long
-            val duration = record["duration"] as Long
-            val cores = record["cores"] as Int
-            val cpuUsage = record["cpuUsage"] as Double
-            val flops = record["flops"] as Long
-
-            val fragment = FlopsHistoryFragment(
-                tick,
-                flops,
-                duration,
-                cpuUsage,
-                cores
-            )
-
             buffers[id]?.add(fragment)
         }
     }
@@ -116,30 +162,20 @@ class Sc20ParquetTraceReader(
         val entries = mutableMapOf<UUID, TraceEntry<VmWorkload>>()
         val buffers = mutableMapOf<String, Deque<FlopsHistoryFragment>>()
 
-        val filter =
-            if (selectedVms.isEmpty())
-                null
-            else
-                FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms))))
-
         val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
             .disableCompatibility()
            .run { if (filter != null) withFilter(filter) else this }
             .build()
 
-        val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
-            .disableCompatibility()
-            .run { if (filter != null) withFilter(filter) else this }
-            .build()
-
         var idx = 0
         while (true) {
             val record = metaReader.read() ?: break
             val id = record["id"].toString()
             val submissionTime = record["submissionTime"] as Long
+            val endTime = record["endTime"] as Long
             val maxCores = record["maxCores"] as Int
             val requiredMemory = record["requiredMemory"] as Long
-
+            val uuid = UUID(0, (idx++).toLong())
             println(id)
 
             val buffer = LinkedBlockingDeque<FlopsHistoryFragment>()
@@ -148,16 +184,23 @@ class Sc20ParquetTraceReader(
                 while (true) {
                     if (buffer.isEmpty()) {
                         if (hasNext) {
-                            pull(reader, buffers)
+                            pull(buffers)
                             continue
                         } else {
                             break
                         }
                     }
-                    yield(buffer.poll())
+
+                    val fragment = buffer.poll()
+                    yield(fragment)
+
+                    if (fragment.tick >= endTime) {
+                        break
+                    }
                 }
+
+                buffers.remove(id)
             }
 
-            val uuid = UUID(0, (idx++).toLong())
             val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel(
                 performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(),
@@ -191,7 +234,9 @@ class Sc20ParquetTraceReader(
 
     override fun next(): TraceEntry<VmWorkload> = iterator.next()
 
-    override fun close() {}
+    override fun close() {
+        readerThread.interrupt()
+    }
 
     private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
         override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 19055ad3..8478b592 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -237,13 +237,10 @@ fun main(args: Array<String>) {
             null
         }
 
-        var submitted = 0L
-        val finish = Channel<Unit>(Channel.RENDEZVOUS)
-
+        val finished = Channel<Unit>(Channel.RENDEZVOUS)
         val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed))
         while (reader.hasNext()) {
             val (time, workload) = reader.next()
-            submitted++
             delay(max(0, time - simulationContext.clock.millis()))
             launch {
                 chan.send(Unit)
@@ -254,24 +251,26 @@ fun main(args: Array<String>) {
                 // Monitor server events
                 server.events
                     .onEach {
-                        if (it is ServerEvent.StateChanged)
+                        if (it is ServerEvent.StateChanged) {
                             monitor.onVmStateChanged(it.server)
-
-                        if (scheduler.submittedVms == submitted && scheduler.runningVms <= 1 && !reader.hasNext()) {
-                            finish.send(Unit)
                         }
+
+                        finished.send(Unit)
                     }
                     .collect()
             }
         }
 
-        finish.receive()
-        failureDomain?.cancel()
-        launch {
-            scheduler.terminate()
+        while (scheduler.finishedVms + scheduler.unscheduledVms != scheduler.submittedVms || reader.hasNext()) {
+            finished.receive()
         }
-        println(simulationContext.clock.instant())
-        println("${System.currentTimeMillis() - start} milliseconds")
+
+        println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
+        failureDomain?.cancel()
+        scheduler.terminate()
+        reader.close()
+        println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds")
     }
 
     runBlocking {
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
index 7f429b89..d005a157 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
@@ -46,6 +46,7 @@ fun main() {
         .fields()
         .name("id").type().stringType().noDefault()
         .name("submissionTime").type().longType().noDefault()
+        .name("endTime").type().longType().noDefault()
         .name("maxCores").type().intType().noDefault()
         .name("requiredMemory").type().longType().noDefault()
         .endRecord()
@@ -92,7 +93,6 @@ fun main() {
         var vmId = ""
         var maxCores = -1
         var requiredMemory = -1L
-        var timestamp = -1L
         var cores = -1
         var minTime = Long.MAX_VALUE
 
@@ -112,7 +112,7 @@ fun main() {
             val values = line.split(" ")
 
             vmId = vmFile.name
-            timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+            val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
             cores = values[coreCol].trim().toInt()
             requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
             maxCores = max(maxCores, cores)
@@ -150,13 +150,16 @@ fun main() {
             }
         }
 
+        var maxTime = Long.MIN_VALUE
         flopsFragments.forEach { fragment ->
             allFragments.add(fragment)
+            maxTime = max(maxTime, fragment.tick)
         }
 
         val metaRecord = GenericData.Record(metaSchema)
         metaRecord.put("id", vmId)
         metaRecord.put("submissionTime", minTime)
+        metaRecord.put("endTime", maxTime)
         metaRecord.put("maxCores", maxCores)
         metaRecord.put("requiredMemory", requiredMemory)
         metaWriter.write(metaRecord)
```
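The reader side of the patch relies on a poison-pill handshake: a `BlockingQueue` cannot be "closed", so when `reader.read()` returns `null` the reader thread enqueues a sentinel pair (id `"\u0000"` with an empty fragment), and `pull()` flips `hasNext` once it dequeues it. A minimal, self-contained sketch of that handshake, with illustrative names and data:

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

fun main() {
    val queue = ArrayBlockingQueue<String>(128)
    // Sentinel ("poison pill") chosen so it cannot collide with a real VM id.
    val poison = "\u0000"

    // Producer: emits its records, then the sentinel to signal end-of-stream.
    thread(name = "reader") {
        listOf("vm-1", "vm-2", "vm-3").forEach(queue::put)
        queue.put(poison)
    }

    // Consumer: drains the queue until the sentinel appears.
    while (true) {
        val id = queue.take()
        if (id == poison) break
        println("got $id")
    }
}
```

The remaining piece of the fix is in `TestExperiment.kt`: rather than the old `scheduler.runningVms <= 1` heuristic, the main loop now blocks until `scheduler.finishedVms + scheduler.unscheduledVms == scheduler.submittedVms` and the trace reader is exhausted, which appears to be the exact bookkeeping condition that prevents the early termination named in the commit title.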
