diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-20 12:13:09 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-20 12:13:09 +0200 |
| commit | 3e056406616860c77168d827f1ca9d8d3c79c08e (patch) | |
| tree | 74e8993babb28dd56950f1da69eda2d97735a0e8 | |
| parent | 6cd93b57945b289b2e14556f7ceaa193326eff78 (diff) | |
perf: Minor tweaks in trace fetching
2 files changed, 80 insertions, 62 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt index 24ff9eed..0a7718e9 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt @@ -42,12 +42,10 @@ import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary import java.io.File import java.io.Serializable -import java.util.Deque import java.util.SortedSet import java.util.TreeSet import java.util.UUID import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.LinkedBlockingDeque import kotlin.concurrent.thread import kotlin.random.Random @@ -72,7 +70,7 @@ class Sc20ParquetTraceReader( /** * The intermediate buffer to store the read records in. */ - private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(128) + private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(1024) /** * An optional filter for filtering the selected VMs @@ -133,20 +131,20 @@ class Sc20ParquetTraceReader( /** * Fill the buffers with the VMs */ - private fun pull(buffers: Map<String, Deque<FlopsHistoryFragment>>) { + private fun pull(buffers: Map<String, List<MutableList<FlopsHistoryFragment>>>) { if (!hasNext) { return } - repeat(16) { - val (id, fragment) = queue.take() + val fragments = mutableListOf<Pair<String, FlopsHistoryFragment>>() + queue.drainTo(fragments) + for ((id, fragment) in fragments) { if (id == poison.first) { hasNext = false return } - - buffers[id]?.add(fragment) + buffers[id]?.forEach { it.add(fragment) } } } @@ -159,75 +157,93 @@ class Sc20ParquetTraceReader( * Initialize the reader. */ init { - val entries = mutableMapOf<UUID, TraceEntry<VmWorkload>>() - val buffers = mutableMapOf<String, Deque<FlopsHistoryFragment>>() + val takenIds = mutableSetOf<UUID>() + val entries = mutableMapOf<String, GenericData.Record>() + val buffers = mutableMapOf<String, MutableList<MutableList<FlopsHistoryFragment>>>() val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet")) .disableCompatibility() .run { if (filter != null) withFilter(filter) else this } .build() - var idx = 0 while (true) { val record = metaReader.read() ?: break val id = record["id"].toString() - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uuid = UUID(0, (idx++).toLong()) - println(id) - - val buffer = LinkedBlockingDeque<FlopsHistoryFragment>() - buffers[id] = buffer - val fragments = sequence<FlopsHistoryFragment> { - while (true) { - if (buffer.isEmpty()) { - if (hasNext) { - pull(buffers) - continue - } else { - break + entries[id] = record + } + + metaReader.close() + + val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms + + // Create the entry iterator + iterator = selection.asSequence() + .mapNotNull { entries[it] } + .mapIndexed { index, record -> + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) + + assert(uid !in takenIds) + takenIds += uid + + println(id) + + val internalBuffer = mutableListOf<FlopsHistoryFragment>() + val externalBuffer = mutableListOf<FlopsHistoryFragment>() + buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) + val fragments = sequence<FlopsHistoryFragment> { + repeat@while (true) { + if (externalBuffer.isEmpty()) { + if (hasNext) { + pull(buffers) + continue + } else { + break + } } - } - val fragment = buffer.poll() - yield(fragment) + internalBuffer.addAll(externalBuffer) + externalBuffer.clear() - if (fragment.tick >= endTime) { - break - } - } + for (fragment in internalBuffer) { + yield(fragment) - buffers.remove(id) - } - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), - Random(random.nextInt()) - ) - val vmWorkload = VmWorkload( - uuid, "VM Workload $id", UnnamedUser, - VmImage( - uuid, - id, - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - fragments, - maxCores, - requiredMemory - ) - ) + if (fragment.tick >= endTime) { + break@repeat + } + } - entries[uuid] = TraceEntryImpl( - submissionTime, - vmWorkload - ) - } + internalBuffer.clear() + } - metaReader.close() + buffers.remove(id) + } + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), + Random(random.nextInt()) + ) + val vmWorkload = VmWorkload( + uid, "VM Workload $id", UnnamedUser, + VmImage( + uid, + id, + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + fragments, + maxCores, + requiredMemory + ) + ) - // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + TraceEntryImpl(submissionTime, vmWorkload) + } + .sortedBy { it.submissionTime } + .toList() + .iterator() } override fun hasNext(): Boolean = iterator.hasNext() diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 8478b592..028cfb9a 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -237,10 +237,12 @@ fun main(args: Array<String>) { null } + var submitted = 0L val finished = Channel<Unit>(Channel.RENDEZVOUS) val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) while (reader.hasNext()) { val (time, workload) = reader.next() + submitted++ delay(max(0, time - simulationContext.clock.millis())) launch { chan.send(Unit) @@ -261,7 +263,7 @@ fun main(args: Array<String>) { } } - while (scheduler.finishedVms + scheduler.unscheduledVms != scheduler.submittedVms || reader.hasNext()) { + while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) { finished.receive() } |
