Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/main')
11 files changed, 55 insertions, 343 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
index faabe5cb..31e8f961 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
@@ -22,7 +22,8 @@
 
 package org.opendc.experiments.capelin
 
-import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.compute.workload.composite
+import org.opendc.compute.workload.trace
 import org.opendc.experiments.capelin.model.OperationalPhenomena
 import org.opendc.experiments.capelin.model.Topology
 import org.opendc.experiments.capelin.model.Workload
@@ -42,30 +43,25 @@ public class CompositeWorkloadPortfolio : Portfolio("composite-workload") {
     )
 
     override val workload: Workload by anyOf(
-        CompositeWorkload(
+        Workload(
             "all-azure",
-            listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)),
-            totalSampleLoad
+            composite(trace("solvinity-short") to 0.0, trace("azure") to 1.0)
         ),
-        CompositeWorkload(
+        Workload(
             "solvinity-25-azure-75",
-            listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)),
-            totalSampleLoad
+            composite(trace("solvinity-short") to 0.25, trace("azure") to 0.75)
         ),
-        CompositeWorkload(
+        Workload(
             "solvinity-50-azure-50",
-            listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)),
-            totalSampleLoad
+            composite(trace("solvinity-short") to 0.5, trace("azure") to 0.5)
         ),
-        CompositeWorkload(
+        Workload(
             "solvinity-75-azure-25",
-            listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)),
-            totalSampleLoad
+            composite(trace("solvinity-short") to 0.75, trace("azure") to 0.25)
        ),
-        CompositeWorkload(
+        Workload(
            "all-solvinity",
-            listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)),
-            totalSampleLoad
+            composite(trace("solvinity-short") to 1.0, trace("azure") to 0.0)
        )
     )
 
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
index e1cf8517..cd093e6c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
@@ -22,6 +22,8 @@
 
 package org.opendc.experiments.capelin
 
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
 import org.opendc.experiments.capelin.model.OperationalPhenomena
 import org.opendc.experiments.capelin.model.Topology
 import org.opendc.experiments.capelin.model.Workload
@@ -44,10 +46,10 @@ public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") {
     )
 
     override val workload: Workload by anyOf(
-        Workload("solvinity", 0.1),
-        Workload("solvinity", 0.25),
-        Workload("solvinity", 0.5),
-        Workload("solvinity", 1.0)
+        Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+        Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+        Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+        Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
     )
 
     override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
index a995e467..73e59a58 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
@@ -22,8 +22,10 @@
 
 package org.opendc.experiments.capelin
 
+import org.opendc.compute.workload.sampleByHpc
+import org.opendc.compute.workload.sampleByHpcLoad
+import org.opendc.compute.workload.trace
 import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.SamplingStrategy
 import org.opendc.experiments.capelin.model.Topology
 import org.opendc.experiments.capelin.model.Workload
 import org.opendc.harness.dsl.anyOf
@@ -40,13 +42,13 @@ public class MoreHpcPortfolio : Portfolio("more_hpc") {
     )
 
     override val workload: Workload by anyOf(
-        Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC),
-        Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC),
-        Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC),
-        Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC),
-        Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD),
-        Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD),
-        Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD)
+        Workload("solvinity", trace("solvinity").sampleByHpc(0.0)),
+        Workload("solvinity", trace("solvinity").sampleByHpc(0.25)),
+        Workload("solvinity", trace("solvinity").sampleByHpc(0.5)),
+        Workload("solvinity", trace("solvinity").sampleByHpc(1.0)),
+        Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.25)),
+        Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.5)),
+        Workload("solvinity", trace("solvinity").sampleByHpcLoad(1.0))
     )
 
     override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
index 49559e0e..9d5717bb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
@@ -22,6 +22,8 @@
 
 package org.opendc.experiments.capelin
 
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
 import org.opendc.experiments.capelin.model.OperationalPhenomena
 import org.opendc.experiments.capelin.model.Topology
 import org.opendc.experiments.capelin.model.Workload
@@ -40,10 +42,10 @@ public class MoreVelocityPortfolio : Portfolio("more_velocity") {
     )
 
     override val workload: Workload by anyOf(
-        Workload("solvinity", 0.1),
-        Workload("solvinity", 0.25),
-        Workload("solvinity", 0.5),
-        Workload("solvinity", 1.0)
+        Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+        Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+        Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+        Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
     )
 
     override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
index 1aac4f9e..7ab586b3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
@@ -22,6 +22,8 @@
 
 package org.opendc.experiments.capelin
 
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
 import org.opendc.experiments.capelin.model.OperationalPhenomena
 import org.opendc.experiments.capelin.model.Topology
 import org.opendc.experiments.capelin.model.Workload
@@ -36,10 +38,10 @@ public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena") {
     )
 
     override val workload: Workload by anyOf(
-        Workload("solvinity", 0.1),
-        Workload("solvinity", 0.25),
-        Workload("solvinity", 0.5),
-        Workload("solvinity", 1.0)
+        Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+        Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+        Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+        Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
     )
 
     override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 02811d83..630b76c4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -24,18 +24,16 @@ package org.opendc.experiments.capelin
 
 import com.typesafe.config.ConfigFactory
 import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkloadLoader
 import org.opendc.compute.workload.ComputeWorkloadRunner
 import org.opendc.compute.workload.export.parquet.ParquetExportMonitor
 import org.opendc.compute.workload.grid5000
 import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.trace.RawParquetTraceReader
 import org.opendc.compute.workload.util.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.model.CompositeWorkload
 import org.opendc.experiments.capelin.model.OperationalPhenomena
 import org.opendc.experiments.capelin.model.Topology
 import org.opendc.experiments.capelin.model.Workload
 import org.opendc.experiments.capelin.topology.clusterTopology
-import org.opendc.experiments.capelin.trace.ParquetTraceReader
 import org.opendc.experiments.capelin.util.createComputeScheduler
 import org.opendc.harness.dsl.Experiment
 import org.opendc.harness.dsl.anyOf
@@ -47,7 +45,6 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
 import java.io.File
 import java.time.Duration
 import java.util.*
-import java.util.concurrent.ConcurrentHashMap
 import kotlin.math.roundToLong
 
 /**
@@ -92,9 +89,9 @@ abstract class Portfolio(name: String) : Experiment(name) {
     abstract val allocationPolicy: String
 
     /**
-     * A map of trace readers.
+     * A helper class to load workload traces.
      */
-    private val traceReaders = ConcurrentHashMap<String, RawParquetTraceReader>()
+    private val workloadLoader = ComputeWorkloadLoader(File(config.getString("trace-path")))
 
     /**
     * Perform a single trial for this portfolio.
@@ -102,19 +99,6 @@ abstract class Portfolio(name: String) : Experiment(name) {
     override fun doRun(repeat: Int): Unit = runBlockingSimulation {
         val seeder = Random(repeat.toLong())
 
-        val workload = workload
-        val workloadNames = if (workload is CompositeWorkload) {
-            workload.workloads.map { it.name }
-        } else {
-            listOf(workload.name)
-        }
-        val rawReaders = workloadNames.map { workloadName ->
-            traceReaders.computeIfAbsent(workloadName) {
-                logger.info { "Loading trace $workloadName" }
-                RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
-            }
-        }
-        val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
         val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
             PerformanceInterferenceReader()
                 .read(File(config.getString("interference-model")))
@@ -125,7 +109,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
 
         val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements)
         val failureModel = if (operationalPhenomena.failureFrequency > 0)
-            grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt())
+            grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()))
         else
             null
         val runner = ComputeWorkloadRunner(
@@ -149,7 +133,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
             runner.apply(topology)
 
             // Run the workload trace
-            runner.run(trace)
+            runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
         } finally {
             runner.close()
             metricReader.close()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
index b6d3b30c..17ec48d4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
@@ -22,6 +22,7 @@
 
 package org.opendc.experiments.capelin
 
+import org.opendc.compute.workload.trace
 import org.opendc.experiments.capelin.model.OperationalPhenomena
 import org.opendc.experiments.capelin.model.Topology
 import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class ReplayPortfolio : Portfolio("replay") {
     )
 
     override val workload: Workload by anyOf(
-        Workload("solvinity", 1.0)
+        Workload("solvinity", trace("solvinity"))
     )
 
     override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
index 90840db8..98eb989d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
@@ -22,6 +22,7 @@
 
 package org.opendc.experiments.capelin
 
+import org.opendc.compute.workload.trace
 import org.opendc.experiments.capelin.model.OperationalPhenomena
 import org.opendc.experiments.capelin.model.Topology
 import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class TestPortfolio : Portfolio("test") {
     )
 
     override val workload: Workload by anyOf(
-        Workload("solvinity", 1.0)
+        Workload("solvinity", trace("solvinity"))
     )
 
     override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
index c4ddd158..a2e71243 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
@@ -22,23 +22,12 @@
 
 package org.opendc.experiments.capelin.model
 
-public enum class SamplingStrategy {
-    REGULAR,
-    HPC,
-    HPC_LOAD
-}
+import org.opendc.compute.workload.ComputeWorkload
 
 /**
- * A workload that is considered for a scenario.
- */
-public open class Workload(
-    public open val name: String,
-    public val fraction: Double,
-    public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR
-)
-
-/**
- * A workload that is composed of multiple workloads.
+ * A single workload originating from a trace.
+ *
+ * @param name the name of the workload.
+ * @param source The source of the workload data.
  */
-public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) :
-    Workload(name, -1.0)
+data class Workload(val name: String, val source: ComputeWorkload)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
deleted file mode 100644
index 498636ba..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import org.opendc.compute.workload.trace.RawParquetTraceReader
-import org.opendc.compute.workload.trace.TraceEntry
-import org.opendc.compute.workload.trace.TraceReader
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.simulator.compute.workload.SimWorkload
-
-/**
- * A [TraceReader] for the internal VM workload trace format.
- *
- * @param rawReaders The internal raw trace readers to use.
- * @param workload The workload to read.
- * @param seed The seed to use for sampling.
- */
-public class ParquetTraceReader(
-    rawReaders: List<RawParquetTraceReader>,
-    workload: Workload,
-    seed: Int
-) : TraceReader<SimWorkload> {
-    /**
-     * The iterator over the actual trace.
-     */
-    private val iterator: Iterator<TraceEntry<SimWorkload>> =
-        rawReaders
-            .map { it.read() }
-            .run {
-                if (workload is CompositeWorkload) {
-                    this.zip(workload.workloads)
-                } else {
-                    this.zip(listOf(workload))
-                }
-            }
-            .flatMap {
-                sampleWorkload(it.first, workload, it.second, seed)
-                    .sortedBy(TraceEntry<SimWorkload>::start)
-            }
-            .iterator()
-
-    override fun hasNext(): Boolean = iterator.hasNext()
-
-    override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
-    override fun close() {}
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
deleted file mode 100644
index b42951df..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.opendc.compute.workload.trace.TraceEntry
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.SamplingStrategy
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.util.*
-import kotlin.random.Random
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * Sample the workload for the specified [run].
- */
-public fun sampleWorkload(
-    trace: List<TraceEntry<SimWorkload>>,
-    workload: Workload,
-    subWorkload: Workload,
-    seed: Int
-): List<TraceEntry<SimWorkload>> {
-    return when {
-        workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
-        workload.samplingStrategy == SamplingStrategy.HPC ->
-            sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false)
-        workload.samplingStrategy == SamplingStrategy.HPC_LOAD ->
-            sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true)
-        else ->
-            sampleRegularWorkload(trace, workload, workload, seed)
-    }
-}
-
-/**
- * Sample a regular (non-HPC) workload.
- */
-public fun sampleRegularWorkload(
-    trace: List<TraceEntry<SimWorkload>>,
-    workload: Workload,
-    subWorkload: Workload,
-    seed: Int
-): List<TraceEntry<SimWorkload>> {
-    val fraction = subWorkload.fraction
-
-    val shuffled = trace.shuffled(Random(seed))
-    val res = mutableListOf<TraceEntry<SimWorkload>>()
-    val totalLoad = if (workload is CompositeWorkload) {
-        workload.totalLoad
-    } else {
-        shuffled.sumOf { it.meta.getValue("total-load") as Double }
-    }
-    var currentLoad = 0.0
-
-    for (entry in shuffled) {
-        val entryLoad = entry.meta.getValue("total-load") as Double
-        if ((currentLoad + entryLoad) / totalLoad > fraction) {
-            break
-        }
-
-        currentLoad += entryLoad
-        res += entry
-    }
-
-    logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
-    return res
-}
-
-/**
- * Sample a HPC workload.
- */
-public fun sampleHpcWorkload(
-    trace: List<TraceEntry<SimWorkload>>,
-    workload: Workload,
-    seed: Int,
-    sampleOnLoad: Boolean
-): List<TraceEntry<SimWorkload>> {
-    val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
-    val random = Random(seed)
-
-    val fraction = workload.fraction
-    val (hpc, nonHpc) = trace.partition { entry ->
-        val name = entry.name
-        name.matches(pattern)
-    }
-
-    val hpcSequence = generateSequence(0) { it + 1 }
-        .map { index ->
-            val res = mutableListOf<TraceEntry<SimWorkload>>()
-            hpc.mapTo(res) { sample(it, index) }
-            res.shuffle(random)
-            res
-        }
-        .flatten()
-
-    val nonHpcSequence = generateSequence(0) { it + 1 }
-        .map { index ->
-            val res = mutableListOf<TraceEntry<SimWorkload>>()
-            nonHpc.mapTo(res) { sample(it, index) }
-            res.shuffle(random)
-            res
-        }
-        .flatten()
-
-    logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
-
-    val totalLoad = if (workload is CompositeWorkload) {
-        workload.totalLoad
-    } else {
-        trace.sumOf { it.meta.getValue("total-load") as Double }
-    }
-
-    logger.debug { "Total trace load: $totalLoad" }
-    var hpcCount = 0
-    var hpcLoad = 0.0
-    var nonHpcCount = 0
-    var nonHpcLoad = 0.0
-
-    val res = mutableListOf<TraceEntry<SimWorkload>>()
-
-    if (sampleOnLoad) {
-        var currentLoad = 0.0
-        for (entry in hpcSequence) {
-            val entryLoad = entry.meta.getValue("total-load") as Double
-            if ((currentLoad + entryLoad) / totalLoad > fraction) {
-                break
-            }
-
-            hpcLoad += entryLoad
-            hpcCount += 1
-            currentLoad += entryLoad
-            res += entry
-        }
-
-        for (entry in nonHpcSequence) {
-            val entryLoad = entry.meta.getValue("total-load") as Double
-            if ((currentLoad + entryLoad) / totalLoad > 1) {
-                break
-            }
-
-            nonHpcLoad += entryLoad
-            nonHpcCount += 1
-            currentLoad += entryLoad
-            res += entry
-        }
-    } else {
-        hpcSequence
-            .take((fraction * trace.size).toInt())
-            .forEach { entry ->
-                hpcLoad += entry.meta.getValue("total-load") as Double
-                hpcCount += 1
-                res.add(entry)
-            }
-
-        nonHpcSequence
-            .take(((1 - fraction) * trace.size).toInt())
-            .forEach { entry ->
-                nonHpcLoad += entry.meta.getValue("total-load") as Double
-                nonHpcCount += 1
-                res.add(entry)
-            }
-    }
-
-    logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
-    logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
-    logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
-    return res
-}
-
-/**
- * Sample a random trace entry.
- */
-private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> {
-    val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
-    return entry.copy(uid = uid)
-}
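For readers tracking the API change above: the sketch below is not part of the commit. It only pulls together how the replacement workload DSL from opendc-compute-workload appears to be used after this refactoring, based on the calls visible in the diff (trace, sampleByLoad/sampleByHpc/sampleByHpcLoad, composite, ComputeWorkloadLoader and ComputeWorkload.resolve). The helper names examplePortfolioWorkloads and resolveWorkloads are hypothetical, and the exact signatures and return types live in opendc-compute-workload, so treat this as an illustration rather than the library's documented API.

// Hypothetical usage sketch reconstructed from the diff above; not part of the commit.
import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.composite
import org.opendc.compute.workload.sampleByLoad
import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.Workload
import java.io.File
import java.util.Random

// Declarative workload definitions, mirroring the portfolio declarations in this commit:
// a full replay, a load-based sample, and a composite of two traces.
fun examplePortfolioWorkloads(): List<Workload> = listOf(
    Workload("solvinity", trace("solvinity")),
    Workload("solvinity-50", trace("solvinity").sampleByLoad(0.5)),
    Workload(
        "solvinity-25-azure-75",
        composite(trace("solvinity-short") to 0.25, trace("azure") to 0.75)
    )
)

// Run-time resolution, mirroring Portfolio.doRun above: a single ComputeWorkloadLoader
// replaces the per-trace RawParquetTraceReader cache, and each declarative source is
// only resolved into concrete VMs when a trial actually runs.
fun resolveWorkloads(tracePath: File, seed: Long) {
    val workloadLoader = ComputeWorkloadLoader(tracePath)
    val seeder = Random(seed)
    for (workload in examplePortfolioWorkloads()) {
        val vms = workload.source.resolve(workloadLoader, seeder)
        // `vms` would be handed to ComputeWorkloadRunner.run(vms, seeder.nextLong()),
        // exactly as Portfolio.doRun does in the diff above.
    }
}

The practical effect visible in the diff is that sampling strategy becomes part of the declarative workload description itself, which is what makes the removed WorkloadSampler and ParquetTraceReader classes redundant.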
