From b14df2a0924774c5aed15cedeb1027abf8ee5361 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 16 Sep 2021 16:52:00 +0200 Subject: refactor(capelin): Make workload sampling model extensible This change updates the workload sampling implementation to be more flexible in the way the workload is constructed. Users can now sample multiple workloads at the same time using multiple samplers and use them as a single workload to simulate. --- .../capelin/CompositeWorkloadPortfolio.kt | 28 ++- .../opendc/experiments/capelin/HorVerPortfolio.kt | 10 +- .../opendc/experiments/capelin/MoreHpcPortfolio.kt | 18 +- .../experiments/capelin/MoreVelocityPortfolio.kt | 10 +- .../capelin/OperationalPhenomenaPortfolio.kt | 10 +- .../org/opendc/experiments/capelin/Portfolio.kt | 26 +-- .../opendc/experiments/capelin/ReplayPortfolio.kt | 3 +- .../opendc/experiments/capelin/TestPortfolio.kt | 3 +- .../opendc/experiments/capelin/model/Workload.kt | 23 +-- .../capelin/trace/ParquetTraceReader.kt | 68 ------- .../experiments/capelin/trace/WorkloadSampler.kt | 199 --------------------- 11 files changed, 55 insertions(+), 343 deletions(-) delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt (limited to 'opendc-experiments/opendc-experiments-capelin/src/main') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt index faabe5cb..31e8f961 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt @@ -22,7 +22,8 @@ package org.opendc.experiments.capelin -import org.opendc.experiments.capelin.model.CompositeWorkload +import org.opendc.compute.workload.composite +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -42,30 +43,25 @@ public class CompositeWorkloadPortfolio : Portfolio("composite-workload") { ) override val workload: Workload by anyOf( - CompositeWorkload( + Workload( "all-azure", - listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)), - totalSampleLoad + composite(trace("solvinity-short") to 0.0, trace("azure") to 1.0) ), - CompositeWorkload( + Workload( "solvinity-25-azure-75", - listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)), - totalSampleLoad + composite(trace("solvinity-short") to 0.25, trace("azure") to 0.75) ), - CompositeWorkload( + Workload( "solvinity-50-azure-50", - listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)), - totalSampleLoad + composite(trace("solvinity-short") to 0.5, trace("azure") to 0.5) ), - CompositeWorkload( + Workload( "solvinity-75-azure-25", - listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)), - totalSampleLoad + composite(trace("solvinity-short") to 0.75, trace("azure") to 0.25) ), - CompositeWorkload( + Workload( "all-solvinity", - listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)), - totalSampleLoad + composite(trace("solvinity-short") to 1.0, trace("azure") to 0.0) ) ) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt index e1cf8517..cd093e6c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt @@ -22,6 +22,8 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.sampleByLoad +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -44,10 +46,10 @@ public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") { ) override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity").sampleByLoad(0.1)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.25)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.5)), + Workload("solvinity", trace("solvinity").sampleByLoad(1.0)) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt index a995e467..73e59a58 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt @@ -22,8 +22,10 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.sampleByHpc +import org.opendc.compute.workload.sampleByHpcLoad +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena -import org.opendc.experiments.capelin.model.SamplingStrategy import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.harness.dsl.anyOf @@ -40,13 +42,13 @@ public class MoreHpcPortfolio : Portfolio("more_hpc") { ) override val workload: Workload by anyOf( - Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD), - Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD), - Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD) + Workload("solvinity", trace("solvinity").sampleByHpc(0.0)), + Workload("solvinity", trace("solvinity").sampleByHpc(0.25)), + Workload("solvinity", trace("solvinity").sampleByHpc(0.5)), + Workload("solvinity", trace("solvinity").sampleByHpc(1.0)), + Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.25)), + Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.5)), + Workload("solvinity", trace("solvinity").sampleByHpcLoad(1.0)) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt index 49559e0e..9d5717bb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt @@ -22,6 +22,8 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.sampleByLoad +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -40,10 +42,10 @@ public class MoreVelocityPortfolio : Portfolio("more_velocity") { ) override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity").sampleByLoad(0.1)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.25)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.5)), + Workload("solvinity", trace("solvinity").sampleByLoad(1.0)) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt index 1aac4f9e..7ab586b3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt @@ -22,6 +22,8 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.sampleByLoad +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -36,10 +38,10 @@ public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena") ) override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity").sampleByLoad(0.1)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.25)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.5)), + Workload("solvinity", trace("solvinity").sampleByLoad(1.0)) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 02811d83..630b76c4 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -24,18 +24,16 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.ComputeWorkloadRunner import org.opendc.compute.workload.export.parquet.ParquetExportMonitor import org.opendc.compute.workload.grid5000 import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.trace.RawParquetTraceReader import org.opendc.compute.workload.util.PerformanceInterferenceReader -import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.topology.clusterTopology -import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf @@ -47,7 +45,6 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration import java.util.* -import java.util.concurrent.ConcurrentHashMap import kotlin.math.roundToLong /** @@ -92,9 +89,9 @@ abstract class Portfolio(name: String) : Experiment(name) { abstract val allocationPolicy: String /** - * A map of trace readers. + * A helper class to load workload traces. */ - private val traceReaders = ConcurrentHashMap() + private val workloadLoader = ComputeWorkloadLoader(File(config.getString("trace-path"))) /** * Perform a single trial for this portfolio. @@ -102,19 +99,6 @@ abstract class Portfolio(name: String) : Experiment(name) { override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) - val workload = workload - val workloadNames = if (workload is CompositeWorkload) { - workload.workloads.map { it.name } - } else { - listOf(workload.name) - } - val rawReaders = workloadNames.map { workloadName -> - traceReaders.computeIfAbsent(workloadName) { - logger.info { "Loading trace $workloadName" } - RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) - } - } - val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) PerformanceInterferenceReader() .read(File(config.getString("interference-model"))) @@ -125,7 +109,7 @@ abstract class Portfolio(name: String) : Experiment(name) { val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements) val failureModel = if (operationalPhenomena.failureFrequency > 0) - grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt()) + grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong())) else null val runner = ComputeWorkloadRunner( @@ -149,7 +133,7 @@ abstract class Portfolio(name: String) : Experiment(name) { runner.apply(topology) // Run the workload trace - runner.run(trace) + runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) } finally { runner.close() metricReader.close() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt index b6d3b30c..17ec48d4 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -36,7 +37,7 @@ public class ReplayPortfolio : Portfolio("replay") { ) override val workload: Workload by anyOf( - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity")) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt index 90840db8..98eb989d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -36,7 +37,7 @@ public class TestPortfolio : Portfolio("test") { ) override val workload: Workload by anyOf( - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity")) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt index c4ddd158..a2e71243 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt @@ -22,23 +22,12 @@ package org.opendc.experiments.capelin.model -public enum class SamplingStrategy { - REGULAR, - HPC, - HPC_LOAD -} +import org.opendc.compute.workload.ComputeWorkload /** - * A workload that is considered for a scenario. - */ -public open class Workload( - public open val name: String, - public val fraction: Double, - public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR -) - -/** - * A workload that is composed of multiple workloads. + * A single workload originating from a trace. + * + * @param name the name of the workload. + * @param source The source of the workload data. */ -public class CompositeWorkload(override val name: String, public val workloads: List, public val totalLoad: Double) : - Workload(name, -1.0) +data class Workload(val name: String, val source: ComputeWorkload) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt deleted file mode 100644 index 498636ba..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import org.opendc.compute.workload.trace.RawParquetTraceReader -import org.opendc.compute.workload.trace.TraceEntry -import org.opendc.compute.workload.trace.TraceReader -import org.opendc.experiments.capelin.model.CompositeWorkload -import org.opendc.experiments.capelin.model.Workload -import org.opendc.simulator.compute.workload.SimWorkload - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param rawReaders The internal raw trace readers to use. - * @param workload The workload to read. - * @param seed The seed to use for sampling. - */ -public class ParquetTraceReader( - rawReaders: List, - workload: Workload, - seed: Int -) : TraceReader { - /** - * The iterator over the actual trace. - */ - private val iterator: Iterator> = - rawReaders - .map { it.read() } - .run { - if (workload is CompositeWorkload) { - this.zip(workload.workloads) - } else { - this.zip(listOf(workload)) - } - } - .flatMap { - sampleWorkload(it.first, workload, it.second, seed) - .sortedBy(TraceEntry::start) - } - .iterator() - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt deleted file mode 100644 index b42951df..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import mu.KotlinLogging -import org.opendc.compute.workload.trace.TraceEntry -import org.opendc.experiments.capelin.model.CompositeWorkload -import org.opendc.experiments.capelin.model.SamplingStrategy -import org.opendc.experiments.capelin.model.Workload -import org.opendc.simulator.compute.workload.SimWorkload -import java.util.* -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * Sample the workload for the specified [run]. - */ -public fun sampleWorkload( - trace: List>, - workload: Workload, - subWorkload: Workload, - seed: Int -): List> { - return when { - workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed) - workload.samplingStrategy == SamplingStrategy.HPC -> - sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false) - workload.samplingStrategy == SamplingStrategy.HPC_LOAD -> - sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true) - else -> - sampleRegularWorkload(trace, workload, workload, seed) - } -} - -/** - * Sample a regular (non-HPC) workload. - */ -public fun sampleRegularWorkload( - trace: List>, - workload: Workload, - subWorkload: Workload, - seed: Int -): List> { - val fraction = subWorkload.fraction - - val shuffled = trace.shuffled(Random(seed)) - val res = mutableListOf>() - val totalLoad = if (workload is CompositeWorkload) { - workload.totalLoad - } else { - shuffled.sumOf { it.meta.getValue("total-load") as Double } - } - var currentLoad = 0.0 - - for (entry in shuffled) { - val entryLoad = entry.meta.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - currentLoad += entryLoad - res += entry - } - - logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res -} - -/** - * Sample a HPC workload. - */ -public fun sampleHpcWorkload( - trace: List>, - workload: Workload, - seed: Int, - sampleOnLoad: Boolean -): List> { - val pattern = Regex("^vm__workload__(ComputeNode|cn).*") - val random = Random(seed) - - val fraction = workload.fraction - val (hpc, nonHpc) = trace.partition { entry -> - val name = entry.name - name.matches(pattern) - } - - val hpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf>() - hpc.mapTo(res) { sample(it, index) } - res.shuffle(random) - res - } - .flatten() - - val nonHpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf>() - nonHpc.mapTo(res) { sample(it, index) } - res.shuffle(random) - res - } - .flatten() - - logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } - - val totalLoad = if (workload is CompositeWorkload) { - workload.totalLoad - } else { - trace.sumOf { it.meta.getValue("total-load") as Double } - } - - logger.debug { "Total trace load: $totalLoad" } - var hpcCount = 0 - var hpcLoad = 0.0 - var nonHpcCount = 0 - var nonHpcLoad = 0.0 - - val res = mutableListOf>() - - if (sampleOnLoad) { - var currentLoad = 0.0 - for (entry in hpcSequence) { - val entryLoad = entry.meta.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - hpcLoad += entryLoad - hpcCount += 1 - currentLoad += entryLoad - res += entry - } - - for (entry in nonHpcSequence) { - val entryLoad = entry.meta.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > 1) { - break - } - - nonHpcLoad += entryLoad - nonHpcCount += 1 - currentLoad += entryLoad - res += entry - } - } else { - hpcSequence - .take((fraction * trace.size).toInt()) - .forEach { entry -> - hpcLoad += entry.meta.getValue("total-load") as Double - hpcCount += 1 - res.add(entry) - } - - nonHpcSequence - .take(((1 - fraction) * trace.size).toInt()) - .forEach { entry -> - nonHpcLoad += entry.meta.getValue("total-load") as Double - nonHpcCount += 1 - res.add(entry) - } - } - - logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } - logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } - logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res -} - -/** - * Sample a random trace entry. - */ -private fun sample(entry: TraceEntry, i: Int): TraceEntry { - val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) - return entry.copy(uid = uid) -} -- cgit v1.2.3