summaryrefslogtreecommitdiff
path: root/opendc/opendc-format/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'opendc/opendc-format/src/main')
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt71
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt2
2 files changed, 52 insertions, 21 deletions
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
index 96daa2ce..d4eef029 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -37,6 +37,7 @@ import java.io.File
import java.io.FileReader
import java.util.UUID
import kotlin.math.max
+import kotlin.math.min
/**
* A [TraceReader] for the internal VM workload trace format.
@@ -81,10 +82,13 @@ class Sc20TraceReader(
vms
.forEachIndexed { idx, vmFile ->
println(vmFile)
- val flopsHistory = mutableListOf<FlopsHistoryFragment>()
+
var vmId = ""
var maxCores = -1
var requiredMemory = -1L
+ var timestamp = -1L
+ var cores = -1
+ var minTime = Long.MAX_VALUE
BufferedReader(FileReader(vmFile)).use { reader ->
reader.lineSequence()
@@ -96,33 +100,61 @@ class Sc20TraceReader(
val values = line.split(" ")
vmId = vmFile.name
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
- val cores = values[coreCol].trim().toInt()
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ cores = values[coreCol].trim().toInt()
requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ }
+ }
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- if (flopsHistory.isEmpty()) {
- flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores))
- } else {
- // Restrict merging to empty fragments for now
- if (flopsHistory.last().flops == 0L && flops == 0L) {
- val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1)
- flopsHistory.add(
+ val flopsFragments = sequence {
+ var last: FlopsHistoryFragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(1024)
+ .forEach { lines ->
+ val res = ArrayList<FlopsHistoryFragment>(lines.size)
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(" ")
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
FlopsHistoryFragment(
oldFragment.tick,
oldFragment.flops + flops,
oldFragment.duration + traceInterval,
cpuUsage,
- cores)
- )
- } else {
- flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores))
+ cores
+ )
+ } else {
+ val fragment =
+ FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)
+ if (last != null) {
+ res.add(last!!)
+ }
+ fragment
+ }
}
+
+ yieldAll(res)
}
+
+ if (last != null) {
+ yield(last!!)
}
+ }
}
val uuid = UUID(0, idx.toLong())
@@ -131,20 +163,19 @@ class Sc20TraceReader(
PerformanceInterferenceModel(
performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet()
)
-
val vmWorkload = VmWorkload(
uuid, "VM Workload $vmId", UnnamedUser,
VmImage(
uuid,
vmId,
mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- flopsHistory,
+ flopsFragments,
maxCores,
requiredMemory
)
)
entries[uuid] = TraceEntryImpl(
- flopsHistory.firstOrNull()?.tick ?: -1,
+ minTime,
vmWorkload
)
}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
index be9ddfaa..fe1049d9 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
@@ -134,7 +134,7 @@ class VmTraceReader(
uuid,
vmId.toString(),
mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- flopsHistory,
+ flopsHistory.asSequence(),
cores,
requiredMemory
)