diff options
54 files changed, 542 insertions, 247 deletions
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index 93e09b99..41a4b52d 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -40,7 +40,4 @@ dependencies { implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) - implementation(libs.jackson.databind) - implementation(libs.jackson.module.kotlin) - implementation(kotlin("reflect")) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt index 78002c2f..aa0b5eaf 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt @@ -22,6 +22,7 @@ package org.opendc.compute.workload +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import java.util.* /** @@ -31,5 +32,10 @@ public interface ComputeWorkload { /** * Resolve the workload into a list of [VirtualMachine]s to simulate. */ - public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> + public fun resolve(loader: ComputeWorkloadLoader, random: Random): Resolved + + /** + * A concrete instance of a workload. + */ + public data class Resolved(val vms: List<VirtualMachine>, val interferenceModel: VmInterferenceModel?) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 36a76f68..720c7e58 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -23,9 +23,12 @@ package org.opendc.compute.workload import mu.KotlinLogging +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimTrace import org.opendc.trace.* +import org.opendc.trace.conv.* import java.io.File +import java.lang.ref.SoftReference import java.time.Duration import java.time.Instant import java.util.* @@ -47,7 +50,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * The cache of workloads. */ - private val cache = ConcurrentHashMap<String, List<VirtualMachine>>() + private val cache = ConcurrentHashMap<String, SoftReference<ComputeWorkload.Resolved>>() /** * Read the fragments into memory. @@ -145,18 +148,59 @@ public class ComputeWorkloadLoader(private val baseDir: File) { } /** + * Read the interference model associated with the specified [trace]. + */ + private fun parseInterferenceModel(trace: Trace): VmInterferenceModel { + val reader = checkNotNull(trace.getTable(TABLE_INTERFERENCE_GROUPS)).newReader() + + return try { + val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS) + val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET) + val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE) + + val modelBuilder = VmInterferenceModel.builder() + + while (reader.nextRow()) { + @Suppress("UNCHECKED_CAST") + val members = reader.get(membersCol) as Set<String> + val target = reader.getDouble(targetCol) + val score = reader.getDouble(scoreCol) + + modelBuilder + .addGroup(members, target, score) + } + + modelBuilder.build() + } finally { + reader.close() + } + } + + /** * Load the trace with the specified [name] and [format]. */ - public fun get(name: String, format: String): List<VirtualMachine> { - return cache.computeIfAbsent(name) { - val path = baseDir.resolve(it) + public fun get(name: String, format: String): ComputeWorkload.Resolved { + val ref = cache.compute(name) { key, oldVal -> + val inst = oldVal?.get() + if (inst == null) { - logger.info { "Loading trace $it at $path" } + val path = baseDir.resolve(key) - val trace = Trace.open(path, format) - val fragments = parseFragments(trace) - parseMeta(trace, fragments) + logger.info { "Loading trace $key at $path" } + + val trace = Trace.open(path, format) + val fragments = parseFragments(trace) + val vms = parseMeta(trace, fragments) + val interferenceModel = parseInterferenceModel(trace) + val instance = ComputeWorkload.Resolved(vms, interferenceModel) + + SoftReference(instance) + } else { + oldVal + } } + + return checkNotNull(ref?.get()) { "Memory pressure" } } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt index 9b2bec55..1959c48d 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt @@ -37,17 +37,17 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } - val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } + val totalLoad = traces.sumOf { (_, w) -> w.vms.sumOf { it.totalLoad } } val res = mutableListOf<VirtualMachine>() - for ((fraction, vms) in traces) { + for ((fraction, w) in traces) { var currentLoad = 0.0 - for (entry in vms) { + for (entry in w.vms) { val entryLoad = entry.totalLoad if ((currentLoad + entryLoad) / totalLoad > fraction) { break @@ -58,9 +58,9 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double } } - val vmCount = traces.sumOf { (_, vms) -> vms.size } + val vmCount = traces.sumOf { (_, w) -> w.vms.size } logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" } - return res + return ComputeWorkload.Resolved(res, null) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt index 52f4c672..84a77f0f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt @@ -45,8 +45,8 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti */ private val pattern = Regex("^(ComputeNode|cn).*") - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { - val vms = source.resolve(loader, random) + override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { + val (vms, interferenceModel) = source.resolve(loader, random) val (hpc, nonHpc) = vms.partition { entry -> val name = entry.name @@ -130,7 +130,7 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - return res + return ComputeWorkload.Resolved(res, interferenceModel) } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt index ef6de729..bc13560c 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt @@ -37,8 +37,8 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { - val vms = source.resolve(loader, random) + override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { + val (vms, interferenceModel) = source.resolve(loader, random) val res = mutableListOf<VirtualMachine>() val totalLoad = vms.sumOf { it.totalLoad } @@ -56,6 +56,6 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - return res + return ComputeWorkload.Resolved(res, interferenceModel) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt index c20cb8f3..dc9abaef 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -24,14 +24,13 @@ package org.opendc.compute.workload.internal import org.opendc.compute.workload.ComputeWorkload import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine import java.util.* /** * A [ComputeWorkload] from a trace. */ internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { return loader.get(name, format) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt deleted file mode 100644 index e0fa8904..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.util - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.core.JsonParseException -import com.fasterxml.jackson.core.JsonParser -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import java.io.File -import java.io.InputStream - -/** - * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. - */ -public class VmInterferenceModelReader { - /** - * The [ObjectMapper] to use. - */ - private val mapper = jacksonObjectMapper() - - /** - * Read the performance interface model from [file]. - */ - public fun read(file: File): VmInterferenceModel { - val builder = VmInterferenceModel.builder() - val parser = mapper.createParser(file) - parseGroups(parser, builder) - return builder.build() - } - - /** - * Read the performance interface model from the input. - */ - public fun read(input: InputStream): VmInterferenceModel { - val builder = VmInterferenceModel.builder() - val parser = mapper.createParser(input) - parseGroups(parser, builder) - return builder.build() - } - - /** - * Parse all groups in an interference JSON file. - */ - private fun parseGroups(parser: JsonParser, builder: VmInterferenceModel.Builder) { - parser.nextToken() - - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}") - } - - while (parser.nextToken() != JsonToken.END_ARRAY) { - parseGroup(parser, builder) - } - } - - /** - * Parse a group an interference JSON file. - */ - private fun parseGroup(parser: JsonParser, builder: VmInterferenceModel.Builder) { - var targetLoad = Double.POSITIVE_INFINITY - var score = 1.0 - val members = mutableSetOf<String>() - - if (!parser.isExpectedStartObjectToken) { - throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}") - } - - while (parser.nextValue() != JsonToken.END_OBJECT) { - when (parser.currentName) { - "vms" -> parseGroupMembers(parser, members) - "minServerLoad" -> targetLoad = parser.doubleValue - "performanceScore" -> score = parser.doubleValue - } - } - - builder.addGroup(members, targetLoad, score) - } - - /** - * Parse the members of a group. - */ - private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) { - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") - } - - while (parser.nextValue() != JsonToken.END_ARRAY) { - if (parser.currentToken() != JsonToken.VALUE_STRING) { - throw JsonParseException(parser, "Expected string value for group member") - } - - val member = parser.text.removePrefix("vm__workload__").removeSuffix(".txt") - members.add(member) - } - } - - private data class Group( - @JsonProperty("minServerLoad") - val targetLoad: Double, - @JsonProperty("performanceScore") - val score: Double, - @JsonProperty("vms") - val members: Set<String>, - ) -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index bb9cb201..83b8c0c6 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -58,7 +58,7 @@ class CapelinBenchmarks { fun setUp() { val loader = ComputeWorkloadLoader(File("src/test/resources/trace")) val source = trace("bitbrains-small") - vms = source.resolve(loader, Random(1L)) + vms = source.resolve(loader, Random(1L)).vms topology = checkNotNull(object {}.javaClass.getResourceAsStream("/env/topology.txt")).use { clusterTopology(it) } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 6604a190..0bbf1443 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -23,7 +23,6 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory -import mu.KotlinLogging import org.opendc.compute.workload.ComputeServiceHelper import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.createComputeScheduler @@ -31,7 +30,6 @@ import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter import org.opendc.compute.workload.grid5000 import org.opendc.compute.workload.telemetry.SdkTelemetryManager import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -52,11 +50,6 @@ import kotlin.math.roundToLong */ abstract class Portfolio(name: String) : Experiment(name) { /** - * The logger for this portfolio instance. - */ - private val logger = KotlinLogging.logger {} - - /** * The configuration to use. */ private val config = ConfigFactory.load().getConfig("opendc.experiments.capelin") @@ -97,18 +90,13 @@ abstract class Portfolio(name: String) : Experiment(name) { override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) - val performanceInterferenceModel = if (operationalPhenomena.hasInterference) - VmInterferenceModelReader() - .read(File(config.getString("interference-model"))) - else - null - val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements) val failureModel = if (operationalPhenomena.failureFrequency > 0) grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong())) else null + val (vms, interferenceModel) = workload.source.resolve(workloadLoader, seeder) val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, @@ -116,7 +104,7 @@ abstract class Portfolio(name: String) : Experiment(name) { telemetry, computeScheduler, failureModel, - performanceInterferenceModel?.withSeed(repeat.toLong()) + interferenceModel?.withSeed(repeat.toLong()) ) val exporter = ParquetComputeMetricExporter( @@ -132,8 +120,8 @@ abstract class Portfolio(name: String) : Experiment(name) { // Instantiate the desired topology runner.apply(topology) - // Converge the workload trace - runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) + // Run the workload trace + runner.run(vms, seeder.nextLong()) } finally { runner.close() } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 891fc8be..01b2a8fe 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -35,7 +35,6 @@ import org.opendc.compute.workload.* import org.opendc.compute.workload.telemetry.SdkTelemetryManager import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter @@ -85,7 +84,7 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val workload = createTestWorkload(1.0) + val (workload, _) = createTestWorkload(1.0) val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, @@ -135,7 +134,7 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val workload = createTestWorkload(0.25, seed) + val (workload, _) = createTestWorkload(0.25, seed) val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, @@ -180,12 +179,7 @@ class CapelinIntegrationTest { @Test fun testInterference() = runBlockingSimulation { val seed = 0 - val workload = createTestWorkload(1.0, seed) - val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json")) - val performanceInterferenceModel = - VmInterferenceModelReader() - .read(perfInterferenceInput) - .withSeed(seed.toLong()) + val (workload, interferenceModel) = createTestWorkload(1.0, seed) val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( @@ -193,7 +187,7 @@ class CapelinIntegrationTest { clock, telemetry, computeScheduler, - interferenceModel = performanceInterferenceModel + interferenceModel = interferenceModel?.withSeed(seed.toLong()) ) val topology = createTopology("single") @@ -240,7 +234,7 @@ class CapelinIntegrationTest { grid5000(Duration.ofDays(7)) ) val topology = createTopology("single") - val workload = createTestWorkload(0.25, seed) + val (workload, _) = createTestWorkload(0.25, seed) telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) @@ -274,7 +268,7 @@ class CapelinIntegrationTest { /** * Obtain the trace reader for the test. */ - private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> { + private fun createTestWorkload(fraction: Double, seed: Int = 0): ComputeWorkload.Resolved { val source = trace("bitbrains-small").sampleByLoad(fraction) return source.resolve(workloadLoader, Random(seed.toLong())) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/interference-model.json index 51fc6366..51fc6366 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/interference-model.json diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt index 1c3e7149..532f6d24 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,18 +20,26 @@ * SOFTWARE. */ -package org.opendc.compute.workload.util +@file:JvmName("InterferenceGroupColumns") +package org.opendc.trace.conv -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.trace.TableColumn +import org.opendc.trace.column /** - * Test suite for the [VmInterferenceModelReader] class. + * Members of the interference group. */ -class VmInterferenceModelReaderTest { - @Test - fun testSmoke() { - val input = checkNotNull(VmInterferenceModelReader::class.java.getResourceAsStream("/perf-interference.json")) - assertDoesNotThrow { VmInterferenceModelReader().read(input) } - } -} +@JvmField +public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("interference_group:members") + +/** + * Target load after which the interference occurs. + */ +@JvmField +public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("interference_group:target") + +/** + * Performance score when the interference occurs. + */ +@JvmField +public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("interference_group:score") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt index f1977945..e9fc5d44 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -21,8 +21,10 @@ */ @file:JvmName("ResourceColumns") -package org.opendc.trace +package org.opendc.trace.conv +import org.opendc.trace.TableColumn +import org.opendc.trace.column import java.time.Instant /** diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt index 244352ae..d5bbafd7 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -21,8 +21,10 @@ */ @file:JvmName("ResourceStateColumns") -package org.opendc.trace +package org.opendc.trace.conv +import org.opendc.trace.TableColumn +import org.opendc.trace.column import java.time.Duration import java.time.Instant diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt index 31a58360..31a58360 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt index bb9d93e2..669ebe58 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -21,7 +21,7 @@ */ @file:JvmName("Tables") -package org.opendc.trace +package org.opendc.trace.conv /** * A table containing all workflows in a workload. @@ -42,3 +42,8 @@ public const val TABLE_RESOURCES: String = "resources" * A table containing all resource states in a workload. */ public const val TABLE_RESOURCE_STATES: String = "resource_states" + +/** + * A table containing the groups of resources that interfere when run on the same execution platform. + */ +public const val TABLE_INTERFERENCE_GROUPS: String = "interference_groups" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt index d103bce4..397c0794 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -21,8 +21,10 @@ */ @file:JvmName("TaskColumns") -package org.opendc.trace +package org.opendc.trace.conv +import org.opendc.trace.TableColumn +import org.opendc.trace.column import java.time.Duration import java.time.Instant diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt index 94a91999..3132b1d9 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt @@ -26,6 +26,9 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import org.opendc.trace.conv.RESOURCE_ID +import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT +import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP import java.time.Instant /** diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt index 6246dc35..154a37e4 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt @@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import org.opendc.trace.conv.* import java.time.Instant /** diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index c9982877..8e3e60cc 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -25,6 +25,7 @@ package org.opendc.trace.azure import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser import org.opendc.trace.* +import org.opendc.trace.conv.* import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.CompositeTableReader diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index eda0b214..56f9a940 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -25,7 +25,7 @@ package org.opendc.trace.azure import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.trace.* +import org.opendc.trace.conv.* import java.nio.file.Paths /** diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt index c1b6f5ba..1e1d1a09 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt @@ -23,6 +23,7 @@ package org.opendc.trace.bitbrains import org.opendc.trace.* +import org.opendc.trace.conv.* import java.io.BufferedReader import java.time.Instant diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index 20222c8a..11d21a04 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -23,6 +23,7 @@ package org.opendc.trace.bitbrains import org.opendc.trace.* +import org.opendc.trace.conv.* import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.CompositeTableReader diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index 3a8839b4..214fd749 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import org.opendc.trace.conv.* import java.text.NumberFormat import java.time.Instant import java.time.LocalDateTime diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt index 3701994a..55f09f43 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt @@ -24,6 +24,7 @@ package org.opendc.trace.bitbrains import com.fasterxml.jackson.dataformat.csv.CsvFactory import org.opendc.trace.* +import org.opendc.trace.conv.RESOURCE_ID import java.nio.file.Path /** diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt index 3885c931..e1e7604a 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -25,6 +25,7 @@ package org.opendc.trace.bitbrains import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser import org.opendc.trace.* +import org.opendc.trace.conv.* import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.CompositeTableReader diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt index d734cf5f..77429e3e 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt @@ -25,7 +25,9 @@ package org.opendc.trace.bitbrains import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.trace.* +import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE +import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP +import org.opendc.trace.conv.TABLE_RESOURCE_STATES import java.nio.file.Paths /** diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index 41e7def2..9309beb1 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -25,7 +25,7 @@ package org.opendc.trace.bitbrains import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.trace.* +import org.opendc.trace.conv.* import java.nio.file.Paths /** diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 7f01ef2b..42a9469e 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import org.opendc.trace.conv.* import java.time.Duration import java.time.Instant import java.util.regex.Pattern diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index d4287420..63688523 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -25,6 +25,7 @@ package org.opendc.trace.gwf import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser import org.opendc.trace.* +import org.opendc.trace.conv.* import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import java.nio.file.Path diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index 5dfd02a1..9bf28ad7 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -24,7 +24,7 @@ package org.opendc.trace.gwf import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.opendc.trace.* +import org.opendc.trace.conv.* import java.nio.file.Paths import java.time.Duration import java.time.Instant diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt new file mode 100644 index 00000000..eb91e305 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.opendc.trace.* +import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS +import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE +import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET +import shaded.parquet.com.fasterxml.jackson.core.JsonParseException +import shaded.parquet.com.fasterxml.jackson.core.JsonParser +import shaded.parquet.com.fasterxml.jackson.core.JsonToken + +/** + * A [TableReader] implementation for the OpenDC VM interference JSON format. + */ +internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) : TableReader { + /** + * A flag to indicate whether a single row has been read already. + */ + private var isStarted = false + + override fun nextRow(): Boolean { + if (!isStarted) { + isStarted = true + + parser.nextToken() + + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}") + } + } + + return if (parser.nextToken() != JsonToken.END_ARRAY) { + parseGroup(parser) + true + } else { + reset() + false + } + } + + override fun resolve(column: TableColumn<*>): Int { + return when (column) { + INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS + INTERFERENCE_GROUP_TARGET -> COL_TARGET + INTERFERENCE_GROUP_SCORE -> COL_SCORE + else -> -1 + } + } + + override fun isNull(index: Int): Boolean { + return when (index) { + COL_MEMBERS, COL_TARGET, COL_SCORE -> false + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun get(index: Int): Any { + return when (index) { + COL_MEMBERS -> members + COL_TARGET -> targetLoad + COL_SCORE -> score + else -> throw IllegalArgumentException("Invalid column $index") + } + } + + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getInt(index: Int): Int { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getDouble(index: Int): Double { + return when (index) { + COL_TARGET -> targetLoad + COL_SCORE -> score + else -> throw IllegalArgumentException("Invalid column $index") + } + } + + override fun close() { + parser.close() + } + + private val COL_MEMBERS = 0 + private val COL_TARGET = 1 + private val COL_SCORE = 2 + + private var members = emptySet<String>() + private var targetLoad = Double.POSITIVE_INFINITY + private var score = 1.0 + + /** + * Reset the state. + */ + private fun reset() { + members = emptySet() + targetLoad = Double.POSITIVE_INFINITY + score = 1.0 + } + + /** + * Parse a group an interference JSON file. + */ + private fun parseGroup(parser: JsonParser) { + var targetLoad = Double.POSITIVE_INFINITY + var score = 1.0 + val members = mutableSetOf<String>() + + if (!parser.isExpectedStartObjectToken) { + throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "vms" -> parseGroupMembers(parser, members) + "minServerLoad" -> targetLoad = parser.doubleValue + "performanceScore" -> score = parser.doubleValue + } + } + + this.members = members + this.targetLoad = targetLoad + this.score = score + } + + /** + * Parse the members of a group. + */ + private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) { + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_ARRAY) { + if (parser.currentToken() != JsonToken.VALUE_STRING) { + throw JsonParseException(parser, "Expected string value for group member") + } + + members.add(parser.text) + } + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt new file mode 100644 index 00000000..64bc4356 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.opendc.trace.* +import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS +import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE +import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET +import shaded.parquet.com.fasterxml.jackson.core.JsonGenerator + +/** + * A [TableWriter] implementation for the OpenDC VM interference JSON format. + */ +internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGenerator) : TableWriter { + /** + * A flag to indicate whether a row has been started. + */ + private var isRowActive = false + + init { + generator.writeStartArray() + } + + override fun startRow() { + // Reset state + members = emptySet() + targetLoad = Double.POSITIVE_INFINITY + score = 1.0 + + // Mark row as active + isRowActive = true + } + + override fun endRow() { + check(isRowActive) { "No active row" } + + generator.writeStartObject() + generator.writeArrayFieldStart("vms") + for (member in members) { + generator.writeString(member) + } + generator.writeEndArray() + generator.writeNumberField("minServerLoad", targetLoad) + generator.writeNumberField("performanceScore", score) + generator.writeEndObject() + } + + override fun resolve(column: TableColumn<*>): Int { + return when (column) { + INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS + INTERFERENCE_GROUP_TARGET -> COL_TARGET + INTERFERENCE_GROUP_SCORE -> COL_SCORE + else -> -1 + } + } + + override fun set(index: Int, value: Any) { + check(isRowActive) { "No active row" } + + @Suppress("UNCHECKED_CAST") + when (index) { + COL_MEMBERS -> members = value as Set<String> + COL_TARGET -> targetLoad = (value as Number).toDouble() + COL_SCORE -> score = (value as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun setBoolean(index: Int, value: Boolean) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setInt(index: Int, value: Int) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setLong(index: Int, value: Long) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setDouble(index: Int, value: Double) { + check(isRowActive) { "No active row" } + + when (index) { + COL_TARGET -> targetLoad = (value as Number).toDouble() + COL_SCORE -> score = (value as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column $index") + } + } + + override fun flush() { + generator.flush() + } + + override fun close() { + generator.writeEndArray() + generator.close() + } + + private val COL_MEMBERS = 0 + private val COL_TARGET = 1 + private val COL_SCORE = 2 + + private var members = emptySet<String>() + private var targetLoad = Double.POSITIVE_INFINITY + private var score = 1.0 +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index b5043f82..b82da888 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -25,6 +25,7 @@ package org.opendc.trace.opendc import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.opendc.trace.* +import org.opendc.trace.conv.* import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration import java.time.Instant diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt index 15a8cb85..01b9750c 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt @@ -27,6 +27,7 @@ import org.apache.avro.generic.GenericRecord import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.* +import org.opendc.trace.conv.* import java.time.Duration import java.time.Instant diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index ffbdc440..4909e70e 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -25,6 +25,7 @@ package org.opendc.trace.opendc import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.opendc.trace.* +import org.opendc.trace.conv.* import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Instant diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt index 4b66a86f..edc89ee6 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt @@ -27,6 +27,7 @@ import org.apache.avro.generic.GenericRecord import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.* +import org.opendc.trace.conv.* import java.time.Instant import kotlin.math.roundToLong diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 886f3d54..36a1b4a0 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -29,19 +29,28 @@ import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetFileWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* +import org.opendc.trace.conv.* import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding +import shaded.parquet.com.fasterxml.jackson.core.JsonFactory import java.nio.file.Files import java.nio.file.Path +import kotlin.io.path.exists /** * A [TraceFormat] implementation of the OpenDC virtual machine trace format. */ public class OdcVmTraceFormat : TraceFormat { /** + * A [JsonFactory] that is used to parse the JSON-based interference model. + */ + private val jsonFactory = JsonFactory() + + /** * The name of this trace format. */ override val name: String = "opendc-vm" @@ -58,7 +67,7 @@ public class OdcVmTraceFormat : TraceFormat { } } - override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS) override fun getDetails(path: Path, table: String): TableDetails { return when (table) { @@ -82,6 +91,13 @@ public class OdcVmTraceFormat : TraceFormat { ), listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) ) + TABLE_INTERFERENCE_GROUPS -> TableDetails( + listOf( + INTERFERENCE_GROUP_MEMBERS, + INTERFERENCE_GROUP_TARGET, + INTERFERENCE_GROUP_SCORE, + ) + ) else -> throw IllegalArgumentException("Table $table not supported") } } @@ -96,6 +112,15 @@ public class OdcVmTraceFormat : TraceFormat { val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) OdcVmResourceStateTableReader(reader) } + TABLE_INTERFERENCE_GROUPS -> { + val modelPath = path.resolve("interference-model.json") + val parser = if (modelPath.exists()) + jsonFactory.createParser(modelPath.toFile()) + else + jsonFactory.createParser("[]") // If model does not exist, return empty model + + OdcVmInterferenceJsonTableReader(parser) + } else -> throw IllegalArgumentException("Table $table not supported") } } @@ -122,6 +147,10 @@ public class OdcVmTraceFormat : TraceFormat { .build() OdcVmResourceStateTableWriter(writer, schema) } + TABLE_INTERFERENCE_GROUPS -> { + val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8) + OdcVmInterferenceJsonTableWriter(generator) + } else -> throw IllegalArgumentException("Table $table not supported") } } diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index bfe0f881..c8742624 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -28,7 +28,7 @@ import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import org.opendc.trace.* +import org.opendc.trace.conv.* import java.nio.file.Paths /** @@ -41,7 +41,7 @@ internal class OdcVmTraceFormatTest { fun testTables() { val path = Paths.get("src/test/resources/trace-v2.1") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS), format.getTables(path)) } @Test @@ -93,4 +93,33 @@ internal class OdcVmTraceFormatTest { reader.close() } + + @Test + fun testInterferenceGroups() { + val path = Paths.get("src/test/resources/trace-v2.1") + val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals(setOf("1019", "1023", "1052"), reader.get(INTERFERENCE_GROUP_MEMBERS)) }, + { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) }, + { assertEquals(0.8830158730158756, reader.get(INTERFERENCE_GROUP_SCORE)) }, + { assertTrue(reader.nextRow()) }, + { assertEquals(setOf("1023", "1052", "1073"), reader.get(INTERFERENCE_GROUP_MEMBERS)) }, + { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) }, + { assertEquals(0.7133055555552751, reader.get(INTERFERENCE_GROUP_SCORE)) }, + { assertFalse(reader.nextRow()) } + ) + + reader.close() + } + + @Test + fun testInterferenceGroupsEmpty() { + val path = Paths.get("src/test/resources/trace-v2.0") + val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + + assertFalse(reader.nextRow()) + reader.close() + } } diff --git a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/interference-model.json index 1be5852b..6a0616d9 100644 --- a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/interference-model.json @@ -1,20 +1,18 @@ [ { "vms": [ - "vm_a", - "vm_c", - "vm_x", - "vm_y" + "1019", + "1023", + "1052" ], "minServerLoad": 0.0, "performanceScore": 0.8830158730158756 }, { "vms": [ - "vm_a", - "vm_b", - "vm_c", - "vm_d" + "1023", + "1052", + "1073" ], "minServerLoad": 0.0, "performanceScore": 0.7133055555552751 diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt index 2f6ea6ee..40b604c3 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt @@ -23,6 +23,7 @@ package org.opendc.trace.swf import org.opendc.trace.* +import org.opendc.trace.conv.* import java.io.BufferedReader import java.time.Duration import java.time.Instant diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index 1fd076d5..b969f3ef 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -23,6 +23,7 @@ package org.opendc.trace.swf import org.opendc.trace.* +import org.opendc.trace.conv.* import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import java.nio.file.Path diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 4dcd43f6..1698f644 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -24,9 +24,9 @@ package org.opendc.trace.swf import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.opendc.trace.TABLE_TASKS -import org.opendc.trace.TASK_ALLOC_NCPUS -import org.opendc.trace.TASK_ID +import org.opendc.trace.conv.TABLE_TASKS +import org.opendc.trace.conv.TASK_ALLOC_NCPUS +import org.opendc.trace.conv.TASK_ID import java.nio.file.Paths /** diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index 5088b044..c71035d4 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -33,6 +33,7 @@ import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.* import mu.KotlinLogging import org.opendc.trace.* +import org.opendc.trace.conv.* import java.io.File import java.time.Duration import java.time.Instant diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt index 7f378d80..d8eafa9c 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt @@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.core.JsonToken import org.opendc.trace.* +import org.opendc.trace.conv.* import java.time.Duration import kotlin.math.roundToInt diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt index c75e3cbb..bc175b58 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -24,6 +24,7 @@ package org.opendc.trace.wfformat import com.fasterxml.jackson.core.JsonFactory import org.opendc.trace.* +import org.opendc.trace.conv.* import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import java.nio.file.Path diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt index b07f27ed..e27bc82c 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt @@ -28,8 +28,8 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows -import org.opendc.trace.TASK_ID -import org.opendc.trace.TASK_PARENTS +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.conv.TASK_PARENTS /** * Test suite for the [WfFormatTaskTableReader] class. diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt index 217b175d..710de88e 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -27,7 +27,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows -import org.opendc.trace.* +import org.opendc.trace.conv.* import java.nio.file.Paths /** diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index 45ec25dd..1e332aca 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -25,6 +25,7 @@ package org.opendc.trace.wtf import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.opendc.trace.* +import org.opendc.trace.conv.* import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration import java.time.Instant diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index ef88d295..c8f9ecaa 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -24,6 +24,7 @@ package org.opendc.trace.wtf import org.apache.avro.generic.GenericRecord import org.opendc.trace.* +import org.opendc.trace.conv.* import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.parquet.LocalParquetReader diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index 09c3703a..0f0e422d 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -25,7 +25,7 @@ package org.opendc.trace.wtf import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.trace.* +import org.opendc.trace.conv.* import java.nio.file.Paths import java.time.Duration import java.time.Instant @@ -69,7 +69,14 @@ class WtfTraceFormatTest { { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) }, { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) }, - { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) }, + { + assertEquals( + setOf("584055316413447529", "133113685133695608", "1008582348422865408"), + reader.get( + TASK_PARENTS + ) + ) + }, ) assertAll( @@ -78,7 +85,14 @@ class WtfTraceFormatTest { { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) }, { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) }, - { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) }, + { + assertEquals( + setOf("584055316413447529", "133113685133695608", "1008582348422865408"), + reader.get( + TASK_PARENTS + ) + ) + }, ) reader.close() diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index bd770574..a150de4e 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -28,8 +28,6 @@ import org.opendc.compute.workload.telemetry.SdkTelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.VmInterferenceModelReader -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -145,21 +143,8 @@ public class OpenDCRunner( val heartbeat = scheduler.scheduleWithFixedDelay({ manager.heartbeat(id) }, 0, heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS) try { - logger.debug { "Constructing performance interference model" } - - val interferenceModel = let { - val path = tracePath.resolve(scenario.workload.trace.id).resolve("performance-interference-model.json") - val enabled = scenario.phenomena.interference - - if (!enabled || !path.exists()) { - return@let null - } - - VmInterferenceModelReader().read(path.inputStream()) - } - val topology = convertTopology(scenario.topology) - val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology, interferenceModel) } + val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology) } val results = invokeAll(jobs) logger.info { "Finished simulation for job $id" } @@ -190,13 +175,11 @@ public class OpenDCRunner( * @param scenario The scenario to simulate. * @param repeat The repeat number used to seed the simulation. * @param topology The topology to simulate. - * @param interferenceModel The [VmInterferenceModel] used in this scenario. */ private inner class SimulationTask( private val scenario: Scenario, private val repeat: Int, private val topology: Topology, - private val interferenceModel: VmInterferenceModel? ) : RecursiveTask<WebComputeMetricExporter.Results>() { override fun compute(): WebComputeMetricExporter.Results { val exporter = WebComputeMetricExporter() @@ -215,6 +198,7 @@ public class OpenDCRunner( val phenomena = scenario.phenomena val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder) val workload = trace(workloadName).sampleByLoad(workloadFraction) + val (vms, interferenceModel) = workload.resolve(workloadLoader, seeder) val failureModel = if (phenomena.failures) @@ -229,7 +213,7 @@ public class OpenDCRunner( telemetry, computeScheduler, failureModel, - interferenceModel + interferenceModel.takeIf { phenomena.interference } ) telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1))) @@ -238,7 +222,7 @@ public class OpenDCRunner( // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(workload.resolve(workloadLoader, seeder), seeder.nextLong()) + simulator.run(vms, seeder.nextLong()) val serviceMetrics = collectServiceMetrics(telemetry.metricProducer) logger.debug { diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt index 73995d08..3aa4463c 100644 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt @@ -25,6 +25,7 @@ package org.opendc.workflow.workload import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.trace.* +import org.opendc.trace.conv.* import org.opendc.workflow.api.Job import org.opendc.workflow.api.Task import org.opendc.workflow.api.WORKFLOW_TASK_CORES |
