From 7e42d8e1a1ec23058af416548545fb06cfa3faa4 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 12 May 2020 10:25:43 +0200 Subject: Add SWF trace reading capability --- .../opendc/format/trace/swf/SwfTraceReader.kt | 186 +++++++++++++++++++++ .../opendc/format/trace/swf/SwfTraceReaderTest.kt | 14 ++ .../opendc-format/src/test/resources/swf_trace.txt | 6 + 3 files changed, 206 insertions(+) create mode 100644 opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt create mode 100644 opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt create mode 100644 opendc/opendc-format/src/test/resources/swf_trace.txt diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt new file mode 100644 index 00000000..2de3a9db --- /dev/null +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt @@ -0,0 +1,186 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.swf + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.UUID + +/** + * A [TraceReader] for reading SWF traces into VM-modeled workloads. + * + * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html + * + * @param file The trace file. + */ +class SwfTraceReader( + file: File +) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf>() + + val jobNumberCol = 0 + val submitTimeCol = 1 // seconds (begin of trace is 0) + val waitTimeCol = 2 // seconds + val runTimeCol = 3 // seconds + val numAllocatedCoresCol = 4 // We assume that single-core processors were used at the time + val requestedMemoryCol = 9 // KB per processor/core (-1 if not specified) + + val sliceDuration = 5 * 60L + + var jobNumber = -1L + var submitTime = -1L + var waitTime = -1L + var runTime = -1L + var cores = -1 + var memory = -1L + var slicedWaitTime = -1L + var flopsPerSecond = -1L + var flopsPartialSlice = -1L + var flopsFullSlice = -1L + var runtimePartialSliceRemainder = -1L + + BufferedReader(FileReader(file)).use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith(";") && line.isNotBlank() + } + .forEach { line -> + val values = line.trim().split("\\s+".toRegex()) + println(values) + + jobNumber = values[jobNumberCol].trim().toLong() + submitTime = values[submitTimeCol].trim().toLong() + waitTime = values[waitTimeCol].trim().toLong() + runTime = values[runTimeCol].trim().toLong() + cores = values[numAllocatedCoresCol].trim().toInt() + memory = values[requestedMemoryCol].trim().toLong() + + if (memory == -1L) { + memory = 1000L * cores // assume 1GB of memory per processor if not specified + } else { + memory /= 1000 // convert KB to MB + } + + val flopsHistory = mutableListOf() + + // Insert waiting time + // We ignore wait time remainders under one + slicedWaitTime = 0L + if (waitTime >= sliceDuration) { + for (tick in submitTime until waitTime step sliceDuration) { + flopsHistory.add( + FlopsHistoryFragment( + tick * 1000L, 0L, sliceDuration * 1000L, 0.0, cores + ) + ) + slicedWaitTime += sliceDuration + } + } + + // Insert run time + flopsPerSecond = 4_000L * cores + runtimePartialSliceRemainder = runTime % sliceDuration + flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder + flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice + + for (tick in (submitTime + slicedWaitTime) + until (submitTime + slicedWaitTime + runTime) + step sliceDuration) { + flopsHistory.add( + FlopsHistoryFragment( + tick * 1000L, flopsFullSlice / sliceDuration, sliceDuration * 1000L, 0.0, cores + ) + ) + } + + if (runtimePartialSliceRemainder > 0) { + flopsHistory.add( + FlopsHistoryFragment( + submitTime + (slicedWaitTime + runTime - runtimePartialSliceRemainder), + flopsPartialSlice, sliceDuration, 0.0, cores + ) + ) + } + + val uuid = UUID(0L, jobNumber) + val vmWorkload = VmWorkload( + uuid, "SWF Workload $jobNumber", UnnamedUser, + VmImage( + uuid, + jobNumber.toString(), + emptyMap(), + flopsHistory.asSequence(), + cores, + memory + ) + ) + + entries[jobNumber] = TraceEntryImpl(submitTime, vmWorkload) + } + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} diff --git a/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt b/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt new file mode 100644 index 00000000..0693c560 --- /dev/null +++ b/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt @@ -0,0 +1,14 @@ +package com.atlarge.opendc.format.trace.swf + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import java.io.File + +class SwfTraceReaderTest { + @Test + internal fun testParseSwf() { + val reader = SwfTraceReader(File(SwfTraceReaderTest::class.java.getResource("/swf_trace.txt").toURI())) + val entry = reader.next() + assertEquals(entry.submissionTime, 0) + } +} diff --git a/opendc/opendc-format/src/test/resources/swf_trace.txt b/opendc/opendc-format/src/test/resources/swf_trace.txt new file mode 100644 index 00000000..c3ecf890 --- /dev/null +++ b/opendc/opendc-format/src/test/resources/swf_trace.txt @@ -0,0 +1,6 @@ +; Excerpt from the PWA: CTC-SP2-1996-3.1-cln.swf + 1 0 588530 937 306 142.00 -1 -1 35100 -1 1 97 -1 307 3 -1 -1 -1 + 2 164472 356587 75 17 2.00 -1 -1 300 -1 1 81 -1 195 3 -1 -1 -1 + 3 197154 459987 35268 306 32792 -1 -1 35100 -1 0 97 -1 307 3 -1 -1 -1 + 4 310448 50431 29493 64 28745 -1 -1 64800 -1 1 38 -1 38 1 -1 -1 -1 + 5 310541 50766 29063 64 28191 -1 -1 64800 -1 1 38 -1 69 1 -1 -1 -1 -- cgit v1.2.3 From bf98c6fcdd59787d196619febd0d99ce2dccda33 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 12 May 2020 15:55:31 +0200 Subject: Add test --- .../atlarge/opendc/format/trace/swf/SwfTraceReader.kt | 18 +++++++++++------- .../opendc/format/trace/swf/SwfTraceReaderTest.kt | 13 +++++++++++-- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt index 2de3a9db..7a2c704b 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt @@ -85,7 +85,6 @@ class SwfTraceReader( } .forEach { line -> val values = line.trim().split("\\s+".toRegex()) - println(values) jobNumber = values[jobNumberCol].trim().toLong() submitTime = values[submitTimeCol].trim().toLong() @@ -102,11 +101,12 @@ class SwfTraceReader( val flopsHistory = mutableListOf() - // Insert waiting time + // Insert waiting time slices + // We ignore wait time remainders under one slicedWaitTime = 0L if (waitTime >= sliceDuration) { - for (tick in submitTime until waitTime step sliceDuration) { + for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) { flopsHistory.add( FlopsHistoryFragment( tick * 1000L, 0L, sliceDuration * 1000L, 0.0, cores @@ -116,18 +116,19 @@ class SwfTraceReader( } } - // Insert run time + // Insert run time slices + flopsPerSecond = 4_000L * cores runtimePartialSliceRemainder = runTime % sliceDuration flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice for (tick in (submitTime + slicedWaitTime) - until (submitTime + slicedWaitTime + runTime) + until (submitTime + slicedWaitTime + runTime - sliceDuration) step sliceDuration) { flopsHistory.add( FlopsHistoryFragment( - tick * 1000L, flopsFullSlice / sliceDuration, sliceDuration * 1000L, 0.0, cores + tick * 1000L, flopsFullSlice / sliceDuration, sliceDuration * 1000L, 1.0, cores ) ) } @@ -136,7 +137,10 @@ class SwfTraceReader( flopsHistory.add( FlopsHistoryFragment( submitTime + (slicedWaitTime + runTime - runtimePartialSliceRemainder), - flopsPartialSlice, sliceDuration, 0.0, cores + flopsPartialSlice, + sliceDuration, + runtimePartialSliceRemainder / sliceDuration.toDouble(), + cores ) ) } diff --git a/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt b/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt index 0693c560..41ad8aba 100644 --- a/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt +++ b/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt @@ -8,7 +8,16 @@ class SwfTraceReaderTest { @Test internal fun testParseSwf() { val reader = SwfTraceReader(File(SwfTraceReaderTest::class.java.getResource("/swf_trace.txt").toURI())) - val entry = reader.next() - assertEquals(entry.submissionTime, 0) + var entry = reader.next() + assertEquals(0, entry.submissionTime) + // 1961 slices for waiting, 3 full and 1 partial running slices + assertEquals(1965, entry.workload.image.flopsHistory.toList().size) + + entry = reader.next() + assertEquals(164472, entry.submissionTime) + // 1188 slices for waiting, 0 full and 1 partial running slices + assertEquals(1189, entry.workload.image.flopsHistory.toList().size) + assertEquals(5_100_000L, entry.workload.image.flopsHistory.toList().last().flops) + assertEquals(0.25, entry.workload.image.flopsHistory.toList().last().usage) } } -- cgit v1.2.3 From 3536386080db1794b8e74c79a221e7b6e214b37c Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Wed, 13 May 2020 17:08:07 +0200 Subject: Add filtering option --- .../kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt index 7a2c704b..2f6ce238 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt @@ -43,7 +43,8 @@ import java.util.UUID * @param file The trace file. */ class SwfTraceReader( - file: File + file: File, + maxNumCores: Int = -1 ) : TraceReader { /** * The internal iterator to use for this reader. @@ -93,6 +94,11 @@ class SwfTraceReader( cores = values[numAllocatedCoresCol].trim().toInt() memory = values[requestedMemoryCol].trim().toLong() + if (maxNumCores != -1 && cores > maxNumCores) { + println("Skipped a task due to processor count ($cores > $maxNumCores).") + return@forEach + } + if (memory == -1L) { memory = 1000L * cores // assume 1GB of memory per processor if not specified } else { -- cgit v1.2.3