summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-workload/build.gradle.kts3
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt8
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt60
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt12
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt3
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt128
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt20
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt18
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/interference-model.json (renamed from opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json)0
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt (renamed from opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt)32
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt)6
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt)6
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt)0
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt)9
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt)6
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt3
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt1
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt1
-rw-r--r--opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt2
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt1
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt1
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt1
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt1
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt1
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt4
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt2
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt1
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt1
-rw-r--r--opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt2
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt169
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt127
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt1
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt1
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt1
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt1
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt31
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt33
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/interference-model.json (renamed from opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json)14
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt1
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt1
-rw-r--r--opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt6
-rw-r--r--opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt1
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt1
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt1
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt4
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt2
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt1
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt1
-rw-r--r--opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt20
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt24
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt1
54 files changed, 542 insertions, 247 deletions
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts
index 93e09b99..41a4b52d 100644
--- a/opendc-compute/opendc-compute-workload/build.gradle.kts
+++ b/opendc-compute/opendc-compute-workload/build.gradle.kts
@@ -40,7 +40,4 @@ dependencies {
implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
- implementation(libs.jackson.databind)
- implementation(libs.jackson.module.kotlin)
- implementation(kotlin("reflect"))
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
index 78002c2f..aa0b5eaf 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
@@ -22,6 +22,7 @@
package org.opendc.compute.workload
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import java.util.*
/**
@@ -31,5 +32,10 @@ public interface ComputeWorkload {
/**
* Resolve the workload into a list of [VirtualMachine]s to simulate.
*/
- public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine>
+ public fun resolve(loader: ComputeWorkloadLoader, random: Random): Resolved
+
+ /**
+ * A concrete instance of a workload.
+ */
+ public data class Resolved(val vms: List<VirtualMachine>, val interferenceModel: VmInterferenceModel?)
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index 36a76f68..720c7e58 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -23,9 +23,12 @@
package org.opendc.compute.workload
import mu.KotlinLogging
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimTrace
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.io.File
+import java.lang.ref.SoftReference
import java.time.Duration
import java.time.Instant
import java.util.*
@@ -47,7 +50,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* The cache of workloads.
*/
- private val cache = ConcurrentHashMap<String, List<VirtualMachine>>()
+ private val cache = ConcurrentHashMap<String, SoftReference<ComputeWorkload.Resolved>>()
/**
* Read the fragments into memory.
@@ -145,18 +148,59 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
}
/**
+ * Read the interference model associated with the specified [trace].
+ */
+ private fun parseInterferenceModel(trace: Trace): VmInterferenceModel {
+ val reader = checkNotNull(trace.getTable(TABLE_INTERFERENCE_GROUPS)).newReader()
+
+ return try {
+ val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS)
+ val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET)
+ val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE)
+
+ val modelBuilder = VmInterferenceModel.builder()
+
+ while (reader.nextRow()) {
+ @Suppress("UNCHECKED_CAST")
+ val members = reader.get(membersCol) as Set<String>
+ val target = reader.getDouble(targetCol)
+ val score = reader.getDouble(scoreCol)
+
+ modelBuilder
+ .addGroup(members, target, score)
+ }
+
+ modelBuilder.build()
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
* Load the trace with the specified [name] and [format].
*/
- public fun get(name: String, format: String): List<VirtualMachine> {
- return cache.computeIfAbsent(name) {
- val path = baseDir.resolve(it)
+ public fun get(name: String, format: String): ComputeWorkload.Resolved {
+ val ref = cache.compute(name) { key, oldVal ->
+ val inst = oldVal?.get()
+ if (inst == null) {
- logger.info { "Loading trace $it at $path" }
+ val path = baseDir.resolve(key)
- val trace = Trace.open(path, format)
- val fragments = parseFragments(trace)
- parseMeta(trace, fragments)
+ logger.info { "Loading trace $key at $path" }
+
+ val trace = Trace.open(path, format)
+ val fragments = parseFragments(trace)
+ val vms = parseMeta(trace, fragments)
+ val interferenceModel = parseInterferenceModel(trace)
+ val instance = ComputeWorkload.Resolved(vms, interferenceModel)
+
+ SoftReference(instance)
+ } else {
+ oldVal
+ }
}
+
+ return checkNotNull(ref?.get()) { "Memory pressure" }
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
index 9b2bec55..1959c48d 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
@@ -37,17 +37,17 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double
*/
private val logger = KotlinLogging.logger {}
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) }
- val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } }
+ val totalLoad = traces.sumOf { (_, w) -> w.vms.sumOf { it.totalLoad } }
val res = mutableListOf<VirtualMachine>()
- for ((fraction, vms) in traces) {
+ for ((fraction, w) in traces) {
var currentLoad = 0.0
- for (entry in vms) {
+ for (entry in w.vms) {
val entryLoad = entry.totalLoad
if ((currentLoad + entryLoad) / totalLoad > fraction) {
break
@@ -58,9 +58,9 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double
}
}
- val vmCount = traces.sumOf { (_, vms) -> vms.size }
+ val vmCount = traces.sumOf { (_, w) -> w.vms.size }
logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" }
- return res
+ return ComputeWorkload.Resolved(res, null)
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
index 52f4c672..84a77f0f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
@@ -45,8 +45,8 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
*/
private val pattern = Regex("^(ComputeNode|cn).*")
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
- val vms = source.resolve(loader, random)
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
+ val (vms, interferenceModel) = source.resolve(loader, random)
val (hpc, nonHpc) = vms.partition { entry ->
val name = entry.name
@@ -130,7 +130,7 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
- return res
+ return ComputeWorkload.Resolved(res, interferenceModel)
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
index ef6de729..bc13560c 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
@@ -37,8 +37,8 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract
*/
private val logger = KotlinLogging.logger {}
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
- val vms = source.resolve(loader, random)
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
+ val (vms, interferenceModel) = source.resolve(loader, random)
val res = mutableListOf<VirtualMachine>()
val totalLoad = vms.sumOf { it.totalLoad }
@@ -56,6 +56,6 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract
logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
- return res
+ return ComputeWorkload.Resolved(res, interferenceModel)
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
index c20cb8f3..dc9abaef 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
@@ -24,14 +24,13 @@ package org.opendc.compute.workload.internal
import org.opendc.compute.workload.ComputeWorkload
import org.opendc.compute.workload.ComputeWorkloadLoader
-import org.opendc.compute.workload.VirtualMachine
import java.util.*
/**
* A [ComputeWorkload] from a trace.
*/
internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload {
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
return loader.get(name, format)
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt
deleted file mode 100644
index e0fa8904..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.util
-
-import com.fasterxml.jackson.annotation.JsonProperty
-import com.fasterxml.jackson.core.JsonParseException
-import com.fasterxml.jackson.core.JsonParser
-import com.fasterxml.jackson.core.JsonToken
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import java.io.File
-import java.io.InputStream
-
-/**
- * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
- */
-public class VmInterferenceModelReader {
- /**
- * The [ObjectMapper] to use.
- */
- private val mapper = jacksonObjectMapper()
-
- /**
- * Read the performance interface model from [file].
- */
- public fun read(file: File): VmInterferenceModel {
- val builder = VmInterferenceModel.builder()
- val parser = mapper.createParser(file)
- parseGroups(parser, builder)
- return builder.build()
- }
-
- /**
- * Read the performance interface model from the input.
- */
- public fun read(input: InputStream): VmInterferenceModel {
- val builder = VmInterferenceModel.builder()
- val parser = mapper.createParser(input)
- parseGroups(parser, builder)
- return builder.build()
- }
-
- /**
- * Parse all groups in an interference JSON file.
- */
- private fun parseGroups(parser: JsonParser, builder: VmInterferenceModel.Builder) {
- parser.nextToken()
-
- if (!parser.isExpectedStartArrayToken) {
- throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}")
- }
-
- while (parser.nextToken() != JsonToken.END_ARRAY) {
- parseGroup(parser, builder)
- }
- }
-
- /**
- * Parse a group an interference JSON file.
- */
- private fun parseGroup(parser: JsonParser, builder: VmInterferenceModel.Builder) {
- var targetLoad = Double.POSITIVE_INFINITY
- var score = 1.0
- val members = mutableSetOf<String>()
-
- if (!parser.isExpectedStartObjectToken) {
- throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}")
- }
-
- while (parser.nextValue() != JsonToken.END_OBJECT) {
- when (parser.currentName) {
- "vms" -> parseGroupMembers(parser, members)
- "minServerLoad" -> targetLoad = parser.doubleValue
- "performanceScore" -> score = parser.doubleValue
- }
- }
-
- builder.addGroup(members, targetLoad, score)
- }
-
- /**
- * Parse the members of a group.
- */
- private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) {
- if (!parser.isExpectedStartArrayToken) {
- throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}")
- }
-
- while (parser.nextValue() != JsonToken.END_ARRAY) {
- if (parser.currentToken() != JsonToken.VALUE_STRING) {
- throw JsonParseException(parser, "Expected string value for group member")
- }
-
- val member = parser.text.removePrefix("vm__workload__").removeSuffix(".txt")
- members.add(member)
- }
- }
-
- private data class Group(
- @JsonProperty("minServerLoad")
- val targetLoad: Double,
- @JsonProperty("performanceScore")
- val score: Double,
- @JsonProperty("vms")
- val members: Set<String>,
- )
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
index bb9cb201..83b8c0c6 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
@@ -58,7 +58,7 @@ class CapelinBenchmarks {
fun setUp() {
val loader = ComputeWorkloadLoader(File("src/test/resources/trace"))
val source = trace("bitbrains-small")
- vms = source.resolve(loader, Random(1L))
+ vms = source.resolve(loader, Random(1L)).vms
topology = checkNotNull(object {}.javaClass.getResourceAsStream("/env/topology.txt")).use { clusterTopology(it) }
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 6604a190..0bbf1443 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -23,7 +23,6 @@
package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
-import mu.KotlinLogging
import org.opendc.compute.workload.ComputeServiceHelper
import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.createComputeScheduler
@@ -31,7 +30,6 @@ import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter
import org.opendc.compute.workload.grid5000
import org.opendc.compute.workload.telemetry.SdkTelemetryManager
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -52,11 +50,6 @@ import kotlin.math.roundToLong
*/
abstract class Portfolio(name: String) : Experiment(name) {
/**
- * The logger for this portfolio instance.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
* The configuration to use.
*/
private val config = ConfigFactory.load().getConfig("opendc.experiments.capelin")
@@ -97,18 +90,13 @@ abstract class Portfolio(name: String) : Experiment(name) {
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
- val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
- VmInterferenceModelReader()
- .read(File(config.getString("interference-model")))
- else
- null
-
val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements)
val failureModel =
if (operationalPhenomena.failureFrequency > 0)
grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()))
else
null
+ val (vms, interferenceModel) = workload.source.resolve(workloadLoader, seeder)
val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
@@ -116,7 +104,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
telemetry,
computeScheduler,
failureModel,
- performanceInterferenceModel?.withSeed(repeat.toLong())
+ interferenceModel?.withSeed(repeat.toLong())
)
val exporter = ParquetComputeMetricExporter(
@@ -132,8 +120,8 @@ abstract class Portfolio(name: String) : Experiment(name) {
// Instantiate the desired topology
runner.apply(topology)
- // Converge the workload trace
- runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
+ // Run the workload trace
+ runner.run(vms, seeder.nextLong())
} finally {
runner.close()
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 891fc8be..01b2a8fe 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -35,7 +35,6 @@ import org.opendc.compute.workload.*
import org.opendc.compute.workload.telemetry.SdkTelemetryManager
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
@@ -85,7 +84,7 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val workload = createTestWorkload(1.0)
+ val (workload, _) = createTestWorkload(1.0)
val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
@@ -135,7 +134,7 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val workload = createTestWorkload(0.25, seed)
+ val (workload, _) = createTestWorkload(0.25, seed)
val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
@@ -180,12 +179,7 @@ class CapelinIntegrationTest {
@Test
fun testInterference() = runBlockingSimulation {
val seed = 0
- val workload = createTestWorkload(1.0, seed)
- val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
- val performanceInterferenceModel =
- VmInterferenceModelReader()
- .read(perfInterferenceInput)
- .withSeed(seed.toLong())
+ val (workload, interferenceModel) = createTestWorkload(1.0, seed)
val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
@@ -193,7 +187,7 @@ class CapelinIntegrationTest {
clock,
telemetry,
computeScheduler,
- interferenceModel = performanceInterferenceModel
+ interferenceModel = interferenceModel?.withSeed(seed.toLong())
)
val topology = createTopology("single")
@@ -240,7 +234,7 @@ class CapelinIntegrationTest {
grid5000(Duration.ofDays(7))
)
val topology = createTopology("single")
- val workload = createTestWorkload(0.25, seed)
+ val (workload, _) = createTestWorkload(0.25, seed)
telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
@@ -274,7 +268,7 @@ class CapelinIntegrationTest {
/**
* Obtain the trace reader for the test.
*/
- private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> {
+ private fun createTestWorkload(fraction: Double, seed: Int = 0): ComputeWorkload.Resolved {
val source = trace("bitbrains-small").sampleByLoad(fraction)
return source.resolve(workloadLoader, Random(seed.toLong()))
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/interference-model.json
index 51fc6366..51fc6366 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/interference-model.json
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
index 1c3e7149..532f6d24 100644
--- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,18 +20,26 @@
* SOFTWARE.
*/
-package org.opendc.compute.workload.util
+@file:JvmName("InterferenceGroupColumns")
+package org.opendc.trace.conv
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.trace.TableColumn
+import org.opendc.trace.column
/**
- * Test suite for the [VmInterferenceModelReader] class.
+ * Members of the interference group.
*/
-class VmInterferenceModelReaderTest {
- @Test
- fun testSmoke() {
- val input = checkNotNull(VmInterferenceModelReader::class.java.getResourceAsStream("/perf-interference.json"))
- assertDoesNotThrow { VmInterferenceModelReader().read(input) }
- }
-}
+@JvmField
+public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("interference_group:members")
+
+/**
+ * Target load after which the interference occurs.
+ */
+@JvmField
+public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("interference_group:target")
+
+/**
+ * Performance score when the interference occurs.
+ */
+@JvmField
+public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("interference_group:score")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
index f1977945..e9fc5d44 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -21,8 +21,10 @@
*/
@file:JvmName("ResourceColumns")
-package org.opendc.trace
+package org.opendc.trace.conv
+import org.opendc.trace.TableColumn
+import org.opendc.trace.column
import java.time.Instant
/**
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
index 244352ae..d5bbafd7 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -21,8 +21,10 @@
*/
@file:JvmName("ResourceStateColumns")
-package org.opendc.trace
+package org.opendc.trace.conv
+import org.opendc.trace.TableColumn
+import org.opendc.trace.column
import java.time.Duration
import java.time.Instant
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt
index 31a58360..31a58360 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt
index bb9d93e2..669ebe58 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -21,7 +21,7 @@
*/
@file:JvmName("Tables")
-package org.opendc.trace
+package org.opendc.trace.conv
/**
* A table containing all workflows in a workload.
@@ -42,3 +42,8 @@ public const val TABLE_RESOURCES: String = "resources"
* A table containing all resource states in a workload.
*/
public const val TABLE_RESOURCE_STATES: String = "resource_states"
+
+/**
+ * A table containing the groups of resources that interfere when run on the same execution platform.
+ */
+public const val TABLE_INTERFERENCE_GROUPS: String = "interference_groups"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
index d103bce4..397c0794 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -21,8 +21,10 @@
*/
@file:JvmName("TaskColumns")
-package org.opendc.trace
+package org.opendc.trace.conv
+import org.opendc.trace.TableColumn
+import org.opendc.trace.column
import java.time.Duration
import java.time.Instant
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
index 94a91999..3132b1d9 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
@@ -26,6 +26,9 @@ import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
+import org.opendc.trace.conv.RESOURCE_ID
+import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT
+import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
import java.time.Instant
/**
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
index 6246dc35..154a37e4 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.time.Instant
/**
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
index c9982877..8e3e60cc 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
@@ -25,6 +25,7 @@ package org.opendc.trace.azure
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import com.fasterxml.jackson.dataformat.csv.CsvParser
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.CompositeTableReader
diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
index eda0b214..56f9a940 100644
--- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
@@ -25,7 +25,7 @@ package org.opendc.trace.azure
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
-import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.nio.file.Paths
/**
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
index c1b6f5ba..1e1d1a09 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
@@ -23,6 +23,7 @@
package org.opendc.trace.bitbrains
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.io.BufferedReader
import java.time.Instant
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
index 20222c8a..11d21a04 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
@@ -23,6 +23,7 @@
package org.opendc.trace.bitbrains
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.CompositeTableReader
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
index 3a8839b4..214fd749 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.text.NumberFormat
import java.time.Instant
import java.time.LocalDateTime
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
index 3701994a..55f09f43 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
@@ -24,6 +24,7 @@ package org.opendc.trace.bitbrains
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import org.opendc.trace.*
+import org.opendc.trace.conv.RESOURCE_ID
import java.nio.file.Path
/**
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
index 3885c931..e1e7604a 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
@@ -25,6 +25,7 @@ package org.opendc.trace.bitbrains
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import com.fasterxml.jackson.dataformat.csv.CsvParser
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.CompositeTableReader
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
index d734cf5f..77429e3e 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
@@ -25,7 +25,9 @@ package org.opendc.trace.bitbrains
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
-import org.opendc.trace.*
+import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
+import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
+import org.opendc.trace.conv.TABLE_RESOURCE_STATES
import java.nio.file.Paths
/**
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
index 41e7def2..9309beb1 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
@@ -25,7 +25,7 @@ package org.opendc.trace.bitbrains
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
-import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.nio.file.Paths
/**
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
index 7f01ef2b..42a9469e 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.time.Duration
import java.time.Instant
import java.util.regex.Pattern
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
index d4287420..63688523 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
@@ -25,6 +25,7 @@ package org.opendc.trace.gwf
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import com.fasterxml.jackson.dataformat.csv.CsvParser
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import java.nio.file.Path
diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
index 5dfd02a1..9bf28ad7 100644
--- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
@@ -24,7 +24,7 @@ package org.opendc.trace.gwf
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.*
-import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.nio.file.Paths
import java.time.Duration
import java.time.Instant
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
new file mode 100644
index 00000000..eb91e305
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
@@ -0,0 +1,169 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc
+
+import org.opendc.trace.*
+import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
+import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
+import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
+import shaded.parquet.com.fasterxml.jackson.core.JsonParseException
+import shaded.parquet.com.fasterxml.jackson.core.JsonParser
+import shaded.parquet.com.fasterxml.jackson.core.JsonToken
+
+/**
+ * A [TableReader] implementation for the OpenDC VM interference JSON format.
+ */
+internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) : TableReader {
+ /**
+ * A flag to indicate whether a single row has been read already.
+ */
+ private var isStarted = false
+
+ override fun nextRow(): Boolean {
+ if (!isStarted) {
+ isStarted = true
+
+ parser.nextToken()
+
+ if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}")
+ }
+ }
+
+ return if (parser.nextToken() != JsonToken.END_ARRAY) {
+ parseGroup(parser)
+ true
+ } else {
+ reset()
+ false
+ }
+ }
+
+ override fun resolve(column: TableColumn<*>): Int {
+ return when (column) {
+ INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS
+ INTERFERENCE_GROUP_TARGET -> COL_TARGET
+ INTERFERENCE_GROUP_SCORE -> COL_SCORE
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ return when (index) {
+ COL_MEMBERS, COL_TARGET, COL_SCORE -> false
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun get(index: Int): Any {
+ return when (index) {
+ COL_MEMBERS -> members
+ COL_TARGET -> targetLoad
+ COL_SCORE -> score
+ else -> throw IllegalArgumentException("Invalid column $index")
+ }
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getInt(index: Int): Int {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getDouble(index: Int): Double {
+ return when (index) {
+ COL_TARGET -> targetLoad
+ COL_SCORE -> score
+ else -> throw IllegalArgumentException("Invalid column $index")
+ }
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ private val COL_MEMBERS = 0
+ private val COL_TARGET = 1
+ private val COL_SCORE = 2
+
+ private var members = emptySet<String>()
+ private var targetLoad = Double.POSITIVE_INFINITY
+ private var score = 1.0
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ members = emptySet()
+ targetLoad = Double.POSITIVE_INFINITY
+ score = 1.0
+ }
+
+ /**
+ * Parse a group an interference JSON file.
+ */
+ private fun parseGroup(parser: JsonParser) {
+ var targetLoad = Double.POSITIVE_INFINITY
+ var score = 1.0
+ val members = mutableSetOf<String>()
+
+ if (!parser.isExpectedStartObjectToken) {
+ throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}")
+ }
+
+ while (parser.nextValue() != JsonToken.END_OBJECT) {
+ when (parser.currentName) {
+ "vms" -> parseGroupMembers(parser, members)
+ "minServerLoad" -> targetLoad = parser.doubleValue
+ "performanceScore" -> score = parser.doubleValue
+ }
+ }
+
+ this.members = members
+ this.targetLoad = targetLoad
+ this.score = score
+ }
+
+ /**
+ * Parse the members of a group.
+ */
+ private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) {
+ if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}")
+ }
+
+ while (parser.nextValue() != JsonToken.END_ARRAY) {
+ if (parser.currentToken() != JsonToken.VALUE_STRING) {
+ throw JsonParseException(parser, "Expected string value for group member")
+ }
+
+ members.add(parser.text)
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
new file mode 100644
index 00000000..64bc4356
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc
+
+import org.opendc.trace.*
+import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
+import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
+import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
+import shaded.parquet.com.fasterxml.jackson.core.JsonGenerator
+
+/**
+ * A [TableWriter] implementation for the OpenDC VM interference JSON format.
+ */
+internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGenerator) : TableWriter {
+ /**
+ * A flag to indicate whether a row has been started.
+ */
+ private var isRowActive = false
+
+ init {
+ generator.writeStartArray()
+ }
+
+ override fun startRow() {
+ // Reset state
+ members = emptySet()
+ targetLoad = Double.POSITIVE_INFINITY
+ score = 1.0
+
+ // Mark row as active
+ isRowActive = true
+ }
+
+ override fun endRow() {
+ check(isRowActive) { "No active row" }
+
+ generator.writeStartObject()
+ generator.writeArrayFieldStart("vms")
+ for (member in members) {
+ generator.writeString(member)
+ }
+ generator.writeEndArray()
+ generator.writeNumberField("minServerLoad", targetLoad)
+ generator.writeNumberField("performanceScore", score)
+ generator.writeEndObject()
+ }
+
+ override fun resolve(column: TableColumn<*>): Int {
+ return when (column) {
+ INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS
+ INTERFERENCE_GROUP_TARGET -> COL_TARGET
+ INTERFERENCE_GROUP_SCORE -> COL_SCORE
+ else -> -1
+ }
+ }
+
+ override fun set(index: Int, value: Any) {
+ check(isRowActive) { "No active row" }
+
+ @Suppress("UNCHECKED_CAST")
+ when (index) {
+ COL_MEMBERS -> members = value as Set<String>
+ COL_TARGET -> targetLoad = (value as Number).toDouble()
+ COL_SCORE -> score = (value as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun setBoolean(index: Int, value: Boolean) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setInt(index: Int, value: Int) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setLong(index: Int, value: Long) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setDouble(index: Int, value: Double) {
+ check(isRowActive) { "No active row" }
+
+ when (index) {
+ COL_TARGET -> targetLoad = (value as Number).toDouble()
+ COL_SCORE -> score = (value as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column $index")
+ }
+ }
+
+ override fun flush() {
+ generator.flush()
+ }
+
+ override fun close() {
+ generator.writeEndArray()
+ generator.close()
+ }
+
+ private val COL_MEMBERS = 0
+ private val COL_TARGET = 1
+ private val COL_SCORE = 2
+
+ private var members = emptySet<String>()
+ private var targetLoad = Double.POSITIVE_INFINITY
+ private var score = 1.0
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index b5043f82..b82da888 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -25,6 +25,7 @@ package org.opendc.trace.opendc
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import org.opendc.trace.util.parquet.LocalParquetReader
import java.time.Duration
import java.time.Instant
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
index 15a8cb85..01b9750c 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
@@ -27,6 +27,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.time.Duration
import java.time.Instant
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index ffbdc440..4909e70e 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -25,6 +25,7 @@ package org.opendc.trace.opendc
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import org.opendc.trace.util.parquet.LocalParquetReader
import java.time.Instant
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
index 4b66a86f..edc89ee6 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
@@ -27,6 +27,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.time.Instant
import kotlin.math.roundToLong
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index 886f3d54..36a1b4a0 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -29,19 +29,28 @@ import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.parquet.LocalOutputFile
import org.opendc.trace.util.parquet.LocalParquetReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding
+import shaded.parquet.com.fasterxml.jackson.core.JsonFactory
import java.nio.file.Files
import java.nio.file.Path
+import kotlin.io.path.exists
/**
* A [TraceFormat] implementation of the OpenDC virtual machine trace format.
*/
public class OdcVmTraceFormat : TraceFormat {
/**
+ * A [JsonFactory] that is used to parse the JSON-based interference model.
+ */
+ private val jsonFactory = JsonFactory()
+
+ /**
* The name of this trace format.
*/
override val name: String = "opendc-vm"
@@ -58,7 +67,7 @@ public class OdcVmTraceFormat : TraceFormat {
}
}
- override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+ override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS)
override fun getDetails(path: Path, table: String): TableDetails {
return when (table) {
@@ -82,6 +91,13 @@ public class OdcVmTraceFormat : TraceFormat {
),
listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP)
)
+ TABLE_INTERFERENCE_GROUPS -> TableDetails(
+ listOf(
+ INTERFERENCE_GROUP_MEMBERS,
+ INTERFERENCE_GROUP_TARGET,
+ INTERFERENCE_GROUP_SCORE,
+ )
+ )
else -> throw IllegalArgumentException("Table $table not supported")
}
}
@@ -96,6 +112,15 @@ public class OdcVmTraceFormat : TraceFormat {
val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
OdcVmResourceStateTableReader(reader)
}
+ TABLE_INTERFERENCE_GROUPS -> {
+ val modelPath = path.resolve("interference-model.json")
+ val parser = if (modelPath.exists())
+ jsonFactory.createParser(modelPath.toFile())
+ else
+ jsonFactory.createParser("[]") // If model does not exist, return empty model
+
+ OdcVmInterferenceJsonTableReader(parser)
+ }
else -> throw IllegalArgumentException("Table $table not supported")
}
}
@@ -122,6 +147,10 @@ public class OdcVmTraceFormat : TraceFormat {
.build()
OdcVmResourceStateTableWriter(writer, schema)
}
+ TABLE_INTERFERENCE_GROUPS -> {
+ val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8)
+ OdcVmInterferenceJsonTableWriter(generator)
+ }
else -> throw IllegalArgumentException("Table $table not supported")
}
}
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index bfe0f881..c8742624 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
-import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.nio.file.Paths
/**
@@ -41,7 +41,7 @@ internal class OdcVmTraceFormatTest {
fun testTables() {
val path = Paths.get("src/test/resources/trace-v2.1")
- assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path))
+ assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS), format.getTables(path))
}
@Test
@@ -93,4 +93,33 @@ internal class OdcVmTraceFormatTest {
reader.close()
}
+
+ @Test
+ fun testInterferenceGroups() {
+ val path = Paths.get("src/test/resources/trace-v2.1")
+ val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS)
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals(setOf("1019", "1023", "1052"), reader.get(INTERFERENCE_GROUP_MEMBERS)) },
+ { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) },
+ { assertEquals(0.8830158730158756, reader.get(INTERFERENCE_GROUP_SCORE)) },
+ { assertTrue(reader.nextRow()) },
+ { assertEquals(setOf("1023", "1052", "1073"), reader.get(INTERFERENCE_GROUP_MEMBERS)) },
+ { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) },
+ { assertEquals(0.7133055555552751, reader.get(INTERFERENCE_GROUP_SCORE)) },
+ { assertFalse(reader.nextRow()) }
+ )
+
+ reader.close()
+ }
+
+ @Test
+ fun testInterferenceGroupsEmpty() {
+ val path = Paths.get("src/test/resources/trace-v2.0")
+ val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS)
+
+ assertFalse(reader.nextRow())
+ reader.close()
+ }
}
diff --git a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/interference-model.json
index 1be5852b..6a0616d9 100644
--- a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/interference-model.json
@@ -1,20 +1,18 @@
[
{
"vms": [
- "vm_a",
- "vm_c",
- "vm_x",
- "vm_y"
+ "1019",
+ "1023",
+ "1052"
],
"minServerLoad": 0.0,
"performanceScore": 0.8830158730158756
},
{
"vms": [
- "vm_a",
- "vm_b",
- "vm_c",
- "vm_d"
+ "1023",
+ "1052",
+ "1073"
],
"minServerLoad": 0.0,
"performanceScore": 0.7133055555552751
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
index 2f6ea6ee..40b604c3 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
@@ -23,6 +23,7 @@
package org.opendc.trace.swf
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.io.BufferedReader
import java.time.Duration
import java.time.Instant
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
index 1fd076d5..b969f3ef 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
@@ -23,6 +23,7 @@
package org.opendc.trace.swf
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import java.nio.file.Path
diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
index 4dcd43f6..1698f644 100644
--- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
@@ -24,9 +24,9 @@ package org.opendc.trace.swf
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.*
-import org.opendc.trace.TABLE_TASKS
-import org.opendc.trace.TASK_ALLOC_NCPUS
-import org.opendc.trace.TASK_ID
+import org.opendc.trace.conv.TABLE_TASKS
+import org.opendc.trace.conv.TASK_ALLOC_NCPUS
+import org.opendc.trace.conv.TASK_ID
import java.nio.file.Paths
/**
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
index 5088b044..c71035d4 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
@@ -33,6 +33,7 @@ import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.*
import mu.KotlinLogging
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.io.File
import java.time.Duration
import java.time.Instant
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
index 7f378d80..d8eafa9c 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.JsonToken
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.time.Duration
import kotlin.math.roundToInt
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
index c75e3cbb..bc175b58 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
@@ -24,6 +24,7 @@ package org.opendc.trace.wfformat
import com.fasterxml.jackson.core.JsonFactory
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import java.nio.file.Path
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
index b07f27ed..e27bc82c 100644
--- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
@@ -28,8 +28,8 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
-import org.opendc.trace.TASK_ID
-import org.opendc.trace.TASK_PARENTS
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_PARENTS
/**
* Test suite for the [WfFormatTaskTableReader] class.
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
index 217b175d..710de88e 100644
--- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
@@ -27,7 +27,7 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
-import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.nio.file.Paths
/**
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index 45ec25dd..1e332aca 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -25,6 +25,7 @@ package org.opendc.trace.wtf
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import org.opendc.trace.util.parquet.LocalParquetReader
import java.time.Duration
import java.time.Instant
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index ef88d295..c8f9ecaa 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -24,6 +24,7 @@ package org.opendc.trace.wtf
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.parquet.LocalParquetReader
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index 09c3703a..0f0e422d 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -25,7 +25,7 @@ package org.opendc.trace.wtf
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
-import org.opendc.trace.*
+import org.opendc.trace.conv.*
import java.nio.file.Paths
import java.time.Duration
import java.time.Instant
@@ -69,7 +69,14 @@ class WtfTraceFormatTest {
{ assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) },
{ assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) },
{ assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) },
- { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) },
+ {
+ assertEquals(
+ setOf("584055316413447529", "133113685133695608", "1008582348422865408"),
+ reader.get(
+ TASK_PARENTS
+ )
+ )
+ },
)
assertAll(
@@ -78,7 +85,14 @@ class WtfTraceFormatTest {
{ assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) },
{ assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) },
{ assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) },
- { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) },
+ {
+ assertEquals(
+ setOf("584055316413447529", "133113685133695608", "1008582348422865408"),
+ reader.get(
+ TASK_PARENTS
+ )
+ )
+ },
)
reader.close()
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index bd770574..a150de4e 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -28,8 +28,6 @@ import org.opendc.compute.workload.telemetry.SdkTelemetryManager
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.util.VmInterferenceModelReader
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -145,21 +143,8 @@ public class OpenDCRunner(
val heartbeat = scheduler.scheduleWithFixedDelay({ manager.heartbeat(id) }, 0, heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS)
try {
- logger.debug { "Constructing performance interference model" }
-
- val interferenceModel = let {
- val path = tracePath.resolve(scenario.workload.trace.id).resolve("performance-interference-model.json")
- val enabled = scenario.phenomena.interference
-
- if (!enabled || !path.exists()) {
- return@let null
- }
-
- VmInterferenceModelReader().read(path.inputStream())
- }
-
val topology = convertTopology(scenario.topology)
- val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology, interferenceModel) }
+ val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology) }
val results = invokeAll(jobs)
logger.info { "Finished simulation for job $id" }
@@ -190,13 +175,11 @@ public class OpenDCRunner(
* @param scenario The scenario to simulate.
* @param repeat The repeat number used to seed the simulation.
* @param topology The topology to simulate.
- * @param interferenceModel The [VmInterferenceModel] used in this scenario.
*/
private inner class SimulationTask(
private val scenario: Scenario,
private val repeat: Int,
private val topology: Topology,
- private val interferenceModel: VmInterferenceModel?
) : RecursiveTask<WebComputeMetricExporter.Results>() {
override fun compute(): WebComputeMetricExporter.Results {
val exporter = WebComputeMetricExporter()
@@ -215,6 +198,7 @@ public class OpenDCRunner(
val phenomena = scenario.phenomena
val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder)
val workload = trace(workloadName).sampleByLoad(workloadFraction)
+ val (vms, interferenceModel) = workload.resolve(workloadLoader, seeder)
val failureModel =
if (phenomena.failures)
@@ -229,7 +213,7 @@ public class OpenDCRunner(
telemetry,
computeScheduler,
failureModel,
- interferenceModel
+ interferenceModel.takeIf { phenomena.interference }
)
telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1)))
@@ -238,7 +222,7 @@ public class OpenDCRunner(
// Instantiate the topology onto the simulator
simulator.apply(topology)
// Run workload trace
- simulator.run(workload.resolve(workloadLoader, seeder), seeder.nextLong())
+ simulator.run(vms, seeder.nextLong())
val serviceMetrics = collectServiceMetrics(telemetry.metricProducer)
logger.debug {
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
index 73995d08..3aa4463c 100644
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
+++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
@@ -25,6 +25,7 @@ package org.opendc.workflow.workload
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.trace.*
+import org.opendc.trace.conv.*
import org.opendc.workflow.api.Job
import org.opendc.workflow.api.Task
import org.opendc.workflow.api.WORKFLOW_TASK_CORES