diff options
| author | Georgios Andreadis <info@gandreadis.com> | 2020-07-25 12:08:59 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-08-24 19:48:26 +0200 |
| commit | fcf9147c116de46a57b662f7a84255faf5581d67 (patch) | |
| tree | c50dc8c00ed4048902b489fffa2fabab53b2bfd1 | |
| parent | 2892f05baeb79f77586eb36d3506a57b1d20b8aa (diff) | |
| parent | 382a08ea8d80563753e6bdfea1db154a4b44bf35 (diff) | |
Merge pull request #16 from atlarge-research/feat/hpc-sampling
Finish HPC sampler
4 files changed, 60 insertions, 25 deletions
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt index f458877b..3f885f89 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt @@ -25,8 +25,7 @@ package com.atlarge.opendc.compute.core.workload import com.atlarge.opendc.compute.core.Server -import java.util.SortedSet -import java.util.TreeSet +import java.util.* import kotlin.random.Random /** @@ -44,23 +43,22 @@ class PerformanceInterferenceModel( val random: Random = Random(0) ) { private var intersectingItems: List<PerformanceInterferenceModelItem> = emptyList() - private val colocatedWorkloads = TreeSet<String>() + private val colocatedWorkloads = TreeMap<String, Int>() fun vmStarted(server: Server) { - colocatedWorkloads.add(server.image.name) + colocatedWorkloads.merge(server.image.name, 1, Int::plus) intersectingItems = items.filter { item -> doesMatch(item) } } fun vmStopped(server: Server) { - colocatedWorkloads.remove(server.image.name) + colocatedWorkloads.computeIfPresent(server.image.name) { _, v -> (v - 1).takeUnless { it == 0 } } intersectingItems = items.filter { item -> doesMatch(item) } } private fun doesMatch(item: PerformanceInterferenceModelItem): Boolean { var count = 0 - for (name in item.workloadNames.subSet(colocatedWorkloads.first(), colocatedWorkloads.last() + "\u0000")) { - if (name in colocatedWorkloads) - count++ + for (name in item.workloadNames.subSet(colocatedWorkloads.firstKey(), colocatedWorkloads.lastKey() + "\u0000")) { + count += colocatedWorkloads.getOrDefault(name, 0) if (count > 1) return true } diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt index b8dfb1be..09a6ce40 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt @@ -203,6 +203,7 @@ public class MoreHpcPortfolio(parent: Experiment, id: Int) : Portfolio(parent, i ) override val workloads = listOf( + Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC), Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC), Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC), Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC), diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt index b24d6de1..a46bb3e6 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -109,6 +109,24 @@ fun sampleHpcWorkload( name.matches(pattern) } + val hpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<TraceEntry<VmWorkload>>() + hpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + val nonHpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<TraceEntry<VmWorkload>>() + nonHpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } val totalLoad = if (workload is CompositeWorkload) { @@ -117,45 +135,60 @@ fun sampleHpcWorkload( trace.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } } + logger.debug { "Total trace load: $totalLoad" } + var hpcCount = 0 + var hpcLoad = 0.0 + var nonHpcCount = 0 + var nonHpcLoad = 0.0 + val res = mutableListOf<TraceEntry<VmWorkload>>() if (sampleOnLoad) { var currentLoad = 0.0 var i = 0 - while (true) { - // Sample random HPC entry with replacement - val entry = sample(hpc.random(random), i++) - + for (entry in hpcSequence) { val entryLoad = entry.workload.image.tags.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction || res.size > trace.size) { + if ((currentLoad + entryLoad) / totalLoad > fraction) { break } + hpcLoad += entryLoad + hpcCount += 1 currentLoad += entryLoad res += entry } - (nonHpc as MutableList<TraceEntry<VmWorkload>>).shuffle(random) - for (entry in nonHpc) { + for (entry in nonHpcSequence) { val entryLoad = entry.workload.image.tags.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > 1 || res.size > trace.size) { + if ((currentLoad + entryLoad) / totalLoad > 1) { break } + nonHpcLoad += entryLoad + nonHpcCount += 1 currentLoad += entryLoad res += entry } } else { - repeat((fraction * trace.size).toInt()) { i -> - // Sample random HPC entry with replacement - val entry = sample(hpc.random(random), i) - res.add(entry) - } + hpcSequence + .take((fraction * trace.size).toInt()) + .forEach { entry -> + hpcLoad += entry.workload.image.tags.getValue("total-load") as Double + hpcCount += 1 + res.add(entry) + } - (nonHpc as MutableList<TraceEntry<VmWorkload>>).shuffle(random) - res.addAll(nonHpc.subList(0, ((1 - fraction) * trace.size).toInt())) + nonHpcSequence + .take(((1 - fraction) * trace.size).toInt()) + .forEach { entry -> + nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double + nonHpcCount += 1 + res.add(entry) + } } + logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } + logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } return res @@ -168,13 +201,13 @@ private fun sample(entry: TraceEntry<VmWorkload>, i: Int): TraceEntry<VmWorkload val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) val image = VmImage( id, - entry.workload.image.name + "-$i", + entry.workload.image.name, entry.workload.image.tags, entry.workload.image.flopsHistory, entry.workload.image.maxCores, entry.workload.image.requiredMemory ) - val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name + "-$i") + val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) return VmTraceEntry(vmWorkload, entry.submissionTime) } diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml b/simulator/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml index f47a6da8..6906bfc3 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml +++ b/simulator/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml @@ -39,6 +39,9 @@ <Logger name="com.atlarge.opendc.experiments.sc20" level="info" additivity="false"> <AppenderRef ref="Console"/> </Logger> + <Logger name="com.atlarge.opendc.experiments.sc20.trace" level="debug" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> <Logger name="org.apache.hadoop" level="warn" additivity="false"> <AppenderRef ref="Console"/> </Logger> |
