From 76c5d27618181c4ec9cf86085c65b5e69f5f9109 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 2 Oct 2020 00:38:44 +0200 Subject: Add opendc-simulator-compute module This change adds an opendc-simulator-compute module which contains interfaces related to simulating compute workloads. For future changes, we intend to decouple the simulation part from the opendc-compute module. --- simulator/opendc-simulator/build.gradle.kts | 32 --- .../opendc-simulator-compute/build.gradle.kts | 35 +++ .../simulator/compute/SimExecutionContext.kt | 155 ++++++++++++ .../org/opendc/simulator/compute/SimMachine.kt | 266 +++++++++++++++++++++ .../opendc/simulator/compute/SimMachineModel.kt | 34 +++ .../opendc/simulator/compute/model/MemoryUnit.kt | 38 +++ .../simulator/compute/model/ProcessingNode.kt | 38 +++ .../simulator/compute/model/ProcessingUnit.kt | 36 +++ .../simulator/compute/workload/SimFlopsWorkload.kt | 57 +++++ .../simulator/compute/workload/SimTraceWorkload.kt | 53 ++++ .../simulator/compute/workload/SimWorkload.kt | 38 +++ .../org/opendc/simulator/compute/SimMachineTest.kt | 83 +++++++ .../compute/workload/SimFlopsWorkloadTest.kt | 73 ++++++ .../opendc-simulator-core/build.gradle.kts | 32 +++ .../simulator/utils/DelayControllerClockAdapter.kt | 46 ++++ .../simulator/utils/DelayControllerClockAdapter.kt | 46 ---- 16 files changed, 984 insertions(+), 78 deletions(-) create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkloadTest.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-core/build.gradle.kts create mode 100644 simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/utils/DelayControllerClockAdapter.kt delete mode 100644 simulator/opendc-simulator/src/main/kotlin/org/opendc/simulator/utils/DelayControllerClockAdapter.kt (limited to 'simulator/opendc-simulator') diff --git a/simulator/opendc-simulator/build.gradle.kts b/simulator/opendc-simulator/build.gradle.kts index a740dcf3..e69de29b 100644 --- a/simulator/opendc-simulator/build.gradle.kts +++ b/simulator/opendc-simulator/build.gradle.kts @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Simulation-specific code for use in OpenDC" - -/* Build configuration */ -plugins { - `kotlin-library-convention` -} - -dependencies { - api("org.jetbrains.kotlinx:kotlinx-coroutines-test:${Library.KOTLINX_COROUTINES}") -} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts new file mode 100644 index 00000000..cd7e5706 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Library for simulation of cloud computing components" + +plugins { + `kotlin-library-convention` +} + +dependencies { + api(project(":opendc-simulator:opendc-simulator-core")) + + testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") + testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt new file mode 100644 index 00000000..5801fcd5 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.selects.SelectClause0 +import kotlinx.coroutines.selects.select +import java.time.Clock + +/** + * A simulated execution context in which a bootable image runs. This interface represents the + * firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on + * which the image runs. + */ +public interface SimExecutionContext { + /** + * The virtual clock tracking simulation time. + */ + public val clock: Clock + + /** + * The machine model of the machine that is running the image. + */ + public val machine: SimMachineModel + + /** + * Ask the processor cores to run the specified [slice] and suspend execution until the trigger condition is met as + * specified by [triggerMode]. + * + * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which + * may be zero). These changes may happen anytime during execution of this method and callers should not rely on + * the timing of this change. + * + * @param slice The representation of work to run on the processors. + * @param triggerMode The trigger condition to resume execution. + */ + public suspend fun run(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST): Unit = + select { onRun(slice, triggerMode).invoke {} } + + /** + * Ask the processors cores to run the specified [batch] of work slices and suspend execution until the trigger + * condition is met as specified by [triggerMode]. + * + * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which + * may be zero). These changes may happen anytime during execution of this method and callers should not rely on + * the timing of this change. + * + * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these + * slices with the next slice to be executed. + * + * @param batch The batch of work to run on the processors. + * @param triggerMode The trigger condition to resume execution. + * @param merge The merge function for consecutive slices in case the last slice was not completed within its + * deadline. + */ + public suspend fun run( + batch: Sequence, + triggerMode: TriggerMode = TriggerMode.FIRST, + merge: (Slice, Slice) -> Slice = { _, r -> r } + ): Unit = select { onRun(batch, triggerMode, merge).invoke {} } + + /** + * Ask the processor cores to run the specified [slice] and select when the trigger condition is met as specified + * by [triggerMode]. + * + * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which + * may be zero). These changes may happen anytime during execution of this method and callers should not rely on + * the timing of this change. + * + * @param slice The representation of work to request from the processors. + * @param triggerMode The trigger condition to resume execution. + */ + public fun onRun(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST): SelectClause0 = + onRun(sequenceOf(slice), triggerMode) + + /** + * Ask the processors cores to run the specified [batch] of work slices and select when the trigger condition is met + * as specified by [triggerMode]. + * + * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which + * may be zero). These changes may happen anytime during execution of this method and callers should not rely on + * the timing of this change. + * + * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these + * slices with the next slice to be executed. + * + * @param batch The batch of work to run on the processors. + * @param triggerMode The trigger condition to resume execution during the **last** slice. + * @param merge The merge function for consecutive slices in case the last slice was not completed within its + * deadline. + */ + public fun onRun( + batch: Sequence, + triggerMode: TriggerMode = TriggerMode.FIRST, + merge: (Slice, Slice) -> Slice = { _, r -> r } + ): SelectClause0 + + /** + * A request to the host machine for a slice of CPU time from the processor cores. + * + * Both [burst] and [limit] must be of the same size and in any other case the method will throw an + * [IllegalArgumentException]. + * + * + * @param burst The burst time to request from each of the processor cores. + * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst. + * @param deadline The instant at which this slice needs to be fulfilled. + */ + public class Slice(public val burst: LongArray, public val limit: DoubleArray, public val deadline: Long) { + init { + require(burst.size == limit.size) { "Incompatible array dimensions" } + } + } + + /** + * The modes for triggering a machine exit from the machine. + */ + public enum class TriggerMode { + /** + * A machine exit occurs when either the first processor finishes processing a **non-zero** burst or the + * deadline is reached. + */ + FIRST, + + /** + * A machine exit occurs when either the last processor finishes processing a **non-zero** burst or the deadline + * is reached. + */ + LAST, + + /** + * A machine exit occurs only when the deadline is reached. + */ + DEADLINE + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt new file mode 100644 index 00000000..df74a5f1 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -0,0 +1,266 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.intrinsics.startCoroutineCancellable +import kotlinx.coroutines.selects.SelectClause0 +import kotlinx.coroutines.selects.SelectInstance +import org.opendc.simulator.compute.workload.SimWorkload +import java.lang.Runnable +import java.time.Clock +import kotlin.coroutines.ContinuationInterceptor +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * A simulated bare-metal machine that is able to run a single workload. + * + * @param coroutineScope The [CoroutineScope] to run the simulated workload in. + * @param clock The virtual clock to track the simulation time. + * @param model The machine model to simulate. + */ +@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) +public class SimMachine( + private val coroutineScope: CoroutineScope, + private val clock: Clock, + public val model: SimMachineModel +) { + /** + * A [StateFlow] representing the CPU usage of the simulated machine. + */ + public val usage: StateFlow + get() = usageState + + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + public suspend fun run(workload: SimWorkload) { + workload.run(ctx) + } + + /** + * The execution context in which the workload runs. + */ + private val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = this@SimMachine.model + + override val clock: Clock + get() = this@SimMachine.clock + + override fun onRun( + batch: Sequence, + triggerMode: SimExecutionContext.TriggerMode, + merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice + ): SelectClause0 { + return object : SelectClause0 { + @InternalCoroutinesApi + override fun registerSelectClause0(select: SelectInstance, block: suspend () -> R) { + // Do not reset the usage state: we will set it ourselves + usageFlush?.dispose() + usageFlush = null + + val queue = batch.iterator() + var start = Long.MIN_VALUE + var currentWork: SliceWork? = null + var currentDisposable: DisposableHandle? = null + + fun schedule(slice: SimExecutionContext.Slice) { + start = clock.millis() + + val isLastSlice = !queue.hasNext() + val work = SliceWork(slice) + val candidateDuration = when (triggerMode) { + SimExecutionContext.TriggerMode.FIRST -> work.minExit + SimExecutionContext.TriggerMode.LAST -> work.maxExit + SimExecutionContext.TriggerMode.DEADLINE -> slice.deadline - start + } + + // Check whether the deadline is exceeded during the run of the slice. + val duration = min(candidateDuration, slice.deadline - start) + + val action = Runnable { + currentWork = null + + // Flush all the work that was performed + val hasFinished = work.stop(duration) + + if (!isLastSlice) { + val candidateSlice = queue.next() + val nextSlice = + // If our previous slice exceeds its deadline, merge it with the next candidate slice + if (hasFinished) + candidateSlice + else + merge(candidateSlice, slice) + schedule(nextSlice) + } else if (select.trySelect()) { + block.startCoroutineCancellable(select.completion) + } + } + + // Schedule the flush after the entire slice has finished + currentDisposable = delay.invokeOnTimeout(duration, action) + + // Start the slice work + currentWork = work + work.start() + } + + // Schedule the first work + if (queue.hasNext()) { + schedule(queue.next()) + + // A DisposableHandle to flush the work in case the call is cancelled + val disposable = DisposableHandle { + val end = clock.millis() + val duration = end - start + + currentWork?.stop(duration) + currentDisposable?.dispose() + + // Schedule reset the usage of the machine since the call is returning + usageFlush = delay.invokeOnTimeout(1) { + usageState.value = 0.0 + usageFlush = null + } + } + + select.disposeOnSelect(disposable) + } else if (select.trySelect()) { + // No work has been given: select immediately + block.startCoroutineCancellable(select.completion) + } + } + } + } + } + + /** + * The [MutableStateFlow] containing the load of the server. + */ + private val usageState = MutableStateFlow(0.0) + + /** + * A disposable to prevent resetting the usage state for subsequent calls to onRun. + */ + private var usageFlush: DisposableHandle? = null + + /** + * Cache the [Delay] instance for timing. + * + * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy. + * XXX Note however that this is an ugly hack which may break in the future. + */ + @OptIn(InternalCoroutinesApi::class) + private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay + + /** + * A slice to be processed. + */ + private inner class SliceWork(val slice: SimExecutionContext.Slice) { + /** + * The duration after which the first processor finishes processing this slice. + */ + val minExit: Long + + /** + * The duration after which the last processor finishes processing this slice. + */ + val maxExit: Long + + /** + * A flag to indicate that the slice will exceed the deadline. + */ + val exceedsDeadline: Boolean + get() = slice.deadline < maxExit + + /** + * The total amount of CPU usage. + */ + val totalUsage: Double + + /** + * A flag to indicate that this slice is empty. + */ + val isEmpty: Boolean + + init { + var totalUsage = 0.0 + var minExit = Long.MAX_VALUE + var maxExit = 0L + var nonEmpty = false + + // Determine the duration of the first/last CPU to finish + for (i in 0 until min(model.cpus.size, slice.burst.size)) { + val cpu = model.cpus[i] + val usage = min(slice.limit[i], cpu.frequency) + val cpuDuration = ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds + + totalUsage += usage / cpu.frequency + + if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst + minExit = min(minExit, cpuDuration) + maxExit = max(maxExit, cpuDuration) + nonEmpty = true + } + } + + this.isEmpty = !nonEmpty + this.totalUsage = totalUsage + this.minExit = minExit + this.maxExit = maxExit + } + + /** + * Indicate that the work on the slice has started. + */ + fun start() { + usageState.value = totalUsage / model.cpus.size + } + + /** + * Flush the work performed on the slice. + */ + fun stop(duration: Long): Boolean { + var hasFinished = true + + for (i in 0 until min(model.cpus.size, slice.burst.size)) { + val usage = min(slice.limit[i], model.cpus[i].frequency) + val granted = ceil(duration / 1000.0 * usage).toLong() + val res = max(0, slice.burst[i] - granted) + slice.burst[i] = res + + if (res != 0L) { + hasFinished = false + } + } + + return hasFinished + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt new file mode 100644 index 00000000..c2988b11 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingUnit + +/** + * A description of the physical or virtual machine on which a bootable image runs. + * + * @property cpus The list of processing units available to the image. + * @property memory The list of memory units available to the image. + */ +public data class SimMachineModel(public val cpus: List, public val memory: List) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt new file mode 100644 index 00000000..bcbde5b1 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.model + +/** + * A memory unit of a compute resource, either virtual or physical. + * + * @property vendor The vendor string of the memory. + * @property modelName The name of the memory model. + * @property speed The access speed of the memory in MHz. + * @property size The size of the memory unit in MBs. + */ +public data class MemoryUnit( + public val vendor: String, + public val modelName: String, + public val speed: Double, + public val size: Long +) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt new file mode 100644 index 00000000..58ed816c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.model + +/** + * A processing node/package/socket containing possibly several CPU cores. + * + * @property vendor The vendor string of the processor node. + * @property modelName The name of the processor node. + * @property arch The micro-architecture of the processor node. + * @property coreCount The number of logical CPUs in the processor node. + */ +public data class ProcessingNode( + public val vendor: String, + public val arch: String, + public val modelName: String, + public val coreCount: Int +) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt new file mode 100644 index 00000000..415e95e6 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.model + +/** + * A single logical compute unit of processor node, either virtual or physical. + * + * @property node The processing node containing the CPU core. + * @property id The identifier of the CPU core within the processing node. + * @property frequency The clock rate of the CPU in MHz. + */ +public data class ProcessingUnit( + public val node: ProcessingNode, + public val id: Int, + public val frequency: Double +) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt new file mode 100644 index 00000000..918a78bd --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +import org.opendc.simulator.compute.SimExecutionContext +import kotlin.math.min + +/** + * A [SimWorkload] that models applications performing a static number of floating point operations ([flops]) on + * a compute resource. + * + * @property flops The number of floating point operations to perform for this task in MFLOPs. + * @property cores The number of cores that the image is able to utilize. + * @property utilization A model of the CPU utilization of the application. + */ +public class SimFlopsWorkload( + public val flops: Long, + public val cores: Int, + public val utilization: Double = 0.8 +) : SimWorkload { + init { + require(flops >= 0) { "Negative number of flops" } + require(cores > 0) { "Negative number of cores or no cores" } + require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } + } + + /** + * Execute the runtime behavior based on a number of floating point operations to execute. + */ + override suspend fun run(ctx: SimExecutionContext) { + val cores = min(this.cores, ctx.machine.cpus.size) + val burst = LongArray(cores) { flops / cores } + val maxUsage = DoubleArray(cores) { i -> ctx.machine.cpus[i].frequency * utilization } + + ctx.run(SimExecutionContext.Slice(burst, maxUsage, Long.MAX_VALUE), triggerMode = SimExecutionContext.TriggerMode.LAST) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt new file mode 100644 index 00000000..e7eaa8bf --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +import org.opendc.simulator.compute.SimExecutionContext +import kotlin.math.min + +/** + * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource + * consumption for some period of time. + */ +public class SimTraceWorkload(private val trace: Sequence) : SimWorkload { + override suspend fun run(ctx: SimExecutionContext) { + var offset = ctx.clock.millis() + + val batch = trace.map { fragment -> + val cores = min(fragment.cores, ctx.machine.cpus.size) + val burst = LongArray(cores) { fragment.flops / cores } + val usage = DoubleArray(cores) { fragment.usage / cores } + offset += fragment.duration + SimExecutionContext.Slice(burst, usage, offset) + } + + ctx.run(batch) + } + + override fun toString(): String = "SimTraceWorkload" + + /** + * A fragment of the workload. + */ + public data class Fragment(val time: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int) +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt new file mode 100644 index 00000000..0ef4130e --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +import org.opendc.simulator.compute.SimExecutionContext + +/** + * A model that characterizes the runtime behavior of some particular workload. + */ +public interface SimWorkload { + /** + * Launch the workload in the specified [SimExecutionContext]. + * + * This method should encapsulate and characterize the runtime behavior of the instance resulting from launching + * the workload on some machine, in terms of the resource consumption on the machine. + */ + public suspend fun run(ctx: SimExecutionContext) +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt new file mode 100644 index 00000000..f6fb8e04 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.* +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.simulator.utils.DelayControllerClockAdapter + +/** + * Test suite for the [SimMachine] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +class SimMachineTest { + private lateinit var machineModel: SimMachineModel + + @BeforeEach + fun setUp() { + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + + machineModel = SimMachineModel( + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + } + + @Test + fun testFlopsWorkload() { + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) + val machine = SimMachine(testScope, clock, machineModel) + + testScope.runBlockingTest { + machine.run(SimFlopsWorkload(2_000, 2, utilization = 1.0)) + + // Two cores execute 1000 MFlOps per second (1000 ms) + assertEquals(1000, testScope.currentTime) + } + } + + @Test + fun testUsage() { + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) + val machine = SimMachine(testScope, clock, machineModel) + + testScope.runBlockingTest { + machine.run(SimFlopsWorkload(2_000, 2, utilization = 1.0)) + assertEquals(1.0, machine.usage.value) + + // Wait for the usage to reset + delay(1) + assertEquals(0.0, machine.usage.value) + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkloadTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkloadTest.kt new file mode 100644 index 00000000..51bed76c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkloadTest.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows + +/** + * Test suite for [SimFlopsWorkload] class. + */ +class SimFlopsWorkloadTest { + @Test + fun testFlopsNonNegative() { + assertThrows("FLOPs must be non-negative") { + SimFlopsWorkload(-1, 1) + } + } + + @Test + fun testCoresNonZero() { + assertThrows("Cores cannot be zero") { + SimFlopsWorkload(1, 0) + } + } + + @Test + fun testCoresPositive() { + assertThrows("Cores cannot be negative") { + SimFlopsWorkload(1, -1) + } + } + + @Test + fun testUtilizationNonZero() { + assertThrows("Utilization cannot be zero") { + SimFlopsWorkload(1, 1, 0.0) + } + } + + @Test + fun testUtilizationPositive() { + assertThrows("Utilization cannot be negative") { + SimFlopsWorkload(1, 1, -1.0) + } + } + + @Test + fun testUtilizationNotLargerThanOne() { + assertThrows("Utilization cannot be larger than one") { + SimFlopsWorkload(1, 1, 2.0) + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-core/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-core/build.gradle.kts new file mode 100644 index 00000000..a740dcf3 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-core/build.gradle.kts @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Simulation-specific code for use in OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-convention` +} + +dependencies { + api("org.jetbrains.kotlinx:kotlinx-coroutines-test:${Library.KOTLINX_COROUTINES}") +} diff --git a/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/utils/DelayControllerClockAdapter.kt b/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/utils/DelayControllerClockAdapter.kt new file mode 100644 index 00000000..84c18e87 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/utils/DelayControllerClockAdapter.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.utils + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.DelayController +import java.time.Clock +import java.time.Instant +import java.time.ZoneId + +/** + * A virtual [Clock] that abstracts accesses to [DelayController]'s virtual clock. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public class DelayControllerClockAdapter( + private val delayController: DelayController, + private val zone: ZoneId = ZoneId.systemDefault() +) : Clock() { + override fun getZone(): ZoneId = zone + + override fun withZone(zone: ZoneId): Clock = DelayControllerClockAdapter(delayController, zone) + + override fun instant(): Instant = Instant.ofEpochMilli(millis()) + + override fun millis(): Long = delayController.currentTime +} diff --git a/simulator/opendc-simulator/src/main/kotlin/org/opendc/simulator/utils/DelayControllerClockAdapter.kt b/simulator/opendc-simulator/src/main/kotlin/org/opendc/simulator/utils/DelayControllerClockAdapter.kt deleted file mode 100644 index 84c18e87..00000000 --- a/simulator/opendc-simulator/src/main/kotlin/org/opendc/simulator/utils/DelayControllerClockAdapter.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.utils - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.DelayController -import java.time.Clock -import java.time.Instant -import java.time.ZoneId - -/** - * A virtual [Clock] that abstracts accesses to [DelayController]'s virtual clock. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public class DelayControllerClockAdapter( - private val delayController: DelayController, - private val zone: ZoneId = ZoneId.systemDefault() -) : Clock() { - override fun getZone(): ZoneId = zone - - override fun withZone(zone: ZoneId): Clock = DelayControllerClockAdapter(delayController, zone) - - override fun instant(): Instant = Instant.ofEpochMilli(millis()) - - override fun millis(): Long = delayController.currentTime -} -- cgit v1.2.3 From c8567a567348e13c341bf1a1ec64ed34ce25815a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 3 Oct 2020 16:29:55 +0200 Subject: Implement VirtDriver using opendc-simulator-compute module This change adds an implementation of the VirtDriver interface that uses the functionality provided by the opendc-simulator-compute module. --- .../simulator/compute/SimBareMetalMachine.kt | 281 +++++++++++ .../org/opendc/simulator/compute/SimHypervisor.kt | 529 +++++++++++++++++++++ .../org/opendc/simulator/compute/SimMachine.kt | 236 +-------- .../simulator/compute/workload/SimWorkload.kt | 3 + .../opendc/simulator/compute/SimHypervisorTest.kt | 130 +++++ .../org/opendc/simulator/compute/SimMachineTest.kt | 6 +- 6 files changed, 955 insertions(+), 230 deletions(-) create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt (limited to 'simulator/opendc-simulator') diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt new file mode 100644 index 00000000..c6d5bdd1 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -0,0 +1,281 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.intrinsics.startCoroutineCancellable +import kotlinx.coroutines.selects.SelectClause0 +import kotlinx.coroutines.selects.SelectInstance +import org.opendc.simulator.compute.workload.SimWorkload +import java.lang.Runnable +import java.time.Clock +import kotlin.coroutines.ContinuationInterceptor +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * A simulated bare-metal machine that is able to run a single workload. + * + * A [SimBareMetalMachine] is a stateful object and you should be careful when operating this object concurrently. For + * example. the class expects only a single concurrent call to [run]. + * + * @param coroutineScope The [CoroutineScope] to run the simulated workload in. + * @param clock The virtual clock to track the simulation time. + * @param model The machine model to simulate. + */ +@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) +public class SimBareMetalMachine( + private val coroutineScope: CoroutineScope, + private val clock: Clock, + override val model: SimMachineModel +) : SimMachine { + /** + * A [StateFlow] representing the CPU usage of the simulated machine. + */ + override val usage: StateFlow + get() = usageState + + /** + * The current active workload. + */ + private var activeWorkload: SimWorkload? = null + + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + override suspend fun run(workload: SimWorkload) { + require(activeWorkload == null) { "Run should not be called concurrently" } + + try { + activeWorkload = workload + workload.run(ctx) + } finally { + activeWorkload = null + } + } + + /** + * The execution context in which the workload runs. + */ + private val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = this@SimBareMetalMachine.model + + override val clock: Clock + get() = this@SimBareMetalMachine.clock + + override fun onRun( + batch: Sequence, + triggerMode: SimExecutionContext.TriggerMode, + merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice + ): SelectClause0 { + return object : SelectClause0 { + @InternalCoroutinesApi + override fun registerSelectClause0(select: SelectInstance, block: suspend () -> R) { + // Do not reset the usage state: we will set it ourselves + usageFlush?.dispose() + usageFlush = null + + val queue = batch.iterator() + var start = Long.MIN_VALUE + var currentWork: SliceWork? = null + var currentDisposable: DisposableHandle? = null + + fun schedule(slice: SimExecutionContext.Slice) { + start = clock.millis() + + val isLastSlice = !queue.hasNext() + val work = SliceWork(slice) + val candidateDuration = when (triggerMode) { + SimExecutionContext.TriggerMode.FIRST -> work.minExit + SimExecutionContext.TriggerMode.LAST -> work.maxExit + SimExecutionContext.TriggerMode.DEADLINE -> slice.deadline - start + } + + // Check whether the deadline is exceeded during the run of the slice. + val duration = min(candidateDuration, slice.deadline - start) + + val action = Runnable { + currentWork = null + + // Flush all the work that was performed + val hasFinished = work.stop(duration) + + if (!isLastSlice) { + val candidateSlice = queue.next() + val nextSlice = + // If our previous slice exceeds its deadline, merge it with the next candidate slice + if (hasFinished) + candidateSlice + else + merge(candidateSlice, slice) + schedule(nextSlice) + } else if (select.trySelect()) { + block.startCoroutineCancellable(select.completion) + } + } + + // Schedule the flush after the entire slice has finished + currentDisposable = delay.invokeOnTimeout(duration, action) + + // Start the slice work + currentWork = work + work.start() + } + + // Schedule the first work + if (queue.hasNext()) { + schedule(queue.next()) + + // A DisposableHandle to flush the work in case the call is cancelled + val disposable = DisposableHandle { + val end = clock.millis() + val duration = end - start + + currentWork?.stop(duration) + currentDisposable?.dispose() + + // Schedule reset the usage of the machine since the call is returning + usageFlush = delay.invokeOnTimeout(1) { + usageState.value = 0.0 + usageFlush = null + } + } + + select.disposeOnSelect(disposable) + } else if (select.trySelect()) { + // No work has been given: select immediately + block.startCoroutineCancellable(select.completion) + } + } + } + } + } + + /** + * The [MutableStateFlow] containing the load of the server. + */ + private val usageState = MutableStateFlow(0.0) + + /** + * A disposable to prevent resetting the usage state for subsequent calls to onRun. + */ + private var usageFlush: DisposableHandle? = null + + /** + * Cache the [Delay] instance for timing. + * + * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy. + * XXX Note however that this is an ugly hack which may break in the future. + */ + @OptIn(InternalCoroutinesApi::class) + private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay + + /** + * A slice to be processed. + */ + private inner class SliceWork(val slice: SimExecutionContext.Slice) { + /** + * The duration after which the first processor finishes processing this slice. + */ + val minExit: Long + + /** + * The duration after which the last processor finishes processing this slice. + */ + val maxExit: Long + + /** + * A flag to indicate that the slice will exceed the deadline. + */ + val exceedsDeadline: Boolean + get() = slice.deadline < maxExit + + /** + * The total amount of CPU usage. + */ + val totalUsage: Double + + /** + * A flag to indicate that this slice is empty. + */ + val isEmpty: Boolean + + init { + var totalUsage = 0.0 + var minExit = Long.MAX_VALUE + var maxExit = 0L + var nonEmpty = false + + // Determine the duration of the first/last CPU to finish + for (i in 0 until min(model.cpus.size, slice.burst.size)) { + val cpu = model.cpus[i] + val usage = min(slice.limit[i], cpu.frequency) + val cpuDuration = ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds + + totalUsage += usage / cpu.frequency + + if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst + minExit = min(minExit, cpuDuration) + maxExit = max(maxExit, cpuDuration) + nonEmpty = true + } + } + + this.isEmpty = !nonEmpty + this.totalUsage = totalUsage + this.minExit = minExit + this.maxExit = maxExit + } + + /** + * Indicate that the work on the slice has started. + */ + fun start() { + usageState.value = totalUsage / model.cpus.size + } + + /** + * Flush the work performed on the slice. + */ + fun stop(duration: Long): Boolean { + var hasFinished = true + + for (i in 0 until min(model.cpus.size, slice.burst.size)) { + val usage = min(slice.limit[i], model.cpus[i].frequency) + val granted = ceil(duration / 1000.0 * usage).toLong() + val res = max(0, slice.burst[i] - granted) + slice.burst[i] = res + + if (res != 0L) { + hasFinished = false + } + } + + return hasFinished + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt new file mode 100644 index 00000000..7c2cfbe3 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt @@ -0,0 +1,529 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.intrinsics.startCoroutineCancellable +import kotlinx.coroutines.selects.SelectClause0 +import kotlinx.coroutines.selects.SelectInstance +import kotlinx.coroutines.selects.select +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimWorkload +import java.time.Clock +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine] concurrently. + * + * @param coroutineScope The [CoroutineScope] to run the simulated workloads in. + * @param clock The virtual clock to track the simulation time. + */ +@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) +public class SimHypervisor( + private val coroutineScope: CoroutineScope, + private val clock: Clock, + private val listener: Listener? = null +) : SimWorkload { + /** + * A set for tracking the VM context objects. + */ + private val vms: MutableSet = mutableSetOf() + + /** + * A flag to indicate the driver is stopped. + */ + private var stopped: Boolean = false + + /** + * The channel for scheduling new CPU requests. + */ + private val schedulingQueue = Channel(Channel.UNLIMITED) + + /** + * Create a [SimMachine] instance on which users may run a [SimWorkload]. + * + * @param model The machine to create. + */ + public fun createMachine(model: SimMachineModel): SimMachine { + val vmCtx = VmExecutionContext(model) + + return object : SimMachine { + override val model: SimMachineModel + get() = vmCtx.machine + + override val usage: StateFlow + get() = vmCtx.session.usage + + /** + * The current active workload. + */ + private var activeWorkload: SimWorkload? = null + + override suspend fun run(workload: SimWorkload) { + require(activeWorkload == null) { "Run should not be called concurrently" } + + try { + activeWorkload = workload + workload.run(vmCtx) + } finally { + activeWorkload = null + } + } + + override fun toString(): String = "SimVirtualMachine" + } + } + + /** + * Run the scheduling process of the hypervisor. + */ + override suspend fun run(ctx: SimExecutionContext) { + val maxUsage = ctx.machine.cpus.sumByDouble { it.frequency } + val pCPUs = ctx.machine.cpus.indices.sortedBy { ctx.machine.cpus[it].frequency } + + val vms = mutableSetOf() + val vcpus = mutableListOf() + + val usage = DoubleArray(ctx.machine.cpus.size) + val burst = LongArray(ctx.machine.cpus.size) + + fun process(command: SchedulerCommand) { + when (command) { + is SchedulerCommand.Schedule -> { + vms += command.vm + vcpus.addAll(command.vm.vcpus) + } + is SchedulerCommand.Deschedule -> { + vms -= command.vm + vcpus.removeAll(command.vm.vcpus) + } + is SchedulerCommand.Interrupt -> { + } + } + } + + fun processRemaining() { + var command = schedulingQueue.poll() + while (command != null) { + process(command) + command = schedulingQueue.poll() + } + } + + while (!stopped) { + // Wait for a request to be submitted if we have no work yet. + if (vcpus.isEmpty()) { + process(schedulingQueue.receive()) + } + + processRemaining() + + val start = clock.millis() + + var duration: Double = Double.POSITIVE_INFINITY + var deadline: Long = Long.MAX_VALUE + var availableUsage = maxUsage + var totalRequestedUsage = 0.0 + var totalRequestedBurst = 0L + + // Sort the vCPUs based on their requested usage + // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + vcpus.sort() + + // Divide the available host capacity fairly across the vCPUs using max-min fair sharing + for ((i, req) in vcpus.withIndex()) { + val remaining = vcpus.size - i + val availableShare = availableUsage / remaining + val grantedUsage = min(req.limit, availableShare) + + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, req.vm.deadline) + + // Ignore empty CPUs + if (grantedUsage <= 0 || req.burst <= 0) { + req.allocatedLimit = 0.0 + continue + } + + totalRequestedUsage += req.limit + totalRequestedBurst += req.burst + + req.allocatedLimit = grantedUsage + availableUsage -= grantedUsage + + // The duration that we want to run is that of the shortest request from a vCPU + duration = min(duration, req.burst / grantedUsage) + } + + val totalAllocatedUsage = maxUsage - availableUsage + var totalAllocatedBurst = 0L + availableUsage = totalAllocatedUsage + + // Divide the requests over the available capacity of the pCPUs fairly + for (i in pCPUs) { + val maxCpuUsage = ctx.machine.cpus[i].frequency + val fraction = maxCpuUsage / maxUsage + val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction) + val grantedBurst = ceil(duration * grantedUsage).toLong() + + usage[i] = grantedUsage + burst[i] = grantedBurst + totalAllocatedBurst += grantedBurst + availableUsage -= grantedUsage + } + + // We run the total burst on the host processor. Note that this call may be cancelled at any moment in + // time, so not all of the burst may be executed. + select { + schedulingQueue.onReceive { schedulingQueue.offer(it); true } + ctx.onRun(SimExecutionContext.Slice(burst, usage, deadline), SimExecutionContext.TriggerMode.DEADLINE) + .invoke { false } + } + + val end = clock.millis() + + // No work was performed + if ((end - start) <= 0) { + continue + } + + // The total requested burst that the VMs wanted to run in the time-frame that we ran. + val totalRequestedSubBurst = + vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum() + val totalRemainder = burst.sum() + val totalGrantedBurst = totalAllocatedBurst - totalRemainder + + // The burst that was lost due to overcommissioning of CPU resources + var totalOvercommissionedBurst = 0L + // The burst that was lost due to interference. + var totalInterferedBurst = 0L + + val vmIterator = vms.iterator() + while (vmIterator.hasNext()) { + val vm = vmIterator.next() + + // Apply performance interference model + val performanceScore = 1.0 // TODO Performance interference + var hasFinished = false + + for (vcpu in vm.vcpus) { + // Compute the fraction of compute time allocated to the VM + val fraction = vcpu.allocatedLimit / totalAllocatedUsage + + // Compute the burst time that the VM was actually granted + val grantedBurst = ceil(totalGrantedBurst * fraction).toLong() + + // The burst that was actually used by the VM + val usedBurst = ceil(grantedBurst * performanceScore).toLong() + + totalInterferedBurst += grantedBurst - usedBurst + + // Compute remaining burst time to be executed for the request + if (vcpu.consume(usedBurst)) { + hasFinished = true + } else if (vm.deadline <= end) { + // Request must have its entire burst consumed or otherwise we have overcommission + // Note that we count the overcommissioned burst if the hypervisor has failed. + totalOvercommissionedBurst += vcpu.burst + } + } + + if (hasFinished || vm.deadline <= end) { + // Mark the VM as finished and deschedule the VMs if needed + if (vm.finish()) { + vmIterator.remove() + vcpus.removeAll(vm.vcpus) + } + } + } + + listener?.onSliceFinish( + this, + totalRequestedBurst, + min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing + totalOvercommissionedBurst, + totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, + min( + totalAllocatedUsage, + totalRequestedUsage + ), // The allocated usage might be slightly higher due to FP rounding + totalRequestedUsage + ) + } + } + + /** + * Event listener for hypervisor events. + */ + public interface Listener { + /** + * This method is invoked when a slice is finished. + */ + public fun onSliceFinish( + hypervisor: SimHypervisor, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double + ) + } + + /** + * A scheduling command processed by the scheduler. + */ + private sealed class SchedulerCommand { + /** + * Schedule the specified VM on the hypervisor. + */ + data class Schedule(val vm: VmSession) : SchedulerCommand() + + /** + * De-schedule the specified VM on the hypervisor. + */ + data class Deschedule(val vm: VmSession) : SchedulerCommand() + + /** + * Interrupt the scheduler. + */ + object Interrupt : SchedulerCommand() + } + + /** + * A virtual machine running on the hypervisor. + * + * @param ctx The execution context the vCPU runs in. + * @param triggerMode The mode when to trigger the VM exit. + * @param merge The function to merge consecutive slices on spillover. + * @param select The function to select on finish. + */ + @OptIn(InternalCoroutinesApi::class) + private data class VmSession( + val model: SimMachineModel, + var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST, + var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r }, + var select: () -> Unit = {} + ) { + /** + * The vCPUs of this virtual machine. + */ + val vcpus: List + + /** + * The slices that the VM wants to run. + */ + var queue: Iterator = emptyList().iterator() + + /** + * The current active slice. + */ + var activeSlice: SimExecutionContext.Slice? = null + + /** + * The current deadline of the VM. + */ + val deadline: Long + get() = activeSlice?.deadline ?: Long.MAX_VALUE + + /** + * A flag to indicate that the VM is idle. + */ + val isIdle: Boolean + get() = activeSlice == null + + /** + * The usage of the virtual machine. + */ + val usage: MutableStateFlow = MutableStateFlow(0.0) + + init { + vcpus = model.cpus.mapIndexed { i, model -> VCpu(this, model, i) } + } + + /** + * Schedule the given slices on this vCPU, replacing the existing slices. + */ + fun schedule(slices: Sequence) { + queue = slices.iterator() + + if (queue.hasNext()) { + activeSlice = queue.next() + refresh() + } + } + + /** + * Cancel the existing workload on the VM. + */ + fun cancel() { + queue = emptyList().iterator() + activeSlice = null + refresh() + } + + /** + * Finish the current slice of the VM. + * + * @return `true` if the vCPUs may be descheduled, `false` otherwise. + */ + fun finish(): Boolean { + val activeSlice = activeSlice ?: return true + + return if (queue.hasNext()) { + val needsMerge = activeSlice.burst.any { it > 0 } + val candidateSlice = queue.next() + val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice + + this.activeSlice = slice + + // Update the vCPU cache + refresh() + + false + } else { + this.activeSlice = null + select() + true + } + } + + /** + * Refresh the vCPU cache. + */ + fun refresh() { + vcpus.forEach { it.refresh() } + usage.value = vcpus.sumByDouble { it.burst / it.limit } / vcpus.size + } + } + + /** + * A virtual CPU that can be scheduled on a physical CPU. + * + * @param vm The VM of which this vCPU is part. + * @param model The model of CPU that this vCPU models. + * @param id The id of the vCPU with respect to the VM. + */ + private data class VCpu( + val vm: VmSession, + val model: ProcessingUnit, + val id: Int + ) : Comparable { + /** + * The current limit on the vCPU. + */ + var limit: Double = 0.0 + + /** + * The limit allocated by the hypervisor. + */ + var allocatedLimit: Double = 0.0 + + /** + * The current burst running on the vCPU. + */ + var burst: Long = 0L + + /** + * Consume the specified burst on this vCPU. + */ + fun consume(burst: Long): Boolean { + this.burst = max(0, this.burst - burst) + + // Flush the result to the slice if it exists + vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst) + + return allocatedLimit > 0.0 && this.burst == 0L + } + + /** + * Refresh the information of this vCPU based on the current slice. + */ + fun refresh() { + limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0 + burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0 + } + + /** + * Compare to another vCPU based on the current load of the vCPU. + */ + override fun compareTo(other: VCpu): Int { + return limit.compareTo(other.limit) + } + + /** + * Create a string representation of the vCPU. + */ + override fun toString(): String = + "vCPU(id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)" + } + + /** + * The execution context in which a VM runs. + * + */ + private inner class VmExecutionContext(override val machine: SimMachineModel) : + SimExecutionContext, + DisposableHandle { + private var finalized: Boolean = false + private var initialized: Boolean = false + val session: VmSession = VmSession(machine) + + override val clock: Clock + get() = this@SimHypervisor.clock + + @OptIn(InternalCoroutinesApi::class) + override fun onRun( + batch: Sequence, + triggerMode: SimExecutionContext.TriggerMode, + merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice + ): SelectClause0 = object : SelectClause0 { + @InternalCoroutinesApi + override fun registerSelectClause0(select: SelectInstance, block: suspend () -> R) { + session.triggerMode = triggerMode + session.merge = merge + session.select = { + if (select.trySelect()) { + block.startCoroutineCancellable(select.completion) + } + } + session.schedule(batch) + // Indicate to the hypervisor that the VM should be re-scheduled + schedulingQueue.offer(SchedulerCommand.Schedule(session)) + select.disposeOnSelect(this@VmExecutionContext) + } + } + + override fun dispose() { + if (!session.isIdle) { + session.cancel() + schedulingQueue.offer(SchedulerCommand.Deschedule(session)) + } + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index df74a5f1..f66085af 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -22,245 +22,27 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.intrinsics.startCoroutineCancellable -import kotlinx.coroutines.selects.SelectClause0 -import kotlinx.coroutines.selects.SelectInstance import org.opendc.simulator.compute.workload.SimWorkload -import java.lang.Runnable -import java.time.Clock -import kotlin.coroutines.ContinuationInterceptor -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min /** - * A simulated bare-metal machine that is able to run a single workload. - * - * @param coroutineScope The [CoroutineScope] to run the simulated workload in. - * @param clock The virtual clock to track the simulation time. - * @param model The machine model to simulate. + * A generic machine that is able to run a [SimWorkload]. */ -@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) -public class SimMachine( - private val coroutineScope: CoroutineScope, - private val clock: Clock, +@OptIn(ExperimentalCoroutinesApi::class) +public interface SimMachine { + /** + * The model of the machine containing its specifications. + */ public val model: SimMachineModel -) { + /** * A [StateFlow] representing the CPU usage of the simulated machine. */ public val usage: StateFlow - get() = usageState /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - public suspend fun run(workload: SimWorkload) { - workload.run(ctx) - } - - /** - * The execution context in which the workload runs. - */ - private val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = this@SimMachine.model - - override val clock: Clock - get() = this@SimMachine.clock - - override fun onRun( - batch: Sequence, - triggerMode: SimExecutionContext.TriggerMode, - merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice - ): SelectClause0 { - return object : SelectClause0 { - @InternalCoroutinesApi - override fun registerSelectClause0(select: SelectInstance, block: suspend () -> R) { - // Do not reset the usage state: we will set it ourselves - usageFlush?.dispose() - usageFlush = null - - val queue = batch.iterator() - var start = Long.MIN_VALUE - var currentWork: SliceWork? = null - var currentDisposable: DisposableHandle? = null - - fun schedule(slice: SimExecutionContext.Slice) { - start = clock.millis() - - val isLastSlice = !queue.hasNext() - val work = SliceWork(slice) - val candidateDuration = when (triggerMode) { - SimExecutionContext.TriggerMode.FIRST -> work.minExit - SimExecutionContext.TriggerMode.LAST -> work.maxExit - SimExecutionContext.TriggerMode.DEADLINE -> slice.deadline - start - } - - // Check whether the deadline is exceeded during the run of the slice. - val duration = min(candidateDuration, slice.deadline - start) - - val action = Runnable { - currentWork = null - - // Flush all the work that was performed - val hasFinished = work.stop(duration) - - if (!isLastSlice) { - val candidateSlice = queue.next() - val nextSlice = - // If our previous slice exceeds its deadline, merge it with the next candidate slice - if (hasFinished) - candidateSlice - else - merge(candidateSlice, slice) - schedule(nextSlice) - } else if (select.trySelect()) { - block.startCoroutineCancellable(select.completion) - } - } - - // Schedule the flush after the entire slice has finished - currentDisposable = delay.invokeOnTimeout(duration, action) - - // Start the slice work - currentWork = work - work.start() - } - - // Schedule the first work - if (queue.hasNext()) { - schedule(queue.next()) - - // A DisposableHandle to flush the work in case the call is cancelled - val disposable = DisposableHandle { - val end = clock.millis() - val duration = end - start - - currentWork?.stop(duration) - currentDisposable?.dispose() - - // Schedule reset the usage of the machine since the call is returning - usageFlush = delay.invokeOnTimeout(1) { - usageState.value = 0.0 - usageFlush = null - } - } - - select.disposeOnSelect(disposable) - } else if (select.trySelect()) { - // No work has been given: select immediately - block.startCoroutineCancellable(select.completion) - } - } - } - } - } - - /** - * The [MutableStateFlow] containing the load of the server. - */ - private val usageState = MutableStateFlow(0.0) - - /** - * A disposable to prevent resetting the usage state for subsequent calls to onRun. - */ - private var usageFlush: DisposableHandle? = null - - /** - * Cache the [Delay] instance for timing. - * - * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy. - * XXX Note however that this is an ugly hack which may break in the future. - */ - @OptIn(InternalCoroutinesApi::class) - private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay - - /** - * A slice to be processed. - */ - private inner class SliceWork(val slice: SimExecutionContext.Slice) { - /** - * The duration after which the first processor finishes processing this slice. - */ - val minExit: Long - - /** - * The duration after which the last processor finishes processing this slice. - */ - val maxExit: Long - - /** - * A flag to indicate that the slice will exceed the deadline. - */ - val exceedsDeadline: Boolean - get() = slice.deadline < maxExit - - /** - * The total amount of CPU usage. - */ - val totalUsage: Double - - /** - * A flag to indicate that this slice is empty. - */ - val isEmpty: Boolean - - init { - var totalUsage = 0.0 - var minExit = Long.MAX_VALUE - var maxExit = 0L - var nonEmpty = false - - // Determine the duration of the first/last CPU to finish - for (i in 0 until min(model.cpus.size, slice.burst.size)) { - val cpu = model.cpus[i] - val usage = min(slice.limit[i], cpu.frequency) - val cpuDuration = ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds - - totalUsage += usage / cpu.frequency - - if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst - minExit = min(minExit, cpuDuration) - maxExit = max(maxExit, cpuDuration) - nonEmpty = true - } - } - - this.isEmpty = !nonEmpty - this.totalUsage = totalUsage - this.minExit = minExit - this.maxExit = maxExit - } - - /** - * Indicate that the work on the slice has started. - */ - fun start() { - usageState.value = totalUsage / model.cpus.size - } - - /** - * Flush the work performed on the slice. - */ - fun stop(duration: Long): Boolean { - var hasFinished = true - - for (i in 0 until min(model.cpus.size, slice.burst.size)) { - val usage = min(slice.limit[i], model.cpus[i].frequency) - val granted = ceil(duration / 1000.0 * usage).toLong() - val res = max(0, slice.burst[i] - granted) - slice.burst[i] = res - - if (res != 0L) { - hasFinished = false - } - } - - return hasFinished - } - } + public suspend fun run(workload: SimWorkload) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index 0ef4130e..2add8cce 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -26,6 +26,9 @@ import org.opendc.simulator.compute.SimExecutionContext /** * A model that characterizes the runtime behavior of some particular workload. + * + * Workloads are stateful objects that may be paused and resumed at a later moment. As such, be careful when using the + * same [SimWorkload] from multiple contexts as only a single concurrent [run] call is expected. */ public interface SimWorkload { /** diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt new file mode 100644 index 00000000..b9cd1b06 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.utils.DelayControllerClockAdapter +import java.time.Clock +import java.util.* + +/** + * Test suite for the [SimHypervisor] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimHypervisorTest { + private lateinit var scope: TestCoroutineScope + private lateinit var clock: Clock + private lateinit var machineModel: SimMachineModel + + @BeforeEach + fun setUp() { + scope = TestCoroutineScope() + clock = DelayControllerClockAdapter(scope) + + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + machineModel = SimMachineModel( + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + } + + /** + * Test overcommissioning of a hypervisor. + */ + @Test + fun overcommission() { + val listener = object : SimHypervisor.Listener { + var totalRequestedBurst = 0L + var totalGrantedBurst = 0L + var totalOvercommissionedBurst = 0L + + override fun onSliceFinish( + hypervisor: SimHypervisor, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + totalRequestedBurst += requestedBurst + totalGrantedBurst += grantedBurst + totalOvercommissionedBurst += overcommissionedBurst + } + } + + scope.launch { + val duration = 5 * 60L + val workloadA = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(0, 3500L * duration, duration * 1000, 3500.0, 2), + SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(0, 183L * duration, duration * 1000, 183.0, 2) + ), + ) + val workloadB = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(0, 3100L * duration, duration * 1000, 3100.0, 2), + SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(0, 73L * duration, duration * 1000, 73.0, 2) + ) + ) + + val machine = SimBareMetalMachine(scope, clock, machineModel) + val hypervisor = SimHypervisor(scope, clock, listener) + + launch { + machine.run(hypervisor) + } + + yield() + launch { hypervisor.createMachine(machineModel).run(workloadA) } + launch { hypervisor.createMachine(machineModel).run(workloadB) } + } + + scope.advanceUntilIdle() + + assertAll( + { Assertions.assertEquals(emptyList(), scope.uncaughtExceptions, "No errors") }, + { Assertions.assertEquals(2073600, listener.totalRequestedBurst, "Requested Burst does not match") }, + { Assertions.assertEquals(2013600, listener.totalGrantedBurst, "Granted Burst does not match") }, + { Assertions.assertEquals(60000, listener.totalOvercommissionedBurst, "Overcommissioned Burst does not match") }, + { Assertions.assertEquals(1200001, scope.currentTime) } + ) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index f6fb8e04..332ca8e9 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -35,7 +35,7 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter /** - * Test suite for the [SimMachine] class. + * Test suite for the [SimBareMetalMachine] class. */ @OptIn(ExperimentalCoroutinesApi::class) class SimMachineTest { @@ -55,7 +55,7 @@ class SimMachineTest { fun testFlopsWorkload() { val testScope = TestCoroutineScope() val clock = DelayControllerClockAdapter(testScope) - val machine = SimMachine(testScope, clock, machineModel) + val machine = SimBareMetalMachine(testScope, clock, machineModel) testScope.runBlockingTest { machine.run(SimFlopsWorkload(2_000, 2, utilization = 1.0)) @@ -69,7 +69,7 @@ class SimMachineTest { fun testUsage() { val testScope = TestCoroutineScope() val clock = DelayControllerClockAdapter(testScope) - val machine = SimMachine(testScope, clock, machineModel) + val machine = SimBareMetalMachine(testScope, clock, machineModel) testScope.runBlockingTest { machine.run(SimFlopsWorkload(2_000, 2, utilization = 1.0)) -- cgit v1.2.3 From 136c1b9ddc7bd9331d3552d681e9190fc6198271 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 3 Oct 2020 17:41:59 +0200 Subject: Migrate codebase to opendc-simulator-compute This change updates the remainder of the codebase to use the opendc-simulator-compute module for the simulation of workloads. --- .../kotlin/org/opendc/simulator/compute/SimHypervisor.kt | 14 +++++++++----- .../opendc/simulator/compute/workload/SimTraceWorkload.kt | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) (limited to 'simulator/opendc-simulator') diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt index 7c2cfbe3..55eda70f 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt @@ -103,14 +103,15 @@ public class SimHypervisor( * Run the scheduling process of the hypervisor. */ override suspend fun run(ctx: SimExecutionContext) { - val maxUsage = ctx.machine.cpus.sumByDouble { it.frequency } - val pCPUs = ctx.machine.cpus.indices.sortedBy { ctx.machine.cpus[it].frequency } + val model = ctx.machine + val maxUsage = model.cpus.sumByDouble { it.frequency } + val pCPUs = model.cpus.indices.sortedBy { model.cpus[it].frequency } val vms = mutableSetOf() val vcpus = mutableListOf() - val usage = DoubleArray(ctx.machine.cpus.size) - val burst = LongArray(ctx.machine.cpus.size) + val usage = DoubleArray(model.cpus.size) + val burst = LongArray(model.cpus.size) fun process(command: SchedulerCommand) { when (command) { @@ -180,13 +181,16 @@ public class SimHypervisor( duration = min(duration, req.burst / grantedUsage) } + // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs. + duration = 300.0 + val totalAllocatedUsage = maxUsage - availableUsage var totalAllocatedBurst = 0L availableUsage = totalAllocatedUsage // Divide the requests over the available capacity of the pCPUs fairly for (i in pCPUs) { - val maxCpuUsage = ctx.machine.cpus[i].frequency + val maxCpuUsage = model.cpus[i].frequency val fraction = maxCpuUsage / maxUsage val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction) val grantedBurst = ceil(duration * grantedUsage).toLong() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index e7eaa8bf..7b1ddf32 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -29,7 +29,7 @@ import kotlin.math.min * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource * consumption for some period of time. */ -public class SimTraceWorkload(private val trace: Sequence) : SimWorkload { +public class SimTraceWorkload(public val trace: Sequence) : SimWorkload { override suspend fun run(ctx: SimExecutionContext) { var offset = ctx.clock.millis() -- cgit v1.2.3 From 47f803afc14a50c467d2f5f7ff406824428223f7 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 4 Oct 2020 17:17:23 +0200 Subject: Reimplement performance interference in opendc-simulator-compute This change reimplements the performance interference model originally implemented for the SimpleVirtDriver class, for SimHypervisor. --- .../org/opendc/simulator/compute/SimHypervisor.kt | 22 ++-- .../interference/PerformanceInterferenceModel.kt | 134 +++++++++++++++++++++ .../opendc/simulator/compute/SimHypervisorTest.kt | 1 - 3 files changed, 146 insertions(+), 11 deletions(-) create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt (limited to 'simulator/opendc-simulator') diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt index 55eda70f..6087227b 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt @@ -30,6 +30,7 @@ import kotlinx.coroutines.intrinsics.startCoroutineCancellable import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.SelectInstance import kotlinx.coroutines.selects.select +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import java.time.Clock @@ -69,15 +70,16 @@ public class SimHypervisor( * * @param model The machine to create. */ - public fun createMachine(model: SimMachineModel): SimMachine { - val vmCtx = VmExecutionContext(model) + public fun createMachine(model: SimMachineModel, performanceInterferenceModel: PerformanceInterferenceModel? = null): SimMachine { + val vm = VmSession(model, performanceInterferenceModel) + val vmCtx = VmExecutionContext(vm) return object : SimMachine { override val model: SimMachineModel get() = vmCtx.machine override val usage: StateFlow - get() = vmCtx.session.usage + get() = vm.usage /** * The current active workload. @@ -187,6 +189,7 @@ public class SimHypervisor( val totalAllocatedUsage = maxUsage - availableUsage var totalAllocatedBurst = 0L availableUsage = totalAllocatedUsage + val serverLoad = totalAllocatedUsage / maxUsage // Divide the requests over the available capacity of the pCPUs fairly for (i in pCPUs) { @@ -232,7 +235,7 @@ public class SimHypervisor( val vm = vmIterator.next() // Apply performance interference model - val performanceScore = 1.0 // TODO Performance interference + val performanceScore = vm.performanceInterferenceModel?.apply(serverLoad) ?: 1.0 var hasFinished = false for (vcpu in vm.vcpus) { @@ -330,6 +333,7 @@ public class SimHypervisor( @OptIn(InternalCoroutinesApi::class) private data class VmSession( val model: SimMachineModel, + val performanceInterferenceModel: PerformanceInterferenceModel? = null, var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST, var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r }, var select: () -> Unit = {} @@ -491,12 +495,10 @@ public class SimHypervisor( * The execution context in which a VM runs. * */ - private inner class VmExecutionContext(override val machine: SimMachineModel) : - SimExecutionContext, - DisposableHandle { - private var finalized: Boolean = false - private var initialized: Boolean = false - val session: VmSession = VmSession(machine) + private inner class VmExecutionContext(val session: VmSession) : + SimExecutionContext, DisposableHandle { + override val machine: SimMachineModel + get() = session.model override val clock: Clock get() = this@SimHypervisor.clock diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt new file mode 100644 index 00000000..4c409887 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.interference + +import java.util.* +import kotlin.random.Random + +/** + * Meta-data key for the [PerformanceInterferenceModel] of an image. + */ +public const val IMAGE_PERF_INTERFERENCE_MODEL: String = "image:performance-interference" + +/** + * Performance Interference Model describing the variability incurred by different sets of workloads if colocated. + * + * @param items The [PerformanceInterferenceModel.Item]s that make up this model. + */ +public class PerformanceInterferenceModel( + public val items: SortedSet, + private val random: Random = Random(0) +) { + private var intersectingItems: List = emptyList() + private val colocatedWorkloads = TreeMap() + + /** + * Indicate that a VM has started. + */ + public fun onStart(name: String) { + colocatedWorkloads.merge(name, 1, Int::plus) + intersectingItems = items.filter { item -> doesMatch(item) } + } + + /** + * Indicate that a VM has stopped. + */ + public fun onStop(name: String) { + colocatedWorkloads.computeIfPresent(name) { _, v -> (v - 1).takeUnless { it == 0 } } + intersectingItems = items.filter { item -> doesMatch(item) } + } + + /** + * Compute the performance interference based on the current server load. + */ + public fun apply(currentServerLoad: Double): Double { + if (intersectingItems.isEmpty()) { + return 1.0 + } + val score = intersectingItems + .firstOrNull { it.minServerLoad <= currentServerLoad } + + // Apply performance penalty to (on average) only one of the VMs + return if (score != null && random.nextInt(score.workloadNames.size) == 0) { + score.performanceScore + } else { + 1.0 + } + } + + private fun doesMatch(item: Item): Boolean { + var count = 0 + for ( + name in item.workloadNames.subSet( + colocatedWorkloads.firstKey(), + colocatedWorkloads.lastKey() + "\u0000" + ) + ) { + count += colocatedWorkloads.getOrDefault(name, 0) + if (count > 1) + return true + } + return false + } + + /** + * Model describing how a specific set of workloads causes performance variability for each workload. + * + * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set. + * @param minServerLoad The minimum total server load at which this interference is activated and noticeable. + * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no + * influence, <1 means that performance degrades, and >1 means that performance improves. + */ + public data class Item( + public val workloadNames: SortedSet, + public val minServerLoad: Double, + public val performanceScore: Double + ) : Comparable { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as Item + + if (workloadNames != other.workloadNames) return false + + return true + } + + override fun hashCode(): Int = workloadNames.hashCode() + + override fun compareTo(other: Item): Int { + var cmp = performanceScore.compareTo(other.performanceScore) + if (cmp != 0) { + return cmp + } + + cmp = minServerLoad.compareTo(other.minServerLoad) + if (cmp != 0) { + return cmp + } + + return hashCode().compareTo(other.hashCode()) + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index b9cd1b06..78bd2940 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -36,7 +36,6 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter import java.time.Clock -import java.util.* /** * Test suite for the [SimHypervisor] class. -- cgit v1.2.3 From e06c953d1c71c704b351c81611e2e2bececf8e67 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 5 Oct 2020 12:12:32 +0200 Subject: Move failure models to separate module --- .../opendc-simulator-failures/build.gradle.kts | 31 +++++ .../simulator/failures/CorrelatedFaultInjector.kt | 129 +++++++++++++++++++++ .../org/opendc/simulator/failures/FailureDomain.kt | 47 ++++++++ .../org/opendc/simulator/failures/FaultInjector.kt | 33 ++++++ .../failures/UncorrelatedFaultInjector.kt | 61 ++++++++++ 5 files changed, 301 insertions(+) create mode 100644 simulator/opendc-simulator/opendc-simulator-failures/build.gradle.kts create mode 100644 simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/CorrelatedFaultInjector.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FailureDomain.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FaultInjector.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/UncorrelatedFaultInjector.kt (limited to 'simulator/opendc-simulator') diff --git a/simulator/opendc-simulator/opendc-simulator-failures/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-failures/build.gradle.kts new file mode 100644 index 00000000..1c30506f --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-failures/build.gradle.kts @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Failure models for OpenDC" + +plugins { + `kotlin-library-convention` +} + +dependencies { + api("org.jetbrains.kotlinx:kotlinx-coroutines-core:${Library.KOTLINX_COROUTINES}") +} diff --git a/simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/CorrelatedFaultInjector.kt b/simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/CorrelatedFaultInjector.kt new file mode 100644 index 00000000..0e15f338 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/CorrelatedFaultInjector.kt @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.failures + +import kotlinx.coroutines.* +import java.time.Clock +import kotlin.math.exp +import kotlin.math.max +import kotlin.random.Random +import kotlin.random.asJavaRandom + +/** + * A [FaultInjector] that injects fault in the system which are correlated to each other. Failures do not occur in + * isolation, but will trigger other faults. + */ +public class CorrelatedFaultInjector( + private val coroutineScope: CoroutineScope, + private val clock: Clock, + private val iatScale: Double, + private val iatShape: Double, + private val sizeScale: Double, + private val sizeShape: Double, + private val dScale: Double, + private val dShape: Double, + random: Random = Random(0) +) : FaultInjector { + /** + * The active failure domains that have been registered. + */ + private val active = mutableSetOf() + + /** + * The [Job] that awaits the nearest fault in the system. + */ + private var job: Job? = null + + /** + * The [Random] instance to use. + */ + private val random: java.util.Random = random.asJavaRandom() + + /** + * Enqueue the specified [FailureDomain] to fail some time in the future. + */ + override fun enqueue(domain: FailureDomain) { + active += domain + + // Clean up the domain if it finishes + domain.scope.coroutineContext[Job]!!.invokeOnCompletion { + this@CorrelatedFaultInjector.coroutineScope.launch { + active -= domain + + if (active.isEmpty()) { + job?.cancel() + job = null + } + } + } + + if (job != null) { + return + } + + job = this.coroutineScope.launch { + while (active.isNotEmpty()) { + ensureActive() + + // Make sure to convert delay from hours to milliseconds + val d = lognvariate(iatScale, iatShape) * 3.6e6 + + // Handle long overflow + if (clock.millis() + d <= 0) { + return@launch + } + + delay(d.toLong()) + + val n = lognvariate(sizeScale, sizeShape).toInt() + val targets = active.shuffled(random).take(n) + + for (failureDomain in targets) { + active -= failureDomain + failureDomain.fail() + } + + val df = max(lognvariate(dScale, dShape) * 6e4, 15 * 6e4) + + // Handle long overflow + if (clock.millis() + df <= 0) { + return@launch + } + + delay(df.toLong()) + + for (failureDomain in targets) { + failureDomain.recover() + + // Re-enqueue machine to be failed + enqueue(failureDomain) + } + } + + job = null + } + } + + // XXX We should extract this in some common package later on. + private fun lognvariate(scale: Double, shape: Double) = exp(scale + shape * random.nextGaussian()) +} diff --git a/simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FailureDomain.kt b/simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FailureDomain.kt new file mode 100644 index 00000000..dc3006e8 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FailureDomain.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.failures + +import kotlinx.coroutines.CoroutineScope + +/** + * A logical or physical component in a computing environment which may fail. + */ +public interface FailureDomain { + /** + * The lifecycle of the failure domain to which a [FaultInjector] will attach. + */ + public val scope: CoroutineScope + + /** + * Fail the domain externally. + */ + public suspend fun fail() + + /** + * Resume the failure domain. + */ + public suspend fun recover() +} diff --git a/simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FaultInjector.kt b/simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FaultInjector.kt new file mode 100644 index 00000000..a866260c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FaultInjector.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.failures + +/** + * An interface for stochastically injecting faults into a running system. + */ +public interface FaultInjector { + /** + * Enqueue the specified [FailureDomain] into the queue as candidate for failure injection in the future. + */ + public fun enqueue(domain: FailureDomain) +} diff --git a/simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/UncorrelatedFaultInjector.kt b/simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/UncorrelatedFaultInjector.kt new file mode 100644 index 00000000..b3bd737e --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/UncorrelatedFaultInjector.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.failures + +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import java.time.Clock +import kotlin.math.ln1p +import kotlin.math.pow +import kotlin.random.Random + +/** + * A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are + * independent. + */ +public class UncorrelatedFaultInjector( + private val clock: Clock, + private val alpha: Double, + private val beta: Double, + private val random: Random = Random(0) +) : FaultInjector { + /** + * Enqueue the specified [FailureDomain] to fail some time in the future. + */ + override fun enqueue(domain: FailureDomain) { + domain.scope.launch { + val d = random.weibull(alpha, beta) * 1e3 // Make sure to convert delay to milliseconds + + // Handle long overflow + if (clock.millis() + d <= 0) { + return@launch + } + + delay(d.toLong()) + domain.fail() + } + } + + // XXX We should extract this in some common package later on. + private fun Random.weibull(alpha: Double, beta: Double) = (beta * (-ln1p(-nextDouble())).pow(1.0 / alpha)) +} -- cgit v1.2.3