summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt68
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt128
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt45
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt (renamed from opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt)31
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt8
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt9
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt82
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt30
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt22
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt385
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt12
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt9
12 files changed, 552 insertions, 277 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt
deleted file mode 100644
index 67f9626c..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.util
-
-import com.fasterxml.jackson.annotation.JsonProperty
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import com.fasterxml.jackson.module.kotlin.readValue
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
-import java.io.File
-import java.io.InputStream
-
-/**
- * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
- */
-public class PerformanceInterferenceReader {
- /**
- * The [ObjectMapper] to use.
- */
- private val mapper = jacksonObjectMapper()
-
- init {
- mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java)
- }
-
- /**
- * Read the performance interface model from [file].
- */
- public fun read(file: File): List<VmInterferenceGroup> {
- return mapper.readValue(file)
- }
-
- /**
- * Read the performance interface model from the input.
- */
- public fun read(input: InputStream): List<VmInterferenceGroup> {
- return mapper.readValue(input)
- }
-
- private data class GroupMixin(
- @JsonProperty("minServerLoad")
- val targetLoad: Double,
- @JsonProperty("performanceScore")
- val score: Double,
- @JsonProperty("vms")
- val members: Set<String>,
- )
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt
new file mode 100644
index 00000000..e0fa8904
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.util
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import java.io.File
+import java.io.InputStream
+
+/**
+ * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
+ */
+public class VmInterferenceModelReader {
+ /**
+ * The [ObjectMapper] to use.
+ */
+ private val mapper = jacksonObjectMapper()
+
+ /**
+ * Read the performance interface model from [file].
+ */
+ public fun read(file: File): VmInterferenceModel {
+ val builder = VmInterferenceModel.builder()
+ val parser = mapper.createParser(file)
+ parseGroups(parser, builder)
+ return builder.build()
+ }
+
+ /**
+ * Read the performance interface model from the input.
+ */
+ public fun read(input: InputStream): VmInterferenceModel {
+ val builder = VmInterferenceModel.builder()
+ val parser = mapper.createParser(input)
+ parseGroups(parser, builder)
+ return builder.build()
+ }
+
+ /**
+ * Parse all groups in an interference JSON file.
+ */
+ private fun parseGroups(parser: JsonParser, builder: VmInterferenceModel.Builder) {
+ parser.nextToken()
+
+ if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}")
+ }
+
+ while (parser.nextToken() != JsonToken.END_ARRAY) {
+ parseGroup(parser, builder)
+ }
+ }
+
+ /**
+ * Parse a group an interference JSON file.
+ */
+ private fun parseGroup(parser: JsonParser, builder: VmInterferenceModel.Builder) {
+ var targetLoad = Double.POSITIVE_INFINITY
+ var score = 1.0
+ val members = mutableSetOf<String>()
+
+ if (!parser.isExpectedStartObjectToken) {
+ throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}")
+ }
+
+ while (parser.nextValue() != JsonToken.END_OBJECT) {
+ when (parser.currentName) {
+ "vms" -> parseGroupMembers(parser, members)
+ "minServerLoad" -> targetLoad = parser.doubleValue
+ "performanceScore" -> score = parser.doubleValue
+ }
+ }
+
+ builder.addGroup(members, targetLoad, score)
+ }
+
+ /**
+ * Parse the members of a group.
+ */
+ private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) {
+ if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}")
+ }
+
+ while (parser.nextValue() != JsonToken.END_ARRAY) {
+ if (parser.currentToken() != JsonToken.VALUE_STRING) {
+ throw JsonParseException(parser, "Expected string value for group member")
+ }
+
+ val member = parser.text.removePrefix("vm__workload__").removeSuffix(".txt")
+ members.add(member)
+ }
+ }
+
+ private data class Group(
+ @JsonProperty("minServerLoad")
+ val targetLoad: Double,
+ @JsonProperty("performanceScore")
+ val score: Double,
+ @JsonProperty("vms")
+ val members: Set<String>,
+ )
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt
deleted file mode 100644
index c79f0584..00000000
--- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.util
-
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
-
-/**
- * Test suite for the [PerformanceInterferenceReader] class.
- */
-class PerformanceInterferenceReaderTest {
- @Test
- fun testSmoke() {
- val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json"))
- val result = PerformanceInterferenceReader().read(input)
-
- assertAll(
- { assertEquals(2, result.size) },
- { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) },
- { assertEquals(0.0, result[0].targetLoad, 0.001) },
- { assertEquals(0.8830158730158756, result[0].score, 0.001) }
- )
- }
-}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt
index 708ddede..1c3e7149 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt
@@ -20,25 +20,18 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.kernel.interference
+package org.opendc.compute.workload.util
+
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
/**
- * A group of virtual machines that together can interfere when operating on the same resources, causing performance
- * variability.
+ * Test suite for the [VmInterferenceModelReader] class.
*/
-public data class VmInterferenceGroup(
- /**
- * The minimum load of the host before the interference occurs.
- */
- public val targetLoad: Double,
-
- /**
- * A score in [0, 1] representing the performance variability as a result of resource interference.
- */
- public val score: Double,
-
- /**
- * The members of this interference group.
- */
- public val members: Set<String>
-)
+class VmInterferenceModelReaderTest {
+ @Test
+ fun testSmoke() {
+ val input = checkNotNull(VmInterferenceModelReader::class.java.getResourceAsStream("/perf-interference.json"))
+ assertDoesNotThrow { VmInterferenceModelReader().read(input) }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 4e855f82..53c9de11 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -30,14 +30,13 @@ import org.opendc.compute.workload.createComputeScheduler
import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter
import org.opendc.compute.workload.grid5000
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.util.PerformanceInterferenceReader
+import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
@@ -99,9 +98,8 @@ abstract class Portfolio(name: String) : Experiment(name) {
val seeder = Random(repeat.toLong())
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
- PerformanceInterferenceReader()
+ VmInterferenceModelReader()
.read(File(config.getString("interference-model")))
- .let { VmInterferenceModel(it, Random(seeder.nextLong())) }
else
null
@@ -116,7 +114,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
clock,
computeScheduler,
failureModel,
- performanceInterferenceModel
+ performanceInterferenceModel?.withSeed(repeat.toLong())
)
val exporter = ParquetComputeMetricExporter(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 94e92c1b..56ba9cfe 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -34,9 +34,8 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.workload.*
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.util.PerformanceInterferenceReader
+import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.experiments.capelin.topology.clusterTopology
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
@@ -177,9 +176,9 @@ class CapelinIntegrationTest {
val workload = createTestWorkload(1.0, seed)
val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
val performanceInterferenceModel =
- PerformanceInterferenceReader()
+ VmInterferenceModelReader()
.read(perfInterferenceInput)
- .let { VmInterferenceModel(it, Random(seed.toLong())) }
+ .withSeed(seed.toLong())
val simulator = ComputeWorkloadRunner(
coroutineContext,
@@ -213,7 +212,7 @@ class CapelinIntegrationTest {
{ assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
{ assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(481251, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(465088, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 60a10f20..5909d980 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -77,33 +77,41 @@ public abstract class SimAbstractMachine(
private var isTerminated = false
/**
- * The continuation to resume when the virtual machine workload has finished.
+ * The current active [Context].
*/
- private var cont: Continuation<Unit>? = null
+ private var _ctx: Context? = null
+
+ /**
+ * This method is invoked when the machine is started.
+ */
+ protected open fun onStart(ctx: SimMachineContext) {}
+
+ /**
+ * This method is invoked when the machine is stopped.
+ */
+ protected open fun onStop(ctx: SimMachineContext) {
+ _ctx = null
+ }
/**
* Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
check(!isTerminated) { "Machine is terminated" }
- check(cont == null) { "A machine cannot run concurrently" }
-
- val ctx = Context(meta)
+ check(_ctx == null) { "A machine cannot run concurrently" }
return suspendCancellableCoroutine { cont ->
- this.cont = cont
+ val ctx = Context(meta, cont)
+ _ctx = ctx
// Cancel all cpus on cancellation
- cont.invokeOnCancellation {
- this.cont = null
- engine.batch {
- for (cpu in cpus) {
- cpu.cancel()
- }
- }
- }
+ cont.invokeOnCancellation { ctx.close() }
- engine.batch { workload.onStart(ctx) }
+ engine.batch {
+ onStart(ctx)
+
+ workload.onStart(ctx)
+ }
}
}
@@ -113,7 +121,7 @@ public abstract class SimAbstractMachine(
}
isTerminated = true
- cancel()
+ _ctx?.close()
}
override fun onConverge(now: Long, delta: Long) {
@@ -121,26 +129,14 @@ public abstract class SimAbstractMachine(
}
/**
- * Cancel the workload that is currently running on the machine.
- */
- private fun cancel() {
- engine.batch {
- for (cpu in cpus) {
- cpu.cancel()
- }
- }
-
- val cont = cont
- if (cont != null) {
- this.cont = null
- cont.resume(Unit)
- }
- }
-
- /**
* The execution context in which the workload runs.
*/
- private inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
+ private inner class Context(override val meta: Map<String, Any>, private val cont: Continuation<Unit>) : SimMachineContext {
+ /**
+ * A flag to indicate that the context has been closed.
+ */
+ private var isClosed = false
+
override val engine: FlowEngine
get() = this@SimAbstractMachine.engine
@@ -152,7 +148,21 @@ public abstract class SimAbstractMachine(
override val storage: List<SimStorageInterface> = this@SimAbstractMachine.storage
- override fun close() = cancel()
+ override fun close() {
+ if (isClosed) {
+ return
+ }
+
+ isClosed = true
+ engine.batch {
+ for (cpu in cpus) {
+ cpu.cancel()
+ }
+ }
+
+ onStop(this)
+ cont.resume(Unit)
+ }
}
/**
@@ -166,7 +176,7 @@ public abstract class SimAbstractMachine(
* The [SimNetworkAdapter] implementation for a machine.
*/
private class NetworkAdapterImpl(
- private val engine: FlowEngine,
+ engine: FlowEngine,
model: NetworkAdapter,
index: Int
) : SimNetworkAdapter(), SimNetworkInterface {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index f6d8f628..90bf5e25 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -29,6 +29,7 @@ import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.interference.InterferenceKey
import org.opendc.simulator.flow.mux.FlowMultiplexer
import kotlin.math.roundToLong
@@ -141,11 +142,14 @@ public abstract class SimAbstractHypervisor(
*
* @param model The machine model of the virtual machine.
*/
- private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine {
+ private inner class VirtualMachine(
+ model: MachineModel,
+ private val interferenceId: String? = null
+ ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine {
/**
* The interference key of this virtual machine.
*/
- private val interferenceKey = interferenceId?.let { interferenceDomain?.join(interferenceId) }
+ private var interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) }
/**
* The vCPUs of the machine.
@@ -187,6 +191,24 @@ public abstract class SimAbstractHypervisor(
override val cpuUsage: Double
get() = cpus.sumOf(FlowConsumer::rate)
+ override fun onStart(ctx: SimMachineContext) {
+ val interferenceKey = interferenceKey
+ if (interferenceKey != null) {
+ interferenceDomain?.join(interferenceKey)
+ }
+
+ super.onStart(ctx)
+ }
+
+ override fun onStop(ctx: SimMachineContext) {
+ super.onStop(ctx)
+
+ val interferenceKey = interferenceKey
+ if (interferenceKey != null) {
+ interferenceDomain?.leave(interferenceKey)
+ }
+ }
+
override fun close() {
super.close()
@@ -195,8 +217,10 @@ public abstract class SimAbstractHypervisor(
}
_vms.remove(this)
+
+ val interferenceKey = interferenceKey
if (interferenceKey != null) {
- interferenceDomain?.leave(interferenceKey)
+ interferenceDomain?.removeKey(interferenceKey)
}
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
index b737d61a..09b03306 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
@@ -30,14 +30,30 @@ import org.opendc.simulator.flow.interference.InterferenceKey
*/
public interface VmInterferenceDomain : InterferenceDomain {
/**
- * Join this interference domain.
+ * Construct an [InterferenceKey] for the specified [id].
*
* @param id The identifier of the virtual machine.
+ * @return A key identifying the virtual machine as part of the interference domain. `null` if the virtual machine
+ * does not participate in the domain.
*/
- public fun join(id: String): InterferenceKey
+ public fun createKey(id: String): InterferenceKey?
/**
- * Leave this interference domain.
+ * Remove the specified [key] from this domain.
+ */
+ public fun removeKey(key: InterferenceKey)
+
+ /**
+ * Mark the specified [key] as active in this interference domain.
+ *
+ * @param key The key to join the interference domain with.
+ */
+ public fun join(key: InterferenceKey)
+
+ /**
+ * Mark the specified [key] as inactive in this interference domain.
+ *
+ * @param key The key of the virtual machine that wants to leave the domain.
*/
public fun leave(key: InterferenceKey)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
index b3d72507..977292be 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
@@ -28,143 +28,366 @@ import java.util.*
/**
* An interference model that models the resource interference between virtual machines on a host.
*
- * @param groups The groups of virtual machines that interfere with each other.
- * @param random The [Random] instance to select the affected virtual machines.
+ * @param targets The target load of each group.
+ * @param scores The performance score of each group.
+ * @param members The members belonging to each group.
+ * @param membership The identifier of each key.
+ * @param size The number of groups.
+ * @param seed The seed to use for randomly selecting the virtual machines that are affected.
*/
-public class VmInterferenceModel(
- private val groups: List<VmInterferenceGroup>,
- private val random: Random = Random(0)
+public class VmInterferenceModel private constructor(
+ private val targets: DoubleArray,
+ private val scores: DoubleArray,
+ private val idMapping: Map<String, Int>,
+ private val members: Array<IntArray>,
+ private val membership: Array<IntArray>,
+ private val size: Int,
+ seed: Long,
) {
/**
+ * A [SplittableRandom] used for selecting the virtual machines that are affected.
+ */
+ private val random = SplittableRandom(seed)
+
+ /**
* Construct a new [VmInterferenceDomain].
*/
- public fun newDomain(): VmInterferenceDomain = object : VmInterferenceDomain {
+ public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(targets, scores, idMapping, members, membership, random)
+
+ /**
+ * Create a copy of this model with a different seed.
+ */
+ public fun withSeed(seed: Long): VmInterferenceModel {
+ return VmInterferenceModel(targets, scores, idMapping, members, membership, size, seed)
+ }
+
+ public companion object {
/**
- * The stateful groups of this domain.
+ * Construct a [Builder] instance.
*/
- private val groups = this@VmInterferenceModel.groups.map { GroupContext(it) }
+ @JvmStatic
+ public fun builder(): Builder = Builder()
+ }
+ /**
+ * Builder class for a [VmInterferenceModel]
+ */
+ public class Builder internal constructor() {
/**
- * The set of keys active in this domain.
+ * The initial capacity of the builder.
*/
- private val keys = mutableSetOf<InterferenceKeyImpl>()
+ private val INITIAL_CAPACITY = 256
- override fun join(id: String): InterferenceKey {
- val key = InterferenceKeyImpl(id, groups.filter { id in it }.sortedBy { it.group.targetLoad })
- keys += key
- return key
- }
+ /**
+ * The target load of each group.
+ */
+ private var _targets = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY }
- override fun leave(key: InterferenceKey) {
- if (key is InterferenceKeyImpl) {
- keys -= key
- key.leave()
+ /**
+ * The performance score of each group.
+ */
+ private var _scores = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY }
+
+ /**
+ * The members of each group.
+ */
+ private var _members = ArrayList<Set<String>>(INITIAL_CAPACITY)
+
+ /**
+ * The mapping from member to group id.
+ */
+ private val ids = TreeSet<String>()
+
+ /**
+ * The number of groups in the model.
+ */
+ private var size = 0
+
+ /**
+ * Add the specified group to the model.
+ */
+ public fun addGroup(members: Set<String>, targetLoad: Double, score: Double): Builder {
+ val size = size
+
+ if (size == _targets.size) {
+ grow()
}
+
+ _targets[size] = targetLoad
+ _scores[size] = score
+ _members.add(members)
+ ids.addAll(members)
+
+ this.size++
+
+ return this
}
- override fun apply(key: InterferenceKey?, load: Double): Double {
- if (key == null || key !is InterferenceKeyImpl) {
- return 1.0
- }
+ /**
+ * Build the [VmInterferenceModel].
+ */
+ public fun build(seed: Long = 0): VmInterferenceModel {
+ val size = size
+ val targets = _targets
+ val scores = _scores
+ val members = _members
- val ctx = key.findGroup(load)
- val group = ctx?.group
+ val indices = Array(size) { it }
+ indices.sortWith(
+ Comparator { l, r ->
+ var cmp = targets[l].compareTo(targets[r]) // Order by target load
+ if (cmp != 0) {
+ return@Comparator cmp
+ }
- // Apply performance penalty to (on average) only one of the VMs
- return if (group != null && random.nextInt(group.members.size) == 0) {
- group.score
- } else {
- 1.0
+ cmp = scores[l].compareTo(scores[r]) // Higher penalty first (this means lower performance score first)
+ if (cmp != 0)
+ cmp
+ else
+ l.compareTo(r)
+ }
+ )
+
+ val newTargets = DoubleArray(size)
+ val newScores = DoubleArray(size)
+ val newMembers = arrayOfNulls<IntArray>(size)
+
+ var nextId = 0
+ val idMapping = ids.associateWith { nextId++ }
+ val membership = ids.associateWithTo(TreeMap()) { ArrayList<Int>() }
+
+ for ((group, j) in indices.withIndex()) {
+ newTargets[group] = targets[j]
+ newScores[group] = scores[j]
+ val groupMembers = members[j]
+ val newGroupMembers = groupMembers.map { idMapping.getValue(it) }.toIntArray()
+
+ newGroupMembers.sort()
+ newMembers[group] = newGroupMembers
+
+ for (member in groupMembers) {
+ membership.getValue(member).add(group)
+ }
}
+
+ @Suppress("UNCHECKED_CAST")
+ return VmInterferenceModel(
+ newTargets,
+ newScores,
+ idMapping,
+ newMembers as Array<IntArray>,
+ membership.map { it.value.toIntArray() }.toTypedArray(),
+ size,
+ seed
+ )
}
- override fun toString(): String = "VmInterferenceDomain"
+ /**
+ * Helper function to grow the capacity of the internal arrays.
+ */
+ private fun grow() {
+ val oldSize = _targets.size
+ val newSize = oldSize + (oldSize shr 1)
+
+ _targets = _targets.copyOf(newSize)
+ _scores = _scores.copyOf(newSize)
+ }
}
/**
- * An interference key.
- *
- * @param id The identifier of the member.
- * @param groups The groups to which the key belongs.
+ * Internal implementation of [VmInterferenceDomain].
*/
- private inner class InterferenceKeyImpl(val id: String, private val groups: List<GroupContext>) : InterferenceKey {
- init {
- for (group in groups) {
- group.join(this)
- }
- }
+ private class InterferenceDomainImpl(
+ private val targets: DoubleArray,
+ private val scores: DoubleArray,
+ private val idMapping: Map<String, Int>,
+ private val members: Array<IntArray>,
+ private val membership: Array<IntArray>,
+ private val random: SplittableRandom
+ ) : VmInterferenceDomain {
+ /**
+ * Keys registered with this domain.
+ */
+ private val keys = HashMap<Int, InterferenceKeyImpl>()
/**
- * Find the active group that applies for the interference member.
+ * The set of keys active in this domain.
*/
- fun findGroup(load: Double): GroupContext? {
- // Find the first active group whose target load is lower than the current load
- val index = groups.binarySearchBy(load) { it.group.targetLoad }
- val target = if (index >= 0) index else -(index + 1)
+ private val activeKeys = ArrayList<InterferenceKeyImpl>()
- // Check whether there are active groups ahead of the index
- for (i in target until groups.size) {
- val group = groups[i]
- if (group.group.targetLoad > load) {
- break
- } else if (group.isActive) {
- return group
+ override fun createKey(id: String): InterferenceKey? {
+ val intId = idMapping[id] ?: return null
+ return keys.computeIfAbsent(intId) { InterferenceKeyImpl(intId) }
+ }
+
+ override fun removeKey(key: InterferenceKey) {
+ if (key !is InterferenceKeyImpl) {
+ return
+ }
+
+ if (activeKeys.remove(key)) {
+ computeActiveGroups(key.id)
+ }
+
+ keys.remove(key.id)
+ }
+
+ override fun join(key: InterferenceKey) {
+ if (key !is InterferenceKeyImpl) {
+ return
+ }
+
+ if (key.acquire()) {
+ val pos = activeKeys.binarySearch(key)
+ if (pos < 0) {
+ activeKeys.add(-pos - 1, key)
}
+ computeActiveGroups(key.id)
}
+ }
+
+ override fun leave(key: InterferenceKey) {
+ if (key is InterferenceKeyImpl && key.release()) {
+ activeKeys.remove(key)
+ computeActiveGroups(key.id)
+ }
+ }
+
+ override fun apply(key: InterferenceKey?, load: Double): Double {
+ if (key == null || key !is InterferenceKeyImpl) {
+ return 1.0
+ }
+
+ val groups = key.groups
+ val groupSize = groups.size
+
+ if (groupSize == 0) {
+ return 1.0
+ }
+
+ val targets = targets
+ val scores = scores
+ var low = 0
+ var high = groups.size - 1
- // Check whether there are active groups before the index
- for (i in (target - 1) downTo 0) {
- val group = groups[i]
- if (group.isActive) {
- return group
+ var group = -1
+ var score = 1.0
+
+ // Perform binary search over the groups based on target load
+ while (low <= high) {
+ val mid = low + high ushr 1
+ val midGroup = groups[mid]
+ val target = targets[midGroup]
+
+ if (target < load) {
+ low = mid + 1
+ group = midGroup
+ score = scores[midGroup]
+ } else if (target > load) {
+ high = mid - 1
+ } else {
+ group = midGroup
+ score = scores[midGroup]
+ break
}
}
- return null
+ return if (group >= 0 && random.nextInt(members[group].size) == 0) {
+ score
+ } else {
+ 1.0
+ }
}
+ override fun toString(): String = "VmInterferenceDomain"
+
/**
- * Leave all the groups.
+ * Queue of participants that will be removed or added to the active groups.
*/
- fun leave() {
+ private val _participants = ArrayDeque<InterferenceKeyImpl>()
+
+ /**
+ * (Re-)Compute the active groups.
+ */
+ private fun computeActiveGroups(id: Int) {
+ val activeKeys = activeKeys
+ val groups = membership[id]
+
+ if (activeKeys.isEmpty()) {
+ return
+ }
+
+ val members = members
+ val participants = _participants
+
for (group in groups) {
- group.leave(this)
+ val groupMembers = members[group]
+
+ var i = 0
+ var j = 0
+ var intersection = 0
+
+ // Compute the intersection of the group members and the current active members
+ while (i < groupMembers.size && j < activeKeys.size) {
+ val l = groupMembers[i]
+ val rightEntry = activeKeys[j]
+ val r = rightEntry.id
+
+ if (l < r) {
+ i++
+ } else if (l > r) {
+ j++
+ } else {
+ participants.add(rightEntry)
+ intersection++
+
+ i++
+ j++
+ }
+ }
+
+ while (true) {
+ val participant = participants.poll() ?: break
+ val participantGroups = participant.groups
+ if (intersection <= 1) {
+ participantGroups.remove(group)
+ } else {
+ val pos = participantGroups.binarySearch(group)
+ if (pos < 0) {
+ participantGroups.add(-pos - 1, group)
+ }
+ }
+ }
}
}
}
/**
- * A group context is used to track the active keys per interference group.
+ * An interference key.
+ *
+ * @param id The identifier of the member.
*/
- private inner class GroupContext(val group: VmInterferenceGroup) {
+ private class InterferenceKeyImpl(@JvmField val id: Int) : InterferenceKey, Comparable<InterferenceKeyImpl> {
/**
- * The active keys that are part of this group.
+ * The active groups to which the key belongs.
*/
- private val keys = mutableSetOf<InterferenceKeyImpl>()
+ @JvmField val groups: MutableList<Int> = ArrayList()
/**
- * A flag to indicate that the group is active.
+ * The number of users of the interference key.
*/
- val isActive
- get() = keys.size > 1
+ private var refCount: Int = 0
/**
- * Determine whether the specified [id] is part of this group.
+ * Join the domain.
*/
- operator fun contains(id: String): Boolean = id in group.members
+ fun acquire(): Boolean = refCount++ <= 0
/**
- * Join this group with the specified [key].
+ * Leave the domain.
*/
- fun join(key: InterferenceKeyImpl) {
- keys += key
- }
+ fun release(): Boolean = --refCount <= 0
- /**
- * Leave this group with the specified [key].
- */
- fun leave(key: InterferenceKeyImpl) {
- keys -= key
- }
+ override fun compareTo(other: InterferenceKeyImpl): Int = id.compareTo(other.id)
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
index 6f32cf46..b7f5bf8e 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
@@ -30,7 +30,6 @@ import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertDoesNotThrow
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -187,12 +186,11 @@ internal class SimFairShareHypervisorTest {
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
- val groups = listOf(
- VmInterferenceGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")),
- VmInterferenceGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")),
- VmInterferenceGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n"))
- )
- val interferenceModel = VmInterferenceModel(groups)
+ val interferenceModel = VmInterferenceModel.builder()
+ .addGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b"))
+ .addGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c"))
+ .addGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n"))
+ .build()
val platform = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 59308e11..a1bc869e 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -32,7 +32,7 @@ import org.opendc.compute.workload.*
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.util.PerformanceInterferenceReader
+import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -131,7 +131,7 @@ class RunnerCli : CliktCommand(name = "runner") {
logger.info { "Constructing performance interference model" }
val workloadLoader = ComputeWorkloadLoader(tracePath)
- val interferenceGroups = let {
+ val interferenceModel = let {
val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json")
val operational = scenario.operationalPhenomena
val enabled = operational.performanceInterferenceEnabled
@@ -140,15 +140,14 @@ class RunnerCli : CliktCommand(name = "runner") {
return@let null
}
- PerformanceInterferenceReader().read(path.inputStream())
+ VmInterferenceModelReader().read(path.inputStream())
}
val targets = portfolio.targets
val results = (0 until targets.repeatsPerScenario).map { repeat ->
logger.info { "Starting repeat $repeat" }
withTimeout(runTimeout * 1000) {
- val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) }
- runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel)
+ runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel?.withSeed(repeat.toLong()))
}
}