diff options
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-04-20 15:21:35 +0200 |
|---|---|---|
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-04-20 15:21:35 +0200 |
| commit | 6981d5581e2ce5c6df42dfbf133c350bd9c35a0f (patch) | |
| tree | 74e8993babb28dd56950f1da69eda2d97735a0e8 /opendc/opendc-format/src/main | |
| parent | 60372f0022d423efd5267ef4008d9afcbe870911 (diff) | |
| parent | 3e056406616860c77168d827f1ca9d8d3c79c08e (diff) | |
Merge branch 'bug/experiment-issues' into '2.x'
Address issues during experiments
See merge request opendc/opendc-simulator!61
Diffstat (limited to 'opendc/opendc-format/src/main')
5 files changed, 76 insertions, 37 deletions
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 89a59e1c..2ef0db97 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -41,6 +41,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import java.io.BufferedReader import java.io.File import java.io.FileReader +import java.util.Random import java.util.UUID /** @@ -67,6 +68,7 @@ class Sc20ClusterEnvironmentReader( var coresPerHost: Int val nodes = mutableListOf<SimpleBareMetalDriver>() + val random = Random(0) BufferedReader(FileReader(environmentFile)).use { reader -> reader.lineSequence() @@ -87,6 +89,7 @@ class Sc20ClusterEnvironmentReader( return@forEachIndexed } + clusterIdx++ clusterId = values[clusterIdCol].trim() speed = values[speedCol].trim().toDouble() * 1000.0 numberOfHosts = values[numberOfHostsCol].trim().toInt() @@ -100,7 +103,7 @@ class Sc20ClusterEnvironmentReader( nodes.add( SimpleBareMetalDriver( dom.newDomain("node-$clusterId-$it"), - UUID((clusterIdx++).toLong(), it.toLong()), + UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$it", mapOf(NODE_CLUSTER to clusterId), List(coresPerHost) { coreId -> diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt index f9ebba3d..a653e643 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.format.trace -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import java.io.Closeable /** diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt index daa1fdf8..8562cefe 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt @@ -24,13 +24,14 @@ package com.atlarge.opendc.format.trace.sc20 -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.core.workload.PerformanceInterferenceModelItem +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModelItem import com.atlarge.opendc.format.trace.PerformanceInterferenceModelReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import java.io.InputStream +import java.util.TreeSet /** * A parser for the JSON performance interference setup files used for the SC20 paper. @@ -49,7 +50,7 @@ class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper return PerformanceInterferenceModel( performanceInterferenceModel.map { item -> PerformanceInterferenceModelItem( - item.vms.toSet(), + TreeSet(item.vms), item.minServerLoad, item.performanceScore ) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index c40cb039..2e2159ba 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import java.io.BufferedReader @@ -37,6 +37,8 @@ import java.io.File import java.io.FileReader import java.util.UUID import kotlin.math.max +import kotlin.math.min +import kotlin.random.Random /** * A [TraceReader] for the internal VM workload trace format. @@ -47,7 +49,8 @@ import kotlin.math.max class Sc20TraceReader( traceDirectory: File, performanceInterferenceModel: PerformanceInterferenceModel, - selectedVms: List<String> + selectedVms: List<String>, + random: Random ) : TraceReader<VmWorkload> { /** * The internal iterator to use for this reader. @@ -81,10 +84,13 @@ class Sc20TraceReader( vms .forEachIndexed { idx, vmFile -> println(vmFile) - val flopsHistory = mutableListOf<FlopsHistoryFragment>() + var vmId = "" var maxCores = -1 var requiredMemory = -1L + var timestamp = -1L + var cores = -1 + var minTime = Long.MAX_VALUE BufferedReader(FileReader(vmFile)).use { reader -> reader.lineSequence() @@ -96,54 +102,82 @@ class Sc20TraceReader( val values = line.split(" ") vmId = vmFile.name - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - val cores = values[coreCol].trim().toInt() - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + cores = values[coreCol].trim().toInt() requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + } + } - val flops: Long = (cpuUsage * 5 * 60).toLong() - - if (flopsHistory.isEmpty()) { - flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) - } else { - // Restrict merging to empty fragments for now - if (flopsHistory.last().flops == 0L && flops == 0L) { - val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) - flopsHistory.add( + val flopsFragments = sequence { + var last: FlopsHistoryFragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + // val res = ArrayList<FlopsHistoryFragment>(lines.size) + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(" ") + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! FlopsHistoryFragment( oldFragment.tick, oldFragment.flops + flops, oldFragment.duration + traceInterval, cpuUsage, - cores) - ) - } else { - flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) + cores + ) + } else { + val fragment = + FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores) + if (last != null) { + yield(last!!) + } + fragment + } } + // yieldAll(res) } + + if (last != null) { + yield(last!!) } + } } val uuid = UUID(0, idx.toLong()) - val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet() - ) - + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet(), + Random(random.nextInt()) + ) val vmWorkload = VmWorkload( uuid, "VM Workload $vmId", UnnamedUser, VmImage( uuid, vmId, mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsHistory, + flopsFragments.asSequence(), maxCores, requiredMemory ) ) entries[uuid] = TraceEntryImpl( - flopsHistory.firstOrNull()?.tick ?: -1, + minTime, vmWorkload ) } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt index fbe77654..fe1049d9 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import java.io.BufferedReader @@ -123,9 +123,10 @@ class VmTraceReader( val uuid = UUID(0L, vmId) - val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet() - ) + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet() + ) val vmWorkload = VmWorkload( uuid, "VM Workload $vmId", UnnamedUser, @@ -133,7 +134,7 @@ class VmTraceReader( uuid, vmId.toString(), mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsHistory, + flopsHistory.asSequence(), cores, requiredMemory ) |
