From a71d4885efcf01850bc236d3e9f77ab3f44b48aa Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Jan 2021 18:18:43 +0100 Subject: Convert to pull-based workload model This change converts the low-level workload model to be pull-based. This reduces the overhead that we experienced with our previous co-routine based approach. --- .../format/trace/bitbrains/BitbrainsTraceReader.kt | 16 ++++++---------- .../kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt | 2 +- .../org/opendc/format/trace/sc20/Sc20TraceReader.kt | 8 ++------ .../kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt | 6 ------ .../kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt | 2 +- .../org/opendc/format/trace/swf/SwfTraceReaderTest.kt | 1 - 6 files changed, 10 insertions(+), 25 deletions(-) (limited to 'simulator/opendc-format/src') diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 9353ef28..90d751ea 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -34,6 +34,7 @@ import java.io.BufferedReader import java.io.File import java.io.FileReader import java.util.* +import kotlin.math.min /** * A [TraceReader] for the public VM workload trace format. @@ -70,6 +71,7 @@ public class BitbrainsTraceReader( var vmId = -1L var cores = -1 var requiredMemory = -1L + var startTime = -1L BufferedReader(FileReader(vmFile)).use { reader -> reader.lineSequence() @@ -91,21 +93,17 @@ public class BitbrainsTraceReader( } vmId = vmFile.nameWithoutExtension.trim().toLong() - val timestamp = values[timestampCol].trim().toLong() - 5 * 60 + startTime = min(startTime, values[timestampCol].trim().toLong() - 5 * 60) cores = values[coreCol].trim().toInt() val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong() - val flops: Long = (cpuUsage * 5 * 60 * cores).toLong() - if (flopsHistory.isEmpty()) { - flopsHistory.add(SimTraceWorkload.Fragment(timestamp, flops, traceInterval, cpuUsage, cores)) + flopsHistory.add(SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores)) } else { - if (flopsHistory.last().flops != flops) { + if (flopsHistory.last().usage != cpuUsage) { flopsHistory.add( SimTraceWorkload.Fragment( - timestamp, - flops, traceInterval, cpuUsage, cores @@ -115,8 +113,6 @@ public class BitbrainsTraceReader( val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) flopsHistory.add( SimTraceWorkload.Fragment( - oldFragment.time, - oldFragment.flops + flops, oldFragment.duration + traceInterval, cpuUsage, cores @@ -151,7 +147,7 @@ public class BitbrainsTraceReader( ) ) entries[vmId] = TraceEntryImpl( - flopsHistory.firstOrNull()?.time ?: -1, + startTime, vmWorkload ) } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt index b721905d..c76889c8 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt @@ -139,7 +139,7 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader { val task = Task( UUID(0L, taskId), "", - SimWorkloadImage(UUID.randomUUID(), "", emptyMap(), SimFlopsWorkload(flops, cores)), + SimWorkloadImage(UUID.randomUUID(), "", emptyMap(), SimFlopsWorkload(flops)), HashSet(), mapOf( WORKFLOW_TASK_CORES to cores, diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt index 66efbcd0..78f581ca 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -125,20 +125,16 @@ public class Sc20TraceReader( requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) maxCores = max(maxCores, cores) - val flops: Long = (cpuUsage * 5 * 60).toLong() - - last = if (last != null && last!!.flops == 0L && flops == 0L) { + last = if (last != null && last!!.usage == 0.0 && cpuUsage == 0.0) { val oldFragment = last!! SimTraceWorkload.Fragment( - oldFragment.time, - oldFragment.flops + flops, oldFragment.duration + traceInterval, cpuUsage, cores ) } else { val fragment = - SimTraceWorkload.Fragment(timestamp, flops, traceInterval, cpuUsage, cores) + SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores) if (last != null) { yield(last!!) } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt index 52d41c44..80c54354 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt @@ -113,8 +113,6 @@ public class SwfTraceReader( for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) { flopsHistory.add( SimTraceWorkload.Fragment( - tick * 1000L, - 0L, sliceDuration * 1000L, 0.0, cores @@ -138,8 +136,6 @@ public class SwfTraceReader( ) { flopsHistory.add( SimTraceWorkload.Fragment( - tick * 1000L, - flopsFullSlice / sliceDuration, sliceDuration * 1000L, 1.0, cores @@ -150,8 +146,6 @@ public class SwfTraceReader( if (runtimePartialSliceRemainder > 0) { flopsHistory.add( SimTraceWorkload.Fragment( - submitTime + (slicedWaitTime + runTime - runtimePartialSliceRemainder), - flopsPartialSlice, sliceDuration, runtimePartialSliceRemainder / sliceDuration.toDouble(), cores diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index 381a0b41..d7dc09fa 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -81,7 +81,7 @@ public class WtfTraceReader(path: String) : TraceReader { val task = Task( UUID(0L, taskId), "", - SimWorkloadImage(UUID.randomUUID(), "", emptyMap(), SimFlopsWorkload(flops, cores)), + SimWorkloadImage(UUID.randomUUID(), "", emptyMap(), SimFlopsWorkload(flops)), HashSet(), mapOf( WORKFLOW_TASK_CORES to cores, diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt index 8db2ab40..45c125c4 100644 --- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt +++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt @@ -41,7 +41,6 @@ class SwfTraceReaderTest { assertEquals(164472, entry.submissionTime) // 1188 slices for waiting, 0 full and 1 partial running slices assertEquals(1189, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().size) - assertEquals(5_100_000L, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().last().flops) assertEquals(0.25, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().last().usage) } } -- cgit v1.2.3