summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-workload/build.gradle.kts3
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt8
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt59
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt12
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt3
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt128
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt37
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json22
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt20
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt18
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/interference-model.json (renamed from opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json)0
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt24
15 files changed, 86 insertions, 262 deletions
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts
index 93e09b99..41a4b52d 100644
--- a/opendc-compute/opendc-compute-workload/build.gradle.kts
+++ b/opendc-compute/opendc-compute-workload/build.gradle.kts
@@ -40,7 +40,4 @@ dependencies {
implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
- implementation(libs.jackson.databind)
- implementation(libs.jackson.module.kotlin)
- implementation(kotlin("reflect"))
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
index 78002c2f..aa0b5eaf 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
@@ -22,6 +22,7 @@
package org.opendc.compute.workload
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import java.util.*
/**
@@ -31,5 +32,10 @@ public interface ComputeWorkload {
/**
* Resolve the workload into a list of [VirtualMachine]s to simulate.
*/
- public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine>
+ public fun resolve(loader: ComputeWorkloadLoader, random: Random): Resolved
+
+ /**
+ * A concrete instance of a workload.
+ */
+ public data class Resolved(val vms: List<VirtualMachine>, val interferenceModel: VmInterferenceModel?)
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index 36a76f68..e4d86787 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -23,9 +23,11 @@
package org.opendc.compute.workload
import mu.KotlinLogging
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimTrace
import org.opendc.trace.*
import java.io.File
+import java.lang.ref.SoftReference
import java.time.Duration
import java.time.Instant
import java.util.*
@@ -47,7 +49,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* The cache of workloads.
*/
- private val cache = ConcurrentHashMap<String, List<VirtualMachine>>()
+ private val cache = ConcurrentHashMap<String, SoftReference<ComputeWorkload.Resolved>>()
/**
* Read the fragments into memory.
@@ -145,18 +147,59 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
}
/**
+ * Read the interference model associated with the specified [trace].
+ */
+ private fun parseInterferenceModel(trace: Trace): VmInterferenceModel {
+ val reader = checkNotNull(trace.getTable(TABLE_INTERFERENCE_GROUPS)).newReader()
+
+ return try {
+ val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS)
+ val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET)
+ val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE)
+
+ val modelBuilder = VmInterferenceModel.builder()
+
+ while (reader.nextRow()) {
+ @Suppress("UNCHECKED_CAST")
+ val members = reader.get(membersCol) as Set<String>
+ val target = reader.getDouble(targetCol)
+ val score = reader.getDouble(scoreCol)
+
+ modelBuilder
+ .addGroup(members, target, score)
+ }
+
+ modelBuilder.build()
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
* Load the trace with the specified [name] and [format].
*/
- public fun get(name: String, format: String): List<VirtualMachine> {
- return cache.computeIfAbsent(name) {
- val path = baseDir.resolve(it)
+ public fun get(name: String, format: String): ComputeWorkload.Resolved {
+ val ref = cache.compute(name) { key, oldVal ->
+ val inst = oldVal?.get()
+ if (inst == null) {
- logger.info { "Loading trace $it at $path" }
+ val path = baseDir.resolve(key)
- val trace = Trace.open(path, format)
- val fragments = parseFragments(trace)
- parseMeta(trace, fragments)
+ logger.info { "Loading trace $key at $path" }
+
+ val trace = Trace.open(path, format)
+ val fragments = parseFragments(trace)
+ val vms = parseMeta(trace, fragments)
+ val interferenceModel = parseInterferenceModel(trace)
+ val instance = ComputeWorkload.Resolved(vms, interferenceModel)
+
+ SoftReference(instance)
+ } else {
+ oldVal
+ }
}
+
+ return checkNotNull(ref?.get()) { "Memory pressure" }
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
index 9b2bec55..1959c48d 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
@@ -37,17 +37,17 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double
*/
private val logger = KotlinLogging.logger {}
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) }
- val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } }
+ val totalLoad = traces.sumOf { (_, w) -> w.vms.sumOf { it.totalLoad } }
val res = mutableListOf<VirtualMachine>()
- for ((fraction, vms) in traces) {
+ for ((fraction, w) in traces) {
var currentLoad = 0.0
- for (entry in vms) {
+ for (entry in w.vms) {
val entryLoad = entry.totalLoad
if ((currentLoad + entryLoad) / totalLoad > fraction) {
break
@@ -58,9 +58,9 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double
}
}
- val vmCount = traces.sumOf { (_, vms) -> vms.size }
+ val vmCount = traces.sumOf { (_, w) -> w.vms.size }
logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" }
- return res
+ return ComputeWorkload.Resolved(res, null)
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
index 52f4c672..84a77f0f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
@@ -45,8 +45,8 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
*/
private val pattern = Regex("^(ComputeNode|cn).*")
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
- val vms = source.resolve(loader, random)
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
+ val (vms, interferenceModel) = source.resolve(loader, random)
val (hpc, nonHpc) = vms.partition { entry ->
val name = entry.name
@@ -130,7 +130,7 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
- return res
+ return ComputeWorkload.Resolved(res, interferenceModel)
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
index ef6de729..bc13560c 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
@@ -37,8 +37,8 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract
*/
private val logger = KotlinLogging.logger {}
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
- val vms = source.resolve(loader, random)
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
+ val (vms, interferenceModel) = source.resolve(loader, random)
val res = mutableListOf<VirtualMachine>()
val totalLoad = vms.sumOf { it.totalLoad }
@@ -56,6 +56,6 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract
logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
- return res
+ return ComputeWorkload.Resolved(res, interferenceModel)
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
index c20cb8f3..dc9abaef 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
@@ -24,14 +24,13 @@ package org.opendc.compute.workload.internal
import org.opendc.compute.workload.ComputeWorkload
import org.opendc.compute.workload.ComputeWorkloadLoader
-import org.opendc.compute.workload.VirtualMachine
import java.util.*
/**
* A [ComputeWorkload] from a trace.
*/
internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload {
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
return loader.get(name, format)
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt
deleted file mode 100644
index e0fa8904..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.util
-
-import com.fasterxml.jackson.annotation.JsonProperty
-import com.fasterxml.jackson.core.JsonParseException
-import com.fasterxml.jackson.core.JsonParser
-import com.fasterxml.jackson.core.JsonToken
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import java.io.File
-import java.io.InputStream
-
-/**
- * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
- */
-public class VmInterferenceModelReader {
- /**
- * The [ObjectMapper] to use.
- */
- private val mapper = jacksonObjectMapper()
-
- /**
- * Read the performance interface model from [file].
- */
- public fun read(file: File): VmInterferenceModel {
- val builder = VmInterferenceModel.builder()
- val parser = mapper.createParser(file)
- parseGroups(parser, builder)
- return builder.build()
- }
-
- /**
- * Read the performance interface model from the input.
- */
- public fun read(input: InputStream): VmInterferenceModel {
- val builder = VmInterferenceModel.builder()
- val parser = mapper.createParser(input)
- parseGroups(parser, builder)
- return builder.build()
- }
-
- /**
- * Parse all groups in an interference JSON file.
- */
- private fun parseGroups(parser: JsonParser, builder: VmInterferenceModel.Builder) {
- parser.nextToken()
-
- if (!parser.isExpectedStartArrayToken) {
- throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}")
- }
-
- while (parser.nextToken() != JsonToken.END_ARRAY) {
- parseGroup(parser, builder)
- }
- }
-
- /**
- * Parse a group an interference JSON file.
- */
- private fun parseGroup(parser: JsonParser, builder: VmInterferenceModel.Builder) {
- var targetLoad = Double.POSITIVE_INFINITY
- var score = 1.0
- val members = mutableSetOf<String>()
-
- if (!parser.isExpectedStartObjectToken) {
- throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}")
- }
-
- while (parser.nextValue() != JsonToken.END_OBJECT) {
- when (parser.currentName) {
- "vms" -> parseGroupMembers(parser, members)
- "minServerLoad" -> targetLoad = parser.doubleValue
- "performanceScore" -> score = parser.doubleValue
- }
- }
-
- builder.addGroup(members, targetLoad, score)
- }
-
- /**
- * Parse the members of a group.
- */
- private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) {
- if (!parser.isExpectedStartArrayToken) {
- throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}")
- }
-
- while (parser.nextValue() != JsonToken.END_ARRAY) {
- if (parser.currentToken() != JsonToken.VALUE_STRING) {
- throw JsonParseException(parser, "Expected string value for group member")
- }
-
- val member = parser.text.removePrefix("vm__workload__").removeSuffix(".txt")
- members.add(member)
- }
- }
-
- private data class Group(
- @JsonProperty("minServerLoad")
- val targetLoad: Double,
- @JsonProperty("performanceScore")
- val score: Double,
- @JsonProperty("vms")
- val members: Set<String>,
- )
-}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt
deleted file mode 100644
index 1c3e7149..00000000
--- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.util
-
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertDoesNotThrow
-
-/**
- * Test suite for the [VmInterferenceModelReader] class.
- */
-class VmInterferenceModelReaderTest {
- @Test
- fun testSmoke() {
- val input = checkNotNull(VmInterferenceModelReader::class.java.getResourceAsStream("/perf-interference.json"))
- assertDoesNotThrow { VmInterferenceModelReader().read(input) }
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json
deleted file mode 100644
index 1be5852b..00000000
--- a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json
+++ /dev/null
@@ -1,22 +0,0 @@
-[
- {
- "vms": [
- "vm_a",
- "vm_c",
- "vm_x",
- "vm_y"
- ],
- "minServerLoad": 0.0,
- "performanceScore": 0.8830158730158756
- },
- {
- "vms": [
- "vm_a",
- "vm_b",
- "vm_c",
- "vm_d"
- ],
- "minServerLoad": 0.0,
- "performanceScore": 0.7133055555552751
- }
-]
diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
index bb9cb201..83b8c0c6 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
@@ -58,7 +58,7 @@ class CapelinBenchmarks {
fun setUp() {
val loader = ComputeWorkloadLoader(File("src/test/resources/trace"))
val source = trace("bitbrains-small")
- vms = source.resolve(loader, Random(1L))
+ vms = source.resolve(loader, Random(1L)).vms
topology = checkNotNull(object {}.javaClass.getResourceAsStream("/env/topology.txt")).use { clusterTopology(it) }
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 6604a190..0bbf1443 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -23,7 +23,6 @@
package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
-import mu.KotlinLogging
import org.opendc.compute.workload.ComputeServiceHelper
import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.createComputeScheduler
@@ -31,7 +30,6 @@ import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter
import org.opendc.compute.workload.grid5000
import org.opendc.compute.workload.telemetry.SdkTelemetryManager
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -52,11 +50,6 @@ import kotlin.math.roundToLong
*/
abstract class Portfolio(name: String) : Experiment(name) {
/**
- * The logger for this portfolio instance.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
* The configuration to use.
*/
private val config = ConfigFactory.load().getConfig("opendc.experiments.capelin")
@@ -97,18 +90,13 @@ abstract class Portfolio(name: String) : Experiment(name) {
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
- val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
- VmInterferenceModelReader()
- .read(File(config.getString("interference-model")))
- else
- null
-
val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements)
val failureModel =
if (operationalPhenomena.failureFrequency > 0)
grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()))
else
null
+ val (vms, interferenceModel) = workload.source.resolve(workloadLoader, seeder)
val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
@@ -116,7 +104,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
telemetry,
computeScheduler,
failureModel,
- performanceInterferenceModel?.withSeed(repeat.toLong())
+ interferenceModel?.withSeed(repeat.toLong())
)
val exporter = ParquetComputeMetricExporter(
@@ -132,8 +120,8 @@ abstract class Portfolio(name: String) : Experiment(name) {
// Instantiate the desired topology
runner.apply(topology)
- // Converge the workload trace
- runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
+ // Run the workload trace
+ runner.run(vms, seeder.nextLong())
} finally {
runner.close()
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 891fc8be..01b2a8fe 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -35,7 +35,6 @@ import org.opendc.compute.workload.*
import org.opendc.compute.workload.telemetry.SdkTelemetryManager
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
@@ -85,7 +84,7 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val workload = createTestWorkload(1.0)
+ val (workload, _) = createTestWorkload(1.0)
val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
@@ -135,7 +134,7 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val workload = createTestWorkload(0.25, seed)
+ val (workload, _) = createTestWorkload(0.25, seed)
val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
@@ -180,12 +179,7 @@ class CapelinIntegrationTest {
@Test
fun testInterference() = runBlockingSimulation {
val seed = 0
- val workload = createTestWorkload(1.0, seed)
- val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
- val performanceInterferenceModel =
- VmInterferenceModelReader()
- .read(perfInterferenceInput)
- .withSeed(seed.toLong())
+ val (workload, interferenceModel) = createTestWorkload(1.0, seed)
val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
@@ -193,7 +187,7 @@ class CapelinIntegrationTest {
clock,
telemetry,
computeScheduler,
- interferenceModel = performanceInterferenceModel
+ interferenceModel = interferenceModel?.withSeed(seed.toLong())
)
val topology = createTopology("single")
@@ -240,7 +234,7 @@ class CapelinIntegrationTest {
grid5000(Duration.ofDays(7))
)
val topology = createTopology("single")
- val workload = createTestWorkload(0.25, seed)
+ val (workload, _) = createTestWorkload(0.25, seed)
telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
@@ -274,7 +268,7 @@ class CapelinIntegrationTest {
/**
* Obtain the trace reader for the test.
*/
- private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> {
+ private fun createTestWorkload(fraction: Double, seed: Int = 0): ComputeWorkload.Resolved {
val source = trace("bitbrains-small").sampleByLoad(fraction)
return source.resolve(workloadLoader, Random(seed.toLong()))
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/interference-model.json
index 51fc6366..51fc6366 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/interference-model.json
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index bd770574..a150de4e 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -28,8 +28,6 @@ import org.opendc.compute.workload.telemetry.SdkTelemetryManager
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.util.VmInterferenceModelReader
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -145,21 +143,8 @@ public class OpenDCRunner(
val heartbeat = scheduler.scheduleWithFixedDelay({ manager.heartbeat(id) }, 0, heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS)
try {
- logger.debug { "Constructing performance interference model" }
-
- val interferenceModel = let {
- val path = tracePath.resolve(scenario.workload.trace.id).resolve("performance-interference-model.json")
- val enabled = scenario.phenomena.interference
-
- if (!enabled || !path.exists()) {
- return@let null
- }
-
- VmInterferenceModelReader().read(path.inputStream())
- }
-
val topology = convertTopology(scenario.topology)
- val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology, interferenceModel) }
+ val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology) }
val results = invokeAll(jobs)
logger.info { "Finished simulation for job $id" }
@@ -190,13 +175,11 @@ public class OpenDCRunner(
* @param scenario The scenario to simulate.
* @param repeat The repeat number used to seed the simulation.
* @param topology The topology to simulate.
- * @param interferenceModel The [VmInterferenceModel] used in this scenario.
*/
private inner class SimulationTask(
private val scenario: Scenario,
private val repeat: Int,
private val topology: Topology,
- private val interferenceModel: VmInterferenceModel?
) : RecursiveTask<WebComputeMetricExporter.Results>() {
override fun compute(): WebComputeMetricExporter.Results {
val exporter = WebComputeMetricExporter()
@@ -215,6 +198,7 @@ public class OpenDCRunner(
val phenomena = scenario.phenomena
val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder)
val workload = trace(workloadName).sampleByLoad(workloadFraction)
+ val (vms, interferenceModel) = workload.resolve(workloadLoader, seeder)
val failureModel =
if (phenomena.failures)
@@ -229,7 +213,7 @@ public class OpenDCRunner(
telemetry,
computeScheduler,
failureModel,
- interferenceModel
+ interferenceModel.takeIf { phenomena.interference }
)
telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1)))
@@ -238,7 +222,7 @@ public class OpenDCRunner(
// Instantiate the topology onto the simulator
simulator.apply(topology)
// Run workload trace
- simulator.run(workload.resolve(workloadLoader, seeder), seeder.nextLong())
+ simulator.run(vms, seeder.nextLong())
val serviceMetrics = collectServiceMetrics(telemetry.metricProducer)
logger.debug {