diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-15 14:43:43 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-25 17:58:53 +0200 |
| commit | e76bebe9e81c3813422da6d67fbab7d9f471a317 (patch) | |
| tree | 059a9f2cb2c5e465ac7faacf9fe74d38c96af228 | |
| parent | a1be58f1013697223a339a6a49302e1e42a6662d (diff) | |
perf(compute): Redesign VM interference algorithm
This change redesigns the virtual machine interference algorithm to have
a fixed memory usage per `VmInterferenceModel` instance. Previously, for
every interference domain, a copy of the model would be created, leading
to OutOfMemory errors when running multiple experiments at the same
time.
12 files changed, 552 insertions, 277 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt deleted file mode 100644 index 67f9626c..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.util - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup -import java.io.File -import java.io.InputStream - -/** - * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. - */ -public class PerformanceInterferenceReader { - /** - * The [ObjectMapper] to use. - */ - private val mapper = jacksonObjectMapper() - - init { - mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java) - } - - /** - * Read the performance interface model from [file]. - */ - public fun read(file: File): List<VmInterferenceGroup> { - return mapper.readValue(file) - } - - /** - * Read the performance interface model from the input. - */ - public fun read(input: InputStream): List<VmInterferenceGroup> { - return mapper.readValue(input) - } - - private data class GroupMixin( - @JsonProperty("minServerLoad") - val targetLoad: Double, - @JsonProperty("performanceScore") - val score: Double, - @JsonProperty("vms") - val members: Set<String>, - ) -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt new file mode 100644 index 00000000..e0fa8904 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.util + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.core.JsonParseException +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import java.io.File +import java.io.InputStream + +/** + * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. + */ +public class VmInterferenceModelReader { + /** + * The [ObjectMapper] to use. + */ + private val mapper = jacksonObjectMapper() + + /** + * Read the performance interface model from [file]. + */ + public fun read(file: File): VmInterferenceModel { + val builder = VmInterferenceModel.builder() + val parser = mapper.createParser(file) + parseGroups(parser, builder) + return builder.build() + } + + /** + * Read the performance interface model from the input. + */ + public fun read(input: InputStream): VmInterferenceModel { + val builder = VmInterferenceModel.builder() + val parser = mapper.createParser(input) + parseGroups(parser, builder) + return builder.build() + } + + /** + * Parse all groups in an interference JSON file. + */ + private fun parseGroups(parser: JsonParser, builder: VmInterferenceModel.Builder) { + parser.nextToken() + + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}") + } + + while (parser.nextToken() != JsonToken.END_ARRAY) { + parseGroup(parser, builder) + } + } + + /** + * Parse a group an interference JSON file. + */ + private fun parseGroup(parser: JsonParser, builder: VmInterferenceModel.Builder) { + var targetLoad = Double.POSITIVE_INFINITY + var score = 1.0 + val members = mutableSetOf<String>() + + if (!parser.isExpectedStartObjectToken) { + throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "vms" -> parseGroupMembers(parser, members) + "minServerLoad" -> targetLoad = parser.doubleValue + "performanceScore" -> score = parser.doubleValue + } + } + + builder.addGroup(members, targetLoad, score) + } + + /** + * Parse the members of a group. + */ + private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) { + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_ARRAY) { + if (parser.currentToken() != JsonToken.VALUE_STRING) { + throw JsonParseException(parser, "Expected string value for group member") + } + + val member = parser.text.removePrefix("vm__workload__").removeSuffix(".txt") + members.add(member) + } + } + + private data class Group( + @JsonProperty("minServerLoad") + val targetLoad: Double, + @JsonProperty("performanceScore") + val score: Double, + @JsonProperty("vms") + val members: Set<String>, + ) +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt deleted file mode 100644 index c79f0584..00000000 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.util - -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll - -/** - * Test suite for the [PerformanceInterferenceReader] class. - */ -class PerformanceInterferenceReaderTest { - @Test - fun testSmoke() { - val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) - val result = PerformanceInterferenceReader().read(input) - - assertAll( - { assertEquals(2, result.size) }, - { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) }, - { assertEquals(0.0, result[0].targetLoad, 0.001) }, - { assertEquals(0.8830158730158756, result[0].score, 0.001) } - ) - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt index 708ddede..1c3e7149 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt @@ -20,25 +20,18 @@ * SOFTWARE. */ -package org.opendc.simulator.compute.kernel.interference +package org.opendc.compute.workload.util + +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow /** - * A group of virtual machines that together can interfere when operating on the same resources, causing performance - * variability. + * Test suite for the [VmInterferenceModelReader] class. */ -public data class VmInterferenceGroup( - /** - * The minimum load of the host before the interference occurs. - */ - public val targetLoad: Double, - - /** - * A score in [0, 1] representing the performance variability as a result of resource interference. - */ - public val score: Double, - - /** - * The members of this interference group. - */ - public val members: Set<String> -) +class VmInterferenceModelReaderTest { + @Test + fun testSmoke() { + val input = checkNotNull(VmInterferenceModelReader::class.java.getResourceAsStream("/perf-interference.json")) + assertDoesNotThrow { VmInterferenceModelReader().read(input) } + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 4e855f82..53c9de11 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -30,14 +30,13 @@ import org.opendc.compute.workload.createComputeScheduler import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter import org.opendc.compute.workload.grid5000 import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.PerformanceInterferenceReader +import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader @@ -99,9 +98,8 @@ abstract class Portfolio(name: String) : Experiment(name) { val seeder = Random(repeat.toLong()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) - PerformanceInterferenceReader() + VmInterferenceModelReader() .read(File(config.getString("interference-model"))) - .let { VmInterferenceModel(it, Random(seeder.nextLong())) } else null @@ -116,7 +114,7 @@ abstract class Portfolio(name: String) : Experiment(name) { clock, computeScheduler, failureModel, - performanceInterferenceModel + performanceInterferenceModel?.withSeed(repeat.toLong()) ) val exporter = ParquetComputeMetricExporter( diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 94e92c1b..56ba9cfe 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -34,9 +34,8 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.PerformanceInterferenceReader +import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.topology.clusterTopology -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics @@ -177,9 +176,9 @@ class CapelinIntegrationTest { val workload = createTestWorkload(1.0, seed) val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json")) val performanceInterferenceModel = - PerformanceInterferenceReader() + VmInterferenceModelReader() .read(perfInterferenceInput) - .let { VmInterferenceModel(it, Random(seed.toLong())) } + .withSeed(seed.toLong()) val simulator = ComputeWorkloadRunner( coroutineContext, @@ -213,7 +212,7 @@ class CapelinIntegrationTest { { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(481251, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(465088, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 60a10f20..5909d980 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -77,33 +77,41 @@ public abstract class SimAbstractMachine( private var isTerminated = false /** - * The continuation to resume when the virtual machine workload has finished. + * The current active [Context]. */ - private var cont: Continuation<Unit>? = null + private var _ctx: Context? = null + + /** + * This method is invoked when the machine is started. + */ + protected open fun onStart(ctx: SimMachineContext) {} + + /** + * This method is invoked when the machine is stopped. + */ + protected open fun onStop(ctx: SimMachineContext) { + _ctx = null + } /** * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { check(!isTerminated) { "Machine is terminated" } - check(cont == null) { "A machine cannot run concurrently" } - - val ctx = Context(meta) + check(_ctx == null) { "A machine cannot run concurrently" } return suspendCancellableCoroutine { cont -> - this.cont = cont + val ctx = Context(meta, cont) + _ctx = ctx // Cancel all cpus on cancellation - cont.invokeOnCancellation { - this.cont = null - engine.batch { - for (cpu in cpus) { - cpu.cancel() - } - } - } + cont.invokeOnCancellation { ctx.close() } - engine.batch { workload.onStart(ctx) } + engine.batch { + onStart(ctx) + + workload.onStart(ctx) + } } } @@ -113,7 +121,7 @@ public abstract class SimAbstractMachine( } isTerminated = true - cancel() + _ctx?.close() } override fun onConverge(now: Long, delta: Long) { @@ -121,26 +129,14 @@ public abstract class SimAbstractMachine( } /** - * Cancel the workload that is currently running on the machine. - */ - private fun cancel() { - engine.batch { - for (cpu in cpus) { - cpu.cancel() - } - } - - val cont = cont - if (cont != null) { - this.cont = null - cont.resume(Unit) - } - } - - /** * The execution context in which the workload runs. */ - private inner class Context(override val meta: Map<String, Any>) : SimMachineContext { + private inner class Context(override val meta: Map<String, Any>, private val cont: Continuation<Unit>) : SimMachineContext { + /** + * A flag to indicate that the context has been closed. + */ + private var isClosed = false + override val engine: FlowEngine get() = this@SimAbstractMachine.engine @@ -152,7 +148,21 @@ public abstract class SimAbstractMachine( override val storage: List<SimStorageInterface> = this@SimAbstractMachine.storage - override fun close() = cancel() + override fun close() { + if (isClosed) { + return + } + + isClosed = true + engine.batch { + for (cpu in cpus) { + cpu.cancel() + } + } + + onStop(this) + cont.resume(Unit) + } } /** @@ -166,7 +176,7 @@ public abstract class SimAbstractMachine( * The [SimNetworkAdapter] implementation for a machine. */ private class NetworkAdapterImpl( - private val engine: FlowEngine, + engine: FlowEngine, model: NetworkAdapter, index: Int ) : SimNetworkAdapter(), SimNetworkInterface { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index f6d8f628..90bf5e25 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -29,6 +29,7 @@ import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.interference.InterferenceKey import org.opendc.simulator.flow.mux.FlowMultiplexer import kotlin.math.roundToLong @@ -141,11 +142,14 @@ public abstract class SimAbstractHypervisor( * * @param model The machine model of the virtual machine. */ - private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { + private inner class VirtualMachine( + model: MachineModel, + private val interferenceId: String? = null + ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { /** * The interference key of this virtual machine. */ - private val interferenceKey = interferenceId?.let { interferenceDomain?.join(interferenceId) } + private var interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } /** * The vCPUs of the machine. @@ -187,6 +191,24 @@ public abstract class SimAbstractHypervisor( override val cpuUsage: Double get() = cpus.sumOf(FlowConsumer::rate) + override fun onStart(ctx: SimMachineContext) { + val interferenceKey = interferenceKey + if (interferenceKey != null) { + interferenceDomain?.join(interferenceKey) + } + + super.onStart(ctx) + } + + override fun onStop(ctx: SimMachineContext) { + super.onStop(ctx) + + val interferenceKey = interferenceKey + if (interferenceKey != null) { + interferenceDomain?.leave(interferenceKey) + } + } + override fun close() { super.close() @@ -195,8 +217,10 @@ public abstract class SimAbstractHypervisor( } _vms.remove(this) + + val interferenceKey = interferenceKey if (interferenceKey != null) { - interferenceDomain?.leave(interferenceKey) + interferenceDomain?.removeKey(interferenceKey) } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt index b737d61a..09b03306 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -30,14 +30,30 @@ import org.opendc.simulator.flow.interference.InterferenceKey */ public interface VmInterferenceDomain : InterferenceDomain { /** - * Join this interference domain. + * Construct an [InterferenceKey] for the specified [id]. * * @param id The identifier of the virtual machine. + * @return A key identifying the virtual machine as part of the interference domain. `null` if the virtual machine + * does not participate in the domain. */ - public fun join(id: String): InterferenceKey + public fun createKey(id: String): InterferenceKey? /** - * Leave this interference domain. + * Remove the specified [key] from this domain. + */ + public fun removeKey(key: InterferenceKey) + + /** + * Mark the specified [key] as active in this interference domain. + * + * @param key The key to join the interference domain with. + */ + public fun join(key: InterferenceKey) + + /** + * Mark the specified [key] as inactive in this interference domain. + * + * @param key The key of the virtual machine that wants to leave the domain. */ public fun leave(key: InterferenceKey) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index b3d72507..977292be 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -28,143 +28,366 @@ import java.util.* /** * An interference model that models the resource interference between virtual machines on a host. * - * @param groups The groups of virtual machines that interfere with each other. - * @param random The [Random] instance to select the affected virtual machines. + * @param targets The target load of each group. + * @param scores The performance score of each group. + * @param members The members belonging to each group. + * @param membership The identifier of each key. + * @param size The number of groups. + * @param seed The seed to use for randomly selecting the virtual machines that are affected. */ -public class VmInterferenceModel( - private val groups: List<VmInterferenceGroup>, - private val random: Random = Random(0) +public class VmInterferenceModel private constructor( + private val targets: DoubleArray, + private val scores: DoubleArray, + private val idMapping: Map<String, Int>, + private val members: Array<IntArray>, + private val membership: Array<IntArray>, + private val size: Int, + seed: Long, ) { /** + * A [SplittableRandom] used for selecting the virtual machines that are affected. + */ + private val random = SplittableRandom(seed) + + /** * Construct a new [VmInterferenceDomain]. */ - public fun newDomain(): VmInterferenceDomain = object : VmInterferenceDomain { + public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(targets, scores, idMapping, members, membership, random) + + /** + * Create a copy of this model with a different seed. + */ + public fun withSeed(seed: Long): VmInterferenceModel { + return VmInterferenceModel(targets, scores, idMapping, members, membership, size, seed) + } + + public companion object { /** - * The stateful groups of this domain. + * Construct a [Builder] instance. */ - private val groups = this@VmInterferenceModel.groups.map { GroupContext(it) } + @JvmStatic + public fun builder(): Builder = Builder() + } + /** + * Builder class for a [VmInterferenceModel] + */ + public class Builder internal constructor() { /** - * The set of keys active in this domain. + * The initial capacity of the builder. */ - private val keys = mutableSetOf<InterferenceKeyImpl>() + private val INITIAL_CAPACITY = 256 - override fun join(id: String): InterferenceKey { - val key = InterferenceKeyImpl(id, groups.filter { id in it }.sortedBy { it.group.targetLoad }) - keys += key - return key - } + /** + * The target load of each group. + */ + private var _targets = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY } - override fun leave(key: InterferenceKey) { - if (key is InterferenceKeyImpl) { - keys -= key - key.leave() + /** + * The performance score of each group. + */ + private var _scores = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY } + + /** + * The members of each group. + */ + private var _members = ArrayList<Set<String>>(INITIAL_CAPACITY) + + /** + * The mapping from member to group id. + */ + private val ids = TreeSet<String>() + + /** + * The number of groups in the model. + */ + private var size = 0 + + /** + * Add the specified group to the model. + */ + public fun addGroup(members: Set<String>, targetLoad: Double, score: Double): Builder { + val size = size + + if (size == _targets.size) { + grow() } + + _targets[size] = targetLoad + _scores[size] = score + _members.add(members) + ids.addAll(members) + + this.size++ + + return this } - override fun apply(key: InterferenceKey?, load: Double): Double { - if (key == null || key !is InterferenceKeyImpl) { - return 1.0 - } + /** + * Build the [VmInterferenceModel]. + */ + public fun build(seed: Long = 0): VmInterferenceModel { + val size = size + val targets = _targets + val scores = _scores + val members = _members - val ctx = key.findGroup(load) - val group = ctx?.group + val indices = Array(size) { it } + indices.sortWith( + Comparator { l, r -> + var cmp = targets[l].compareTo(targets[r]) // Order by target load + if (cmp != 0) { + return@Comparator cmp + } - // Apply performance penalty to (on average) only one of the VMs - return if (group != null && random.nextInt(group.members.size) == 0) { - group.score - } else { - 1.0 + cmp = scores[l].compareTo(scores[r]) // Higher penalty first (this means lower performance score first) + if (cmp != 0) + cmp + else + l.compareTo(r) + } + ) + + val newTargets = DoubleArray(size) + val newScores = DoubleArray(size) + val newMembers = arrayOfNulls<IntArray>(size) + + var nextId = 0 + val idMapping = ids.associateWith { nextId++ } + val membership = ids.associateWithTo(TreeMap()) { ArrayList<Int>() } + + for ((group, j) in indices.withIndex()) { + newTargets[group] = targets[j] + newScores[group] = scores[j] + val groupMembers = members[j] + val newGroupMembers = groupMembers.map { idMapping.getValue(it) }.toIntArray() + + newGroupMembers.sort() + newMembers[group] = newGroupMembers + + for (member in groupMembers) { + membership.getValue(member).add(group) + } } + + @Suppress("UNCHECKED_CAST") + return VmInterferenceModel( + newTargets, + newScores, + idMapping, + newMembers as Array<IntArray>, + membership.map { it.value.toIntArray() }.toTypedArray(), + size, + seed + ) } - override fun toString(): String = "VmInterferenceDomain" + /** + * Helper function to grow the capacity of the internal arrays. + */ + private fun grow() { + val oldSize = _targets.size + val newSize = oldSize + (oldSize shr 1) + + _targets = _targets.copyOf(newSize) + _scores = _scores.copyOf(newSize) + } } /** - * An interference key. - * - * @param id The identifier of the member. - * @param groups The groups to which the key belongs. + * Internal implementation of [VmInterferenceDomain]. */ - private inner class InterferenceKeyImpl(val id: String, private val groups: List<GroupContext>) : InterferenceKey { - init { - for (group in groups) { - group.join(this) - } - } + private class InterferenceDomainImpl( + private val targets: DoubleArray, + private val scores: DoubleArray, + private val idMapping: Map<String, Int>, + private val members: Array<IntArray>, + private val membership: Array<IntArray>, + private val random: SplittableRandom + ) : VmInterferenceDomain { + /** + * Keys registered with this domain. + */ + private val keys = HashMap<Int, InterferenceKeyImpl>() /** - * Find the active group that applies for the interference member. + * The set of keys active in this domain. */ - fun findGroup(load: Double): GroupContext? { - // Find the first active group whose target load is lower than the current load - val index = groups.binarySearchBy(load) { it.group.targetLoad } - val target = if (index >= 0) index else -(index + 1) + private val activeKeys = ArrayList<InterferenceKeyImpl>() - // Check whether there are active groups ahead of the index - for (i in target until groups.size) { - val group = groups[i] - if (group.group.targetLoad > load) { - break - } else if (group.isActive) { - return group + override fun createKey(id: String): InterferenceKey? { + val intId = idMapping[id] ?: return null + return keys.computeIfAbsent(intId) { InterferenceKeyImpl(intId) } + } + + override fun removeKey(key: InterferenceKey) { + if (key !is InterferenceKeyImpl) { + return + } + + if (activeKeys.remove(key)) { + computeActiveGroups(key.id) + } + + keys.remove(key.id) + } + + override fun join(key: InterferenceKey) { + if (key !is InterferenceKeyImpl) { + return + } + + if (key.acquire()) { + val pos = activeKeys.binarySearch(key) + if (pos < 0) { + activeKeys.add(-pos - 1, key) } + computeActiveGroups(key.id) } + } + + override fun leave(key: InterferenceKey) { + if (key is InterferenceKeyImpl && key.release()) { + activeKeys.remove(key) + computeActiveGroups(key.id) + } + } + + override fun apply(key: InterferenceKey?, load: Double): Double { + if (key == null || key !is InterferenceKeyImpl) { + return 1.0 + } + + val groups = key.groups + val groupSize = groups.size + + if (groupSize == 0) { + return 1.0 + } + + val targets = targets + val scores = scores + var low = 0 + var high = groups.size - 1 - // Check whether there are active groups before the index - for (i in (target - 1) downTo 0) { - val group = groups[i] - if (group.isActive) { - return group + var group = -1 + var score = 1.0 + + // Perform binary search over the groups based on target load + while (low <= high) { + val mid = low + high ushr 1 + val midGroup = groups[mid] + val target = targets[midGroup] + + if (target < load) { + low = mid + 1 + group = midGroup + score = scores[midGroup] + } else if (target > load) { + high = mid - 1 + } else { + group = midGroup + score = scores[midGroup] + break } } - return null + return if (group >= 0 && random.nextInt(members[group].size) == 0) { + score + } else { + 1.0 + } } + override fun toString(): String = "VmInterferenceDomain" + /** - * Leave all the groups. + * Queue of participants that will be removed or added to the active groups. */ - fun leave() { + private val _participants = ArrayDeque<InterferenceKeyImpl>() + + /** + * (Re-)Compute the active groups. + */ + private fun computeActiveGroups(id: Int) { + val activeKeys = activeKeys + val groups = membership[id] + + if (activeKeys.isEmpty()) { + return + } + + val members = members + val participants = _participants + for (group in groups) { - group.leave(this) + val groupMembers = members[group] + + var i = 0 + var j = 0 + var intersection = 0 + + // Compute the intersection of the group members and the current active members + while (i < groupMembers.size && j < activeKeys.size) { + val l = groupMembers[i] + val rightEntry = activeKeys[j] + val r = rightEntry.id + + if (l < r) { + i++ + } else if (l > r) { + j++ + } else { + participants.add(rightEntry) + intersection++ + + i++ + j++ + } + } + + while (true) { + val participant = participants.poll() ?: break + val participantGroups = participant.groups + if (intersection <= 1) { + participantGroups.remove(group) + } else { + val pos = participantGroups.binarySearch(group) + if (pos < 0) { + participantGroups.add(-pos - 1, group) + } + } + } } } } /** - * A group context is used to track the active keys per interference group. + * An interference key. + * + * @param id The identifier of the member. */ - private inner class GroupContext(val group: VmInterferenceGroup) { + private class InterferenceKeyImpl(@JvmField val id: Int) : InterferenceKey, Comparable<InterferenceKeyImpl> { /** - * The active keys that are part of this group. + * The active groups to which the key belongs. */ - private val keys = mutableSetOf<InterferenceKeyImpl>() + @JvmField val groups: MutableList<Int> = ArrayList() /** - * A flag to indicate that the group is active. + * The number of users of the interference key. */ - val isActive - get() = keys.size > 1 + private var refCount: Int = 0 /** - * Determine whether the specified [id] is part of this group. + * Join the domain. */ - operator fun contains(id: String): Boolean = id in group.members + fun acquire(): Boolean = refCount++ <= 0 /** - * Join this group with the specified [key]. + * Leave the domain. */ - fun join(key: InterferenceKeyImpl) { - keys += key - } + fun release(): Boolean = --refCount <= 0 - /** - * Leave this group with the specified [key]. - */ - fun leave(key: InterferenceKeyImpl) { - keys -= key - } + override fun compareTo(other: InterferenceKeyImpl): Int = id.compareTo(other.id) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 6f32cf46..b7f5bf8e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -30,7 +30,6 @@ import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertDoesNotThrow import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -187,12 +186,11 @@ internal class SimFairShareHypervisorTest { memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val groups = listOf( - VmInterferenceGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")), - VmInterferenceGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")), - VmInterferenceGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) - ) - val interferenceModel = VmInterferenceModel(groups) + val interferenceModel = VmInterferenceModel.builder() + .addGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")) + .addGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")) + .addGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) + .build() val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 59308e11..a1bc869e 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -32,7 +32,7 @@ import org.opendc.compute.workload.* import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.PerformanceInterferenceReader +import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -131,7 +131,7 @@ class RunnerCli : CliktCommand(name = "runner") { logger.info { "Constructing performance interference model" } val workloadLoader = ComputeWorkloadLoader(tracePath) - val interferenceGroups = let { + val interferenceModel = let { val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json") val operational = scenario.operationalPhenomena val enabled = operational.performanceInterferenceEnabled @@ -140,15 +140,14 @@ class RunnerCli : CliktCommand(name = "runner") { return@let null } - PerformanceInterferenceReader().read(path.inputStream()) + VmInterferenceModelReader().read(path.inputStream()) } val targets = portfolio.targets val results = (0 until targets.repeatsPerScenario).map { repeat -> logger.info { "Starting repeat $repeat" } withTimeout(runTimeout * 1000) { - val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) } - runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel) + runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel?.withSeed(repeat.toLong())) } } |
