diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-04-22 21:55:32 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-04-22 21:55:32 +0200 |
| commit | a7a5362c52274e4fef377cf68b53b4399679d304 (patch) | |
| tree | 91b01df54833017b94e5a1b2d43dd1dfcbf29c62 /opendc-compute | |
| parent | 0f1be7a820d5e3b279e68209a5bb6219d176b732 (diff) | |
| parent | b6698d96fb1313909705604be2daf1170ea40d68 (diff) | |
merge: Improve discovery of interference models (#76)
This pull request intends to improve discovery of interference models. Previously, interference models were not tied to the workload trace, meaning they had to be resolved separately from the workload trace. In reality, the interference model is always tied to the particular workload trace.
With this pull request, we integrate the interference model into the `odcvm` trace format and make it available through the `opendc-trace` library. This has as additional benefit that we can support different interference formats in the future using the same API.
Furthermore, this change allows us to ship the interference model with the workload traces and resolve them automatically in the future using some form of package manager.
## Implementation Notes :hammer_and_pick:
* Incorporate interference model in trace format
* Load interference model via trace library
* Move conventions into separate package
## External Dependencies :four_leaf_clover:
* N/A
## Breaking API Changes :warning:
* `VmInterferenceModelReader` has been removed from `opendc-compute-workload`
* Table and column conventions have been moved in `org.opendc.trace.conv` package
Diffstat (limited to 'opendc-compute')
10 files changed, 72 insertions, 213 deletions
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index 93e09b99..41a4b52d 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -40,7 +40,4 @@ dependencies { implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) - implementation(libs.jackson.databind) - implementation(libs.jackson.module.kotlin) - implementation(kotlin("reflect")) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt index 78002c2f..aa0b5eaf 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt @@ -22,6 +22,7 @@ package org.opendc.compute.workload +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import java.util.* /** @@ -31,5 +32,10 @@ public interface ComputeWorkload { /** * Resolve the workload into a list of [VirtualMachine]s to simulate. */ - public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> + public fun resolve(loader: ComputeWorkloadLoader, random: Random): Resolved + + /** + * A concrete instance of a workload. + */ + public data class Resolved(val vms: List<VirtualMachine>, val interferenceModel: VmInterferenceModel?) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 36a76f68..720c7e58 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -23,9 +23,12 @@ package org.opendc.compute.workload import mu.KotlinLogging +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimTrace import org.opendc.trace.* +import org.opendc.trace.conv.* import java.io.File +import java.lang.ref.SoftReference import java.time.Duration import java.time.Instant import java.util.* @@ -47,7 +50,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * The cache of workloads. */ - private val cache = ConcurrentHashMap<String, List<VirtualMachine>>() + private val cache = ConcurrentHashMap<String, SoftReference<ComputeWorkload.Resolved>>() /** * Read the fragments into memory. @@ -145,18 +148,59 @@ public class ComputeWorkloadLoader(private val baseDir: File) { } /** + * Read the interference model associated with the specified [trace]. + */ + private fun parseInterferenceModel(trace: Trace): VmInterferenceModel { + val reader = checkNotNull(trace.getTable(TABLE_INTERFERENCE_GROUPS)).newReader() + + return try { + val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS) + val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET) + val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE) + + val modelBuilder = VmInterferenceModel.builder() + + while (reader.nextRow()) { + @Suppress("UNCHECKED_CAST") + val members = reader.get(membersCol) as Set<String> + val target = reader.getDouble(targetCol) + val score = reader.getDouble(scoreCol) + + modelBuilder + .addGroup(members, target, score) + } + + modelBuilder.build() + } finally { + reader.close() + } + } + + /** * Load the trace with the specified [name] and [format]. */ - public fun get(name: String, format: String): List<VirtualMachine> { - return cache.computeIfAbsent(name) { - val path = baseDir.resolve(it) + public fun get(name: String, format: String): ComputeWorkload.Resolved { + val ref = cache.compute(name) { key, oldVal -> + val inst = oldVal?.get() + if (inst == null) { - logger.info { "Loading trace $it at $path" } + val path = baseDir.resolve(key) - val trace = Trace.open(path, format) - val fragments = parseFragments(trace) - parseMeta(trace, fragments) + logger.info { "Loading trace $key at $path" } + + val trace = Trace.open(path, format) + val fragments = parseFragments(trace) + val vms = parseMeta(trace, fragments) + val interferenceModel = parseInterferenceModel(trace) + val instance = ComputeWorkload.Resolved(vms, interferenceModel) + + SoftReference(instance) + } else { + oldVal + } } + + return checkNotNull(ref?.get()) { "Memory pressure" } } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt index 9b2bec55..1959c48d 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt @@ -37,17 +37,17 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } - val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } + val totalLoad = traces.sumOf { (_, w) -> w.vms.sumOf { it.totalLoad } } val res = mutableListOf<VirtualMachine>() - for ((fraction, vms) in traces) { + for ((fraction, w) in traces) { var currentLoad = 0.0 - for (entry in vms) { + for (entry in w.vms) { val entryLoad = entry.totalLoad if ((currentLoad + entryLoad) / totalLoad > fraction) { break @@ -58,9 +58,9 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double } } - val vmCount = traces.sumOf { (_, vms) -> vms.size } + val vmCount = traces.sumOf { (_, w) -> w.vms.size } logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" } - return res + return ComputeWorkload.Resolved(res, null) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt index 52f4c672..84a77f0f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt @@ -45,8 +45,8 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti */ private val pattern = Regex("^(ComputeNode|cn).*") - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { - val vms = source.resolve(loader, random) + override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { + val (vms, interferenceModel) = source.resolve(loader, random) val (hpc, nonHpc) = vms.partition { entry -> val name = entry.name @@ -130,7 +130,7 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - return res + return ComputeWorkload.Resolved(res, interferenceModel) } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt index ef6de729..bc13560c 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt @@ -37,8 +37,8 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { - val vms = source.resolve(loader, random) + override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { + val (vms, interferenceModel) = source.resolve(loader, random) val res = mutableListOf<VirtualMachine>() val totalLoad = vms.sumOf { it.totalLoad } @@ -56,6 +56,6 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - return res + return ComputeWorkload.Resolved(res, interferenceModel) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt index c20cb8f3..dc9abaef 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -24,14 +24,13 @@ package org.opendc.compute.workload.internal import org.opendc.compute.workload.ComputeWorkload import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine import java.util.* /** * A [ComputeWorkload] from a trace. */ internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { return loader.get(name, format) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt deleted file mode 100644 index e0fa8904..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.util - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.core.JsonParseException -import com.fasterxml.jackson.core.JsonParser -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import java.io.File -import java.io.InputStream - -/** - * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. - */ -public class VmInterferenceModelReader { - /** - * The [ObjectMapper] to use. - */ - private val mapper = jacksonObjectMapper() - - /** - * Read the performance interface model from [file]. - */ - public fun read(file: File): VmInterferenceModel { - val builder = VmInterferenceModel.builder() - val parser = mapper.createParser(file) - parseGroups(parser, builder) - return builder.build() - } - - /** - * Read the performance interface model from the input. - */ - public fun read(input: InputStream): VmInterferenceModel { - val builder = VmInterferenceModel.builder() - val parser = mapper.createParser(input) - parseGroups(parser, builder) - return builder.build() - } - - /** - * Parse all groups in an interference JSON file. - */ - private fun parseGroups(parser: JsonParser, builder: VmInterferenceModel.Builder) { - parser.nextToken() - - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}") - } - - while (parser.nextToken() != JsonToken.END_ARRAY) { - parseGroup(parser, builder) - } - } - - /** - * Parse a group an interference JSON file. - */ - private fun parseGroup(parser: JsonParser, builder: VmInterferenceModel.Builder) { - var targetLoad = Double.POSITIVE_INFINITY - var score = 1.0 - val members = mutableSetOf<String>() - - if (!parser.isExpectedStartObjectToken) { - throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}") - } - - while (parser.nextValue() != JsonToken.END_OBJECT) { - when (parser.currentName) { - "vms" -> parseGroupMembers(parser, members) - "minServerLoad" -> targetLoad = parser.doubleValue - "performanceScore" -> score = parser.doubleValue - } - } - - builder.addGroup(members, targetLoad, score) - } - - /** - * Parse the members of a group. - */ - private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) { - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") - } - - while (parser.nextValue() != JsonToken.END_ARRAY) { - if (parser.currentToken() != JsonToken.VALUE_STRING) { - throw JsonParseException(parser, "Expected string value for group member") - } - - val member = parser.text.removePrefix("vm__workload__").removeSuffix(".txt") - members.add(member) - } - } - - private data class Group( - @JsonProperty("minServerLoad") - val targetLoad: Double, - @JsonProperty("performanceScore") - val score: Double, - @JsonProperty("vms") - val members: Set<String>, - ) -} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt deleted file mode 100644 index 1c3e7149..00000000 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.util - -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow - -/** - * Test suite for the [VmInterferenceModelReader] class. - */ -class VmInterferenceModelReaderTest { - @Test - fun testSmoke() { - val input = checkNotNull(VmInterferenceModelReader::class.java.getResourceAsStream("/perf-interference.json")) - assertDoesNotThrow { VmInterferenceModelReader().read(input) } - } -} diff --git a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json deleted file mode 100644 index 1be5852b..00000000 --- a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json +++ /dev/null @@ -1,22 +0,0 @@ -[ - { - "vms": [ - "vm_a", - "vm_c", - "vm_x", - "vm_y" - ], - "minServerLoad": 0.0, - "performanceScore": 0.8830158730158756 - }, - { - "vms": [ - "vm_a", - "vm_b", - "vm_c", - "vm_d" - ], - "minServerLoad": 0.0, - "performanceScore": 0.7133055555552751 - } -] |
