From 4aa1ed9d20c7a87c6b5388ddf33b25769f91f20b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 5 May 2020 21:31:38 +0200 Subject: refactor: Rename VmTraceReader to BitbrainsTraceReader --- .../format/trace/bitbrains/BitbrainsTraceReader.kt | 173 +++++++++++++++++++++ .../opendc/format/trace/sc20/Sc20TraceReader.kt | 2 - .../opendc/format/trace/vm/VmTraceReader.kt | 173 --------------------- 3 files changed, 173 insertions(+), 175 deletions(-) create mode 100644 opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt delete mode 100644 opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt (limited to 'opendc/opendc-format') diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt new file mode 100644 index 00000000..5220af9b --- /dev/null +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -0,0 +1,173 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.bitbrains + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.UUID + +/** + * A [TraceReader] for the public VM workload trace format. + * + * @param traceDirectory The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +class BitbrainsTraceReader( + traceDirectory: File, + performanceInterferenceModel: PerformanceInterferenceModel +) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf>() + + var timestampCol = 0 + var coreCol = 0 + var cpuUsageCol = 0 + var provisionedMemoryCol = 0 + val traceInterval = 5 * 60 * 1000L + + traceDirectory.walk() + .filterNot { it.isDirectory } + .forEach { vmFile -> + println(vmFile) + val flopsHistory = mutableListOf() + var vmId = -1L + var cores = -1 + var requiredMemory = -1L + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(";\t") + + // Parse GWF header + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + timestampCol = header["Timestamp [ms]"]!! + coreCol = header["CPU cores"]!! + cpuUsageCol = header["CPU usage [MHZ]"]!! + provisionedMemoryCol = header["Memory capacity provisioned [KB]"]!! + return@forEachIndexed + } + + vmId = vmFile.nameWithoutExtension.trim().toLong() + val timestamp = values[timestampCol].trim().toLong() - 5 * 60 + cores = values[coreCol].trim().toInt() + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong() + + val flops: Long = (cpuUsage * 5 * 60 * cores).toLong() + + if (flopsHistory.isEmpty()) { + flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) + } else { + if (flopsHistory.last().flops != flops) { + flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) + } else { + val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) + flopsHistory.add( + FlopsHistoryFragment( + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + ) + } + } + } + } + + val uuid = UUID(0L, vmId) + + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet() + ) + + val vmWorkload = VmWorkload( + uuid, "VM Workload $vmId", UnnamedUser, + VmImage( + uuid, + vmId.toString(), + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + flopsHistory.asSequence(), + cores, + requiredMemory + ) + ) + entries[vmId] = TraceEntryImpl( + flopsHistory.firstOrNull()?.tick ?: -1, + vmWorkload + ) + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index 2e2159ba..c53cd569 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -117,7 +117,6 @@ class Sc20TraceReader( reader.lineSequence() .chunked(128) .forEach { lines -> - // val res = ArrayList(lines.size) for (line in lines) { // Ignore comments in the trace if (line.startsWith("#") || line.isBlank()) { @@ -149,7 +148,6 @@ class Sc20TraceReader( fragment } } - // yieldAll(res) } if (last != null) { diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt deleted file mode 100644 index fe1049d9..00000000 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt +++ /dev/null @@ -1,173 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.format.trace.vm - -import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment -import com.atlarge.opendc.compute.core.image.VmImage -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.core.User -import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.format.trace.TraceEntry -import com.atlarge.opendc.format.trace.TraceReader -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.UUID - -/** - * A [TraceReader] for the public VM workload trace format. - * - * @param traceDirectory The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - */ -class VmTraceReader( - traceDirectory: File, - performanceInterferenceModel: PerformanceInterferenceModel -) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * Initialize the reader. - */ - init { - val entries = mutableMapOf>() - - var timestampCol = 0 - var coreCol = 0 - var cpuUsageCol = 0 - var provisionedMemoryCol = 0 - val traceInterval = 5 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .forEach { vmFile -> - println(vmFile) - val flopsHistory = mutableListOf() - var vmId = -1L - var cores = -1 - var requiredMemory = -1L - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith("#") && line.isNotBlank() - } - .forEachIndexed { idx, line -> - val values = line.split(";\t") - - // Parse GWF header - if (idx == 0) { - val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() - timestampCol = header["Timestamp [ms]"]!! - coreCol = header["CPU cores"]!! - cpuUsageCol = header["CPU usage [MHZ]"]!! - provisionedMemoryCol = header["Memory capacity provisioned [KB]"]!! - return@forEachIndexed - } - - vmId = vmFile.nameWithoutExtension.trim().toLong() - val timestamp = values[timestampCol].trim().toLong() - 5 * 60 - cores = values[coreCol].trim().toInt() - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong() - - val flops: Long = (cpuUsage * 5 * 60 * cores).toLong() - - if (flopsHistory.isEmpty()) { - flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) - } else { - if (flopsHistory.last().flops != flops) { - flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) - } else { - val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) - flopsHistory.add( - FlopsHistoryFragment( - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - ) - } - } - } - } - - val uuid = UUID(0L, vmId) - - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet() - ) - - val vmWorkload = VmWorkload( - uuid, "VM Workload $vmId", UnnamedUser, - VmImage( - uuid, - vmId.toString(), - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsHistory.asSequence(), - cores, - requiredMemory - ) - ) - entries[vmId] = TraceEntryImpl( - flopsHistory.firstOrNull()?.tick ?: -1, - vmWorkload - ) - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry -} -- cgit v1.2.3 From 48f6a6f2d42851bc2eeed5b6ef41145740c70286 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 5 May 2020 22:35:24 +0200 Subject: test: Add initial integration test for SC20 experiments --- .../format/environment/sc20/Sc20ClusterEnvironmentReader.kt | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'opendc/opendc-format') diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 2ef0db97..e34ee2dc 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -38,9 +38,9 @@ import com.atlarge.opendc.core.Platform import com.atlarge.opendc.core.Zone import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.format.environment.EnvironmentReader -import java.io.BufferedReader import java.io.File -import java.io.FileReader +import java.io.FileInputStream +import java.io.InputStream import java.util.Random import java.util.UUID @@ -50,8 +50,11 @@ import java.util.UUID * @param environmentFile The file describing the physical cluster. */ class Sc20ClusterEnvironmentReader( - private val environmentFile: File + private val input: InputStream ) : EnvironmentReader { + + constructor(file: File) : this(FileInputStream(file)) + @Suppress("BlockingMethodInNonBlockingContext") override suspend fun construct(dom: Domain): Environment { var clusterIdCol = 0 @@ -70,7 +73,7 @@ class Sc20ClusterEnvironmentReader( val nodes = mutableListOf() val random = Random(0) - BufferedReader(FileReader(environmentFile)).use { reader -> + input.bufferedReader().use { reader -> reader.lineSequence() .filter { line -> // Ignore comments in the file -- cgit v1.2.3