summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Fabian Mastenbroek <mail.fabianm@gmail.com> 2020-04-20 12:13:09 +0200
committer: Fabian Mastenbroek <mail.fabianm@gmail.com> 2020-04-20 12:13:09 +0200
commit: 3e056406616860c77168d827f1ca9d8d3c79c08e (patch)
tree: 74e8993babb28dd56950f1da69eda2d97735a0e8
parent: 6cd93b57945b289b2e14556f7ceaa193326eff78 (diff)
perf: Minor tweaks in trace fetching
-rw-r--r-- opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt | 138
-rw-r--r-- opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 4
2 files changed, 80 insertions, 62 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
index 24ff9eed..0a7718e9 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
@@ -42,12 +42,10 @@ import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
import java.io.File
import java.io.Serializable
-import java.util.Deque
import java.util.SortedSet
import java.util.TreeSet
import java.util.UUID
import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.LinkedBlockingDeque
import kotlin.concurrent.thread
import kotlin.random.Random
@@ -72,7 +70,7 @@ class Sc20ParquetTraceReader(
/**
* The intermediate buffer to store the read records in.
*/
- private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(128)
+ private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(1024)
/**
* An optional filter for filtering the selected VMs
@@ -133,20 +131,20 @@ class Sc20ParquetTraceReader(
/**
* Fill the buffers with the VMs
*/
- private fun pull(buffers: Map<String, Deque<FlopsHistoryFragment>>) {
+ private fun pull(buffers: Map<String, List<MutableList<FlopsHistoryFragment>>>) {
if (!hasNext) {
return
}
- repeat(16) {
- val (id, fragment) = queue.take()
+ val fragments = mutableListOf<Pair<String, FlopsHistoryFragment>>()
+ queue.drainTo(fragments)
+ for ((id, fragment) in fragments) {
if (id == poison.first) {
hasNext = false
return
}
-
- buffers[id]?.add(fragment)
+ buffers[id]?.forEach { it.add(fragment) }
}
}
@@ -159,75 +157,93 @@ class Sc20ParquetTraceReader(
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<UUID, TraceEntry<VmWorkload>>()
- val buffers = mutableMapOf<String, Deque<FlopsHistoryFragment>>()
+ val takenIds = mutableSetOf<UUID>()
+ val entries = mutableMapOf<String, GenericData.Record>()
+ val buffers = mutableMapOf<String, MutableList<MutableList<FlopsHistoryFragment>>>()
val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
.disableCompatibility()
.run { if (filter != null) withFilter(filter) else this }
.build()
- var idx = 0
while (true) {
val record = metaReader.read() ?: break
val id = record["id"].toString()
- val submissionTime = record["submissionTime"] as Long
- val endTime = record["endTime"] as Long
- val maxCores = record["maxCores"] as Int
- val requiredMemory = record["requiredMemory"] as Long
- val uuid = UUID(0, (idx++).toLong())
- println(id)
-
- val buffer = LinkedBlockingDeque<FlopsHistoryFragment>()
- buffers[id] = buffer
- val fragments = sequence<FlopsHistoryFragment> {
- while (true) {
- if (buffer.isEmpty()) {
- if (hasNext) {
- pull(buffers)
- continue
- } else {
- break
+ entries[id] = record
+ }
+
+ metaReader.close()
+
+ val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms
+
+ // Create the entry iterator
+ iterator = selection.asSequence()
+ .mapNotNull { entries[it] }
+ .mapIndexed { index, record ->
+ val id = record["id"].toString()
+ val submissionTime = record["submissionTime"] as Long
+ val endTime = record["endTime"] as Long
+ val maxCores = record["maxCores"] as Int
+ val requiredMemory = record["requiredMemory"] as Long
+ val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
+
+ assert(uid !in takenIds)
+ takenIds += uid
+
+ println(id)
+
+ val internalBuffer = mutableListOf<FlopsHistoryFragment>()
+ val externalBuffer = mutableListOf<FlopsHistoryFragment>()
+ buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
+ val fragments = sequence<FlopsHistoryFragment> {
+ repeat@while (true) {
+ if (externalBuffer.isEmpty()) {
+ if (hasNext) {
+ pull(buffers)
+ continue
+ } else {
+ break
+ }
}
- }
- val fragment = buffer.poll()
- yield(fragment)
+ internalBuffer.addAll(externalBuffer)
+ externalBuffer.clear()
- if (fragment.tick >= endTime) {
- break
- }
- }
+ for (fragment in internalBuffer) {
+ yield(fragment)
- buffers.remove(id)
- }
- val relevantPerformanceInterferenceModelItems =
- PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(),
- Random(random.nextInt())
- )
- val vmWorkload = VmWorkload(
- uuid, "VM Workload $id", UnnamedUser,
- VmImage(
- uuid,
- id,
- mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- fragments,
- maxCores,
- requiredMemory
- )
- )
+ if (fragment.tick >= endTime) {
+ break@repeat
+ }
+ }
- entries[uuid] = TraceEntryImpl(
- submissionTime,
- vmWorkload
- )
- }
+ internalBuffer.clear()
+ }
- metaReader.close()
+ buffers.remove(id)
+ }
+ val relevantPerformanceInterferenceModelItems =
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(),
+ Random(random.nextInt())
+ )
+ val vmWorkload = VmWorkload(
+ uid, "VM Workload $id", UnnamedUser,
+ VmImage(
+ uid,
+ id,
+ mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
+ fragments,
+ maxCores,
+ requiredMemory
+ )
+ )
- // Create the entry iterator
- iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ TraceEntryImpl(submissionTime, vmWorkload)
+ }
+ .sortedBy { it.submissionTime }
+ .toList()
+ .iterator()
}
override fun hasNext(): Boolean = iterator.hasNext()
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 8478b592..028cfb9a 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -237,10 +237,12 @@ fun main(args: Array<String>) {
null
}
+ var submitted = 0L
val finished = Channel<Unit>(Channel.RENDEZVOUS)
val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed))
while (reader.hasNext()) {
val (time, workload) = reader.next()
+ submitted++
delay(max(0, time - simulationContext.clock.millis()))
launch {
chan.send(Unit)
@@ -261,7 +263,7 @@ fun main(args: Array<String>) {
}
}
- while (scheduler.finishedVms + scheduler.unscheduledVms != scheduler.submittedVms || reader.hasNext()) {
+ while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) {
finished.receive()
}