From a71d4885efcf01850bc236d3e9f77ab3f44b48aa Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Jan 2021 18:18:43 +0100 Subject: Convert to pull-based workload model This change converts the low-level workload model to be pull-based. This reduces the overhead that we experienced with our previous co-routine based approach. --- .../opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt | 2 -- .../experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt | 8 ++++---- 2 files changed, 4 insertions(+), 6 deletions(-) (limited to 'simulator/opendc-experiments/opendc-experiments-sc20/src/main') diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index 9bc1a58e..4a318df4 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -66,8 +66,6 @@ public class Sc20RawParquetTraceReader(private val path: File) { val flops = record["flops"] as Long val fragment = SimTraceWorkload.Fragment( - tick, - flops, duration, cpuUsage, cores diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt index edef276c..ba22ae15 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt @@ -92,7 +92,7 @@ public class Sc20StreamingParquetTraceReader( /** * A poisonous fragment. */ - private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0, 0, 0.0, 0)) + private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0)) /** * The thread to read the records in. @@ -120,8 +120,6 @@ public class Sc20StreamingParquetTraceReader( val flops = record["flops"] as Long val fragment = SimTraceWorkload.Fragment( - tick, - flops, duration, cpuUsage, cores @@ -204,6 +202,7 @@ public class Sc20StreamingParquetTraceReader( val externalBuffer = mutableListOf() buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) val fragments = sequence { + var time = submissionTime repeat@ while (true) { if (externalBuffer.isEmpty()) { if (hasNext) { @@ -220,7 +219,8 @@ public class Sc20StreamingParquetTraceReader( for (fragment in internalBuffer) { yield(fragment) - if (fragment.time >= endTime) { + time += fragment.duration + if (time >= endTime) { break@repeat } } -- cgit v1.2.3