From 998466e611438e9f4381e5d693ef4119a3cf8905 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 00:17:18 +0200 Subject: bug: Address uid collision issue --- .../opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'opendc/opendc-format/src') diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 89a59e1c..2ef0db97 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -41,6 +41,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import java.io.BufferedReader import java.io.File import java.io.FileReader +import java.util.Random import java.util.UUID /** @@ -67,6 +68,7 @@ class Sc20ClusterEnvironmentReader( var coresPerHost: Int val nodes = mutableListOf() + val random = Random(0) BufferedReader(FileReader(environmentFile)).use { reader -> reader.lineSequence() @@ -87,6 +89,7 @@ class Sc20ClusterEnvironmentReader( return@forEachIndexed } + clusterIdx++ clusterId = values[clusterIdCol].trim() speed = values[speedCol].trim().toDouble() * 1000.0 numberOfHosts = values[numberOfHostsCol].trim().toInt() @@ -100,7 +103,7 @@ class Sc20ClusterEnvironmentReader( nodes.add( SimpleBareMetalDriver( dom.newDomain("node-$clusterId-$it"), - UUID((clusterIdx++).toLong(), it.toLong()), + UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$it", mapOf(NODE_CLUSTER to clusterId), List(coresPerHost) { coreId -> -- cgit v1.2.3 From a77829c7a8cf300a99a695423d85f905e0209286 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 12:16:42 +0200 Subject: perf: Optimize performance interference --- .../opendc/format/trace/PerformanceInterferenceModelReader.kt | 2 +- .../format/trace/sc20/Sc20PerformanceInterferenceReader.kt | 7 ++++--- .../com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt | 11 ++++++----- .../com/atlarge/opendc/format/trace/vm/VmTraceReader.kt | 11 ++++++----- 4 files changed, 17 insertions(+), 14 deletions(-) (limited to 'opendc/opendc-format/src') diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt index f9ebba3d..a653e643 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.format.trace -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import java.io.Closeable /** diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt index daa1fdf8..8562cefe 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt @@ -24,13 +24,14 @@ package com.atlarge.opendc.format.trace.sc20 -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.core.workload.PerformanceInterferenceModelItem +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModelItem import com.atlarge.opendc.format.trace.PerformanceInterferenceModelReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import java.io.InputStream +import java.util.TreeSet /** * A parser for the JSON performance interference setup files used for the SC20 paper. @@ -49,7 +50,7 @@ class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper return PerformanceInterferenceModel( performanceInterferenceModel.map { item -> PerformanceInterferenceModelItem( - item.vms.toSet(), + TreeSet(item.vms), item.minServerLoad, item.performanceScore ) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index c40cb039..96daa2ce 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import java.io.BufferedReader @@ -127,9 +127,10 @@ class Sc20TraceReader( val uuid = UUID(0, idx.toLong()) - val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet() - ) + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet() + ) val vmWorkload = VmWorkload( uuid, "VM Workload $vmId", UnnamedUser, diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt index fbe77654..be9ddfaa 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import java.io.BufferedReader @@ -123,9 +123,10 @@ class VmTraceReader( val uuid = UUID(0L, vmId) - val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet() - ) + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet() + ) val vmWorkload = VmWorkload( uuid, "VM Workload $vmId", UnnamedUser, -- cgit v1.2.3 From eab4c190142f54291ed235e4e18f3a35385a541c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 16:12:25 +0200 Subject: perf: Optimize trace loading for memory usage --- .../opendc/format/trace/sc20/Sc20TraceReader.kt | 71 ++++++++++++++++------ .../opendc/format/trace/vm/VmTraceReader.kt | 2 +- 2 files changed, 52 insertions(+), 21 deletions(-) (limited to 'opendc/opendc-format/src') diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index 96daa2ce..d4eef029 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -37,6 +37,7 @@ import java.io.File import java.io.FileReader import java.util.UUID import kotlin.math.max +import kotlin.math.min /** * A [TraceReader] for the internal VM workload trace format. @@ -81,10 +82,13 @@ class Sc20TraceReader( vms .forEachIndexed { idx, vmFile -> println(vmFile) - val flopsHistory = mutableListOf() + var vmId = "" var maxCores = -1 var requiredMemory = -1L + var timestamp = -1L + var cores = -1 + var minTime = Long.MAX_VALUE BufferedReader(FileReader(vmFile)).use { reader -> reader.lineSequence() @@ -96,33 +100,61 @@ class Sc20TraceReader( val values = line.split(" ") vmId = vmFile.name - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - val cores = values[coreCol].trim().toInt() - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + cores = values[coreCol].trim().toInt() requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + } + } - val flops: Long = (cpuUsage * 5 * 60).toLong() - - if (flopsHistory.isEmpty()) { - flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) - } else { - // Restrict merging to empty fragments for now - if (flopsHistory.last().flops == 0L && flops == 0L) { - val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) - flopsHistory.add( + val flopsFragments = sequence { + var last: FlopsHistoryFragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(1024) + .forEach { lines -> + val res = ArrayList(lines.size) + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(" ") + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! FlopsHistoryFragment( oldFragment.tick, oldFragment.flops + flops, oldFragment.duration + traceInterval, cpuUsage, - cores) - ) - } else { - flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) + cores + ) + } else { + val fragment = + FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores) + if (last != null) { + res.add(last!!) + } + fragment + } } + + yieldAll(res) } + + if (last != null) { + yield(last!!) } + } } val uuid = UUID(0, idx.toLong()) @@ -131,20 +163,19 @@ class Sc20TraceReader( PerformanceInterferenceModel( performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet() ) - val vmWorkload = VmWorkload( uuid, "VM Workload $vmId", UnnamedUser, VmImage( uuid, vmId, mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsHistory, + flopsFragments, maxCores, requiredMemory ) ) entries[uuid] = TraceEntryImpl( - flopsHistory.firstOrNull()?.tick ?: -1, + minTime, vmWorkload ) } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt index be9ddfaa..fe1049d9 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt @@ -134,7 +134,7 @@ class VmTraceReader( uuid, vmId.toString(), mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsHistory, + flopsHistory.asSequence(), cores, requiredMemory ) -- cgit v1.2.3 From 9a7bfac2475b1169c4aa9dee820dd30f412a39c1 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 17:46:31 +0200 Subject: feat: Add support for seeding experiments --- .../kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'opendc/opendc-format/src') diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index d4eef029..da678c07 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -38,6 +38,7 @@ import java.io.FileReader import java.util.UUID import kotlin.math.max import kotlin.math.min +import kotlin.random.Random /** * A [TraceReader] for the internal VM workload trace format. @@ -48,7 +49,8 @@ import kotlin.math.min class Sc20TraceReader( traceDirectory: File, performanceInterferenceModel: PerformanceInterferenceModel, - selectedVms: List + selectedVms: List, + random: Random ) : TraceReader { /** * The internal iterator to use for this reader. @@ -161,7 +163,8 @@ class Sc20TraceReader( val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet() + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet(), + Random(random.nextInt()) ) val vmWorkload = VmWorkload( uuid, "VM Workload $vmId", UnnamedUser, -- cgit v1.2.3 From e097aeb16d77c260126b65c7f13330076d800d52 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 16 Apr 2020 01:35:33 +0200 Subject: perf: Convert traces to Parquet format --- .../com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'opendc/opendc-format/src') diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index da678c07..2e2159ba 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -115,9 +115,9 @@ class Sc20TraceReader( BufferedReader(FileReader(vmFile)).use { reader -> reader.lineSequence() - .chunked(1024) + .chunked(128) .forEach { lines -> - val res = ArrayList(lines.size) + // val res = ArrayList(lines.size) for (line in lines) { // Ignore comments in the trace if (line.startsWith("#") || line.isBlank()) { @@ -144,13 +144,12 @@ class Sc20TraceReader( val fragment = FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores) if (last != null) { - res.add(last!!) + yield(last!!) } fragment } } - - yieldAll(res) + // yieldAll(res) } if (last != null) { @@ -172,7 +171,7 @@ class Sc20TraceReader( uuid, vmId, mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsFragments, + flopsFragments.asSequence(), maxCores, requiredMemory ) -- cgit v1.2.3