diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-04-14 15:58:51 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-04-22 20:18:45 +0200 |
| commit | f74f8d5adc240137f49c1b215004f9f17d0c213d (patch) | |
| tree | 111eda072777543c3aa5f648d6a93400d509683d | |
| parent | 938f60832d6a500fee74b5f44838287c5432a74e (diff) | |
refactor(compute): Load interference model via trace library
This change updates the compute support library to load the VM
interference model via the OpenDC trace library, which provides a
generic interface for reading interference models associated with
workload traces.
15 files changed, 86 insertions, 262 deletions
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index 93e09b99..41a4b52d 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -40,7 +40,4 @@ dependencies { implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) - implementation(libs.jackson.databind) - implementation(libs.jackson.module.kotlin) - implementation(kotlin("reflect")) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt index 78002c2f..aa0b5eaf 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt @@ -22,6 +22,7 @@ package org.opendc.compute.workload +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import java.util.* /** @@ -31,5 +32,10 @@ public interface ComputeWorkload { /** * Resolve the workload into a list of [VirtualMachine]s to simulate. */ - public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> + public fun resolve(loader: ComputeWorkloadLoader, random: Random): Resolved + + /** + * A concrete instance of a workload. + */ + public data class Resolved(val vms: List<VirtualMachine>, val interferenceModel: VmInterferenceModel?) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 36a76f68..e4d86787 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -23,9 +23,11 @@ package org.opendc.compute.workload import mu.KotlinLogging +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimTrace import org.opendc.trace.* import java.io.File +import java.lang.ref.SoftReference import java.time.Duration import java.time.Instant import java.util.* @@ -47,7 +49,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * The cache of workloads. */ - private val cache = ConcurrentHashMap<String, List<VirtualMachine>>() + private val cache = ConcurrentHashMap<String, SoftReference<ComputeWorkload.Resolved>>() /** * Read the fragments into memory. @@ -145,18 +147,59 @@ public class ComputeWorkloadLoader(private val baseDir: File) { } /** + * Read the interference model associated with the specified [trace]. + */ + private fun parseInterferenceModel(trace: Trace): VmInterferenceModel { + val reader = checkNotNull(trace.getTable(TABLE_INTERFERENCE_GROUPS)).newReader() + + return try { + val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS) + val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET) + val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE) + + val modelBuilder = VmInterferenceModel.builder() + + while (reader.nextRow()) { + @Suppress("UNCHECKED_CAST") + val members = reader.get(membersCol) as Set<String> + val target = reader.getDouble(targetCol) + val score = reader.getDouble(scoreCol) + + modelBuilder + .addGroup(members, target, score) + } + + modelBuilder.build() + } finally { + reader.close() + } + } + + /** * Load the trace with the specified [name] and [format]. */ - public fun get(name: String, format: String): List<VirtualMachine> { - return cache.computeIfAbsent(name) { - val path = baseDir.resolve(it) + public fun get(name: String, format: String): ComputeWorkload.Resolved { + val ref = cache.compute(name) { key, oldVal -> + val inst = oldVal?.get() + if (inst == null) { - logger.info { "Loading trace $it at $path" } + val path = baseDir.resolve(key) - val trace = Trace.open(path, format) - val fragments = parseFragments(trace) - parseMeta(trace, fragments) + logger.info { "Loading trace $key at $path" } + + val trace = Trace.open(path, format) + val fragments = parseFragments(trace) + val vms = parseMeta(trace, fragments) + val interferenceModel = parseInterferenceModel(trace) + val instance = ComputeWorkload.Resolved(vms, interferenceModel) + + SoftReference(instance) + } else { + oldVal + } } + + return checkNotNull(ref?.get()) { "Memory pressure" } } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt index 9b2bec55..1959c48d 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt @@ -37,17 +37,17 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } - val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } + val totalLoad = traces.sumOf { (_, w) -> w.vms.sumOf { it.totalLoad } } val res = mutableListOf<VirtualMachine>() - for ((fraction, vms) in traces) { + for ((fraction, w) in traces) { var currentLoad = 0.0 - for (entry in vms) { + for (entry in w.vms) { val entryLoad = entry.totalLoad if ((currentLoad + entryLoad) / totalLoad > fraction) { break @@ -58,9 +58,9 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double } } - val vmCount = traces.sumOf { (_, vms) -> vms.size } + val vmCount = traces.sumOf { (_, w) -> w.vms.size } logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" } - return res + return ComputeWorkload.Resolved(res, null) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt index 52f4c672..84a77f0f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt @@ -45,8 +45,8 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti */ private val pattern = Regex("^(ComputeNode|cn).*") - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { - val vms = source.resolve(loader, random) + override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { + val (vms, interferenceModel) = source.resolve(loader, random) val (hpc, nonHpc) = vms.partition { entry -> val name = entry.name @@ -130,7 +130,7 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - return res + return ComputeWorkload.Resolved(res, interferenceModel) } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt index ef6de729..bc13560c 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt @@ -37,8 +37,8 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { - val vms = source.resolve(loader, random) + override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { + val (vms, interferenceModel) = source.resolve(loader, random) val res = mutableListOf<VirtualMachine>() val totalLoad = vms.sumOf { it.totalLoad } @@ -56,6 +56,6 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - return res + return ComputeWorkload.Resolved(res, interferenceModel) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt index c20cb8f3..dc9abaef 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -24,14 +24,13 @@ package org.opendc.compute.workload.internal import org.opendc.compute.workload.ComputeWorkload import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine import java.util.* /** * A [ComputeWorkload] from a trace. */ internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { return loader.get(name, format) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt deleted file mode 100644 index e0fa8904..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.util - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.core.JsonParseException -import com.fasterxml.jackson.core.JsonParser -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import java.io.File -import java.io.InputStream - -/** - * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. - */ -public class VmInterferenceModelReader { - /** - * The [ObjectMapper] to use. - */ - private val mapper = jacksonObjectMapper() - - /** - * Read the performance interface model from [file]. - */ - public fun read(file: File): VmInterferenceModel { - val builder = VmInterferenceModel.builder() - val parser = mapper.createParser(file) - parseGroups(parser, builder) - return builder.build() - } - - /** - * Read the performance interface model from the input. - */ - public fun read(input: InputStream): VmInterferenceModel { - val builder = VmInterferenceModel.builder() - val parser = mapper.createParser(input) - parseGroups(parser, builder) - return builder.build() - } - - /** - * Parse all groups in an interference JSON file. - */ - private fun parseGroups(parser: JsonParser, builder: VmInterferenceModel.Builder) { - parser.nextToken() - - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}") - } - - while (parser.nextToken() != JsonToken.END_ARRAY) { - parseGroup(parser, builder) - } - } - - /** - * Parse a group an interference JSON file. - */ - private fun parseGroup(parser: JsonParser, builder: VmInterferenceModel.Builder) { - var targetLoad = Double.POSITIVE_INFINITY - var score = 1.0 - val members = mutableSetOf<String>() - - if (!parser.isExpectedStartObjectToken) { - throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}") - } - - while (parser.nextValue() != JsonToken.END_OBJECT) { - when (parser.currentName) { - "vms" -> parseGroupMembers(parser, members) - "minServerLoad" -> targetLoad = parser.doubleValue - "performanceScore" -> score = parser.doubleValue - } - } - - builder.addGroup(members, targetLoad, score) - } - - /** - * Parse the members of a group. - */ - private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) { - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") - } - - while (parser.nextValue() != JsonToken.END_ARRAY) { - if (parser.currentToken() != JsonToken.VALUE_STRING) { - throw JsonParseException(parser, "Expected string value for group member") - } - - val member = parser.text.removePrefix("vm__workload__").removeSuffix(".txt") - members.add(member) - } - } - - private data class Group( - @JsonProperty("minServerLoad") - val targetLoad: Double, - @JsonProperty("performanceScore") - val score: Double, - @JsonProperty("vms") - val members: Set<String>, - ) -} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt deleted file mode 100644 index 1c3e7149..00000000 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.util - -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow - -/** - * Test suite for the [VmInterferenceModelReader] class. - */ -class VmInterferenceModelReaderTest { - @Test - fun testSmoke() { - val input = checkNotNull(VmInterferenceModelReader::class.java.getResourceAsStream("/perf-interference.json")) - assertDoesNotThrow { VmInterferenceModelReader().read(input) } - } -} diff --git a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json deleted file mode 100644 index 1be5852b..00000000 --- a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json +++ /dev/null @@ -1,22 +0,0 @@ -[ - { - "vms": [ - "vm_a", - "vm_c", - "vm_x", - "vm_y" - ], - "minServerLoad": 0.0, - "performanceScore": 0.8830158730158756 - }, - { - "vms": [ - "vm_a", - "vm_b", - "vm_c", - "vm_d" - ], - "minServerLoad": 0.0, - "performanceScore": 0.7133055555552751 - } -] diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index bb9cb201..83b8c0c6 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -58,7 +58,7 @@ class CapelinBenchmarks { fun setUp() { val loader = ComputeWorkloadLoader(File("src/test/resources/trace")) val source = trace("bitbrains-small") - vms = source.resolve(loader, Random(1L)) + vms = source.resolve(loader, Random(1L)).vms topology = checkNotNull(object {}.javaClass.getResourceAsStream("/env/topology.txt")).use { clusterTopology(it) } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 6604a190..0bbf1443 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -23,7 +23,6 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory -import mu.KotlinLogging import org.opendc.compute.workload.ComputeServiceHelper import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.createComputeScheduler @@ -31,7 +30,6 @@ import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter import org.opendc.compute.workload.grid5000 import org.opendc.compute.workload.telemetry.SdkTelemetryManager import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -52,11 +50,6 @@ import kotlin.math.roundToLong */ abstract class Portfolio(name: String) : Experiment(name) { /** - * The logger for this portfolio instance. - */ - private val logger = KotlinLogging.logger {} - - /** * The configuration to use. */ private val config = ConfigFactory.load().getConfig("opendc.experiments.capelin") @@ -97,18 +90,13 @@ abstract class Portfolio(name: String) : Experiment(name) { override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) - val performanceInterferenceModel = if (operationalPhenomena.hasInterference) - VmInterferenceModelReader() - .read(File(config.getString("interference-model"))) - else - null - val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements) val failureModel = if (operationalPhenomena.failureFrequency > 0) grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong())) else null + val (vms, interferenceModel) = workload.source.resolve(workloadLoader, seeder) val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, @@ -116,7 +104,7 @@ abstract class Portfolio(name: String) : Experiment(name) { telemetry, computeScheduler, failureModel, - performanceInterferenceModel?.withSeed(repeat.toLong()) + interferenceModel?.withSeed(repeat.toLong()) ) val exporter = ParquetComputeMetricExporter( @@ -132,8 +120,8 @@ abstract class Portfolio(name: String) : Experiment(name) { // Instantiate the desired topology runner.apply(topology) - // Converge the workload trace - runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) + // Run the workload trace + runner.run(vms, seeder.nextLong()) } finally { runner.close() } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 891fc8be..01b2a8fe 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -35,7 +35,6 @@ import org.opendc.compute.workload.* import org.opendc.compute.workload.telemetry.SdkTelemetryManager import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter @@ -85,7 +84,7 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val workload = createTestWorkload(1.0) + val (workload, _) = createTestWorkload(1.0) val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, @@ -135,7 +134,7 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val workload = createTestWorkload(0.25, seed) + val (workload, _) = createTestWorkload(0.25, seed) val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, @@ -180,12 +179,7 @@ class CapelinIntegrationTest { @Test fun testInterference() = runBlockingSimulation { val seed = 0 - val workload = createTestWorkload(1.0, seed) - val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json")) - val performanceInterferenceModel = - VmInterferenceModelReader() - .read(perfInterferenceInput) - .withSeed(seed.toLong()) + val (workload, interferenceModel) = createTestWorkload(1.0, seed) val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( @@ -193,7 +187,7 @@ class CapelinIntegrationTest { clock, telemetry, computeScheduler, - interferenceModel = performanceInterferenceModel + interferenceModel = interferenceModel?.withSeed(seed.toLong()) ) val topology = createTopology("single") @@ -240,7 +234,7 @@ class CapelinIntegrationTest { grid5000(Duration.ofDays(7)) ) val topology = createTopology("single") - val workload = createTestWorkload(0.25, seed) + val (workload, _) = createTestWorkload(0.25, seed) telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) @@ -274,7 +268,7 @@ class CapelinIntegrationTest { /** * Obtain the trace reader for the test. */ - private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> { + private fun createTestWorkload(fraction: Double, seed: Int = 0): ComputeWorkload.Resolved { val source = trace("bitbrains-small").sampleByLoad(fraction) return source.resolve(workloadLoader, Random(seed.toLong())) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/interference-model.json index 51fc6366..51fc6366 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/interference-model.json diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index bd770574..a150de4e 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -28,8 +28,6 @@ import org.opendc.compute.workload.telemetry.SdkTelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.VmInterferenceModelReader -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -145,21 +143,8 @@ public class OpenDCRunner( val heartbeat = scheduler.scheduleWithFixedDelay({ manager.heartbeat(id) }, 0, heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS) try { - logger.debug { "Constructing performance interference model" } - - val interferenceModel = let { - val path = tracePath.resolve(scenario.workload.trace.id).resolve("performance-interference-model.json") - val enabled = scenario.phenomena.interference - - if (!enabled || !path.exists()) { - return@let null - } - - VmInterferenceModelReader().read(path.inputStream()) - } - val topology = convertTopology(scenario.topology) - val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology, interferenceModel) } + val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology) } val results = invokeAll(jobs) logger.info { "Finished simulation for job $id" } @@ -190,13 +175,11 @@ public class OpenDCRunner( * @param scenario The scenario to simulate. * @param repeat The repeat number used to seed the simulation. * @param topology The topology to simulate. - * @param interferenceModel The [VmInterferenceModel] used in this scenario. */ private inner class SimulationTask( private val scenario: Scenario, private val repeat: Int, private val topology: Topology, - private val interferenceModel: VmInterferenceModel? ) : RecursiveTask<WebComputeMetricExporter.Results>() { override fun compute(): WebComputeMetricExporter.Results { val exporter = WebComputeMetricExporter() @@ -215,6 +198,7 @@ public class OpenDCRunner( val phenomena = scenario.phenomena val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder) val workload = trace(workloadName).sampleByLoad(workloadFraction) + val (vms, interferenceModel) = workload.resolve(workloadLoader, seeder) val failureModel = if (phenomena.failures) @@ -229,7 +213,7 @@ public class OpenDCRunner( telemetry, computeScheduler, failureModel, - interferenceModel + interferenceModel.takeIf { phenomena.interference } ) telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1))) @@ -238,7 +222,7 @@ public class OpenDCRunner( // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(workload.resolve(workloadLoader, seeder), seeder.nextLong()) + simulator.run(vms, seeder.nextLong()) val serviceMetrics = collectServiceMetrics(telemetry.metricProducer) logger.debug { |
