Diffstat (limited to 'simulator/opendc-simulator')
16 files changed, 1223 insertions, 879 deletions
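The bulk of this change replaces the slice-based SimExecutionContext.run API with a pull-based SimWorkload contract, in which the machine asks each CPU for its next SimResourceCommand (Consume, Idle or Exit) via onStart/onNext. A minimal sketch of a workload written against the new contract, assuming only the interface methods and types visible in the diff below — the SpinWorkload name and its single-parameter constructor are illustrative and not part of this change:

    import org.opendc.simulator.compute.SimExecutionContext
    import org.opendc.simulator.compute.workload.SimResourceCommand
    import org.opendc.simulator.compute.workload.SimWorkload

    // Hypothetical workload: burns a fixed amount of work on every CPU at full core speed.
    class SpinWorkload(private val workPerCpu: Double) : SimWorkload {
        init {
            require(workPerCpu > 0.0) { "Work must be positive" }
        }

        // Called once before the per-CPU callbacks; nothing to initialise here.
        override fun onStart(ctx: SimExecutionContext) {}

        // Each CPU requests its first command: consume the work at the core's full frequency.
        override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand =
            SimResourceCommand.Consume(workPerCpu, ctx.machine.cpus[cpu].frequency)

        // The machine reports how much work remains (e.g. after an interrupt or deadline);
        // keep consuming until nothing is left, then signal completion with Exit.
        override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand =
            if (remainingWork > 0.0)
                SimResourceCommand.Consume(remainingWork, ctx.machine.cpus[cpu].frequency)
            else
                SimResourceCommand.Exit
    }

This mirrors the pattern used by SimFlopsWorkload and SimRuntimeWorkload in the hunks that follow.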
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts index cd7e5706..844a7c6d 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts +++ b/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -28,6 +28,7 @@ plugins { dependencies { api(project(":opendc-simulator:opendc-simulator-core")) + implementation(project(":opendc-utils")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 5e50a676..812b5f20 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -25,13 +25,13 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.intrinsics.startCoroutineCancellable -import kotlinx.coroutines.selects.SelectClause0 -import kotlinx.coroutines.selects.SelectInstance +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimResourceCommand import org.opendc.simulator.compute.workload.SimWorkload -import java.lang.Runnable +import org.opendc.utils.TimerScheduler import java.time.Clock -import kotlin.coroutines.ContinuationInterceptor +import java.util.* +import kotlin.coroutines.* import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -59,23 +59,29 @@ public class SimBareMetalMachine( get() = usageState /** + * A flag to indicate that the machine is terminated. + */ + private var isTerminated = false + + /** + * The [MutableStateFlow] containing the load of the server. + */ + private val usageState = MutableStateFlow(0.0) + + /** * The current active workload. */ - private var activeWorkload: SimWorkload? = null + private var cont: Continuation<Unit>? = null /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * The active CPUs of this machine. */ - override suspend fun run(workload: SimWorkload) { - require(activeWorkload == null) { "Run should not be called concurrently" } + private var cpus: List<Cpu> = emptyList() - try { - activeWorkload = workload - workload.run(ctx) - } finally { - activeWorkload = null - } - } + /** + * The [TimerScheduler] to use for scheduling the interrupts. + */ + private val scheduler = TimerScheduler<Cpu>(coroutineScope, clock) /** * The execution context in which the workload runs. 
@@ -87,199 +93,220 @@ public class SimBareMetalMachine( override val clock: Clock get() = this@SimBareMetalMachine.clock - override fun onRun( - batch: Sequence<SimExecutionContext.Slice>, - triggerMode: SimExecutionContext.TriggerMode, - merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice - ): SelectClause0 { - return object : SelectClause0 { - @InternalCoroutinesApi - override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { - val context = select.completion.context - - // Do not reset the usage state: we will set it ourselves - usageFlush?.dispose() - usageFlush = null - - val queue = batch.iterator() - var start = Long.MIN_VALUE - var currentWork: SliceWork? = null - var currentDisposable: DisposableHandle? = null - - fun schedule(slice: SimExecutionContext.Slice) { - start = clock.millis() - - val isLastSlice = !queue.hasNext() - val work = SliceWork(slice) - val candidateDuration = when (triggerMode) { - SimExecutionContext.TriggerMode.FIRST -> work.minExit - SimExecutionContext.TriggerMode.LAST -> work.maxExit - SimExecutionContext.TriggerMode.DEADLINE -> slice.deadline - start - } - - // Check whether the deadline is exceeded during the run of the slice. - val duration = min(candidateDuration, slice.deadline - start) - - val action = Runnable { - currentWork = null - - // Flush all the work that was performed - val hasFinished = work.stop(duration) - - if (!isLastSlice) { - val candidateSlice = queue.next() - val nextSlice = - // If our previous slice exceeds its deadline, merge it with the next candidate slice - if (hasFinished) - candidateSlice - else - merge(candidateSlice, slice) - schedule(nextSlice) - } else if (select.trySelect()) { - block.startCoroutineCancellable(select.completion) - } - } - - // Schedule the flush after the entire slice has finished - currentDisposable = delay.invokeOnTimeout(duration, action, context) - - // Start the slice work - currentWork = work - work.start() - } - - // Schedule the first work - if (queue.hasNext()) { - schedule(queue.next()) - - // A DisposableHandle to flush the work in case the call is cancelled - val disposable = DisposableHandle { - val end = clock.millis() - val duration = end - start + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } - currentWork?.stop(duration) - currentDisposable?.dispose() + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + override suspend fun run(workload: SimWorkload) { + require(!isTerminated) { "Machine is terminated" } + require(cont == null) { "Run should not be called concurrently" } - val action = { - usageState.value = 0.0 - usageFlush = null - } + workload.onStart(ctx) - // Schedule reset the usage of the machine since the call is returning - usageFlush = delay.invokeOnTimeout(1, action, context) - } + return suspendCancellableCoroutine { cont -> + this.cont = cont + this.cpus = model.cpus.map { Cpu(it, workload) } - select.disposeOnSelect(disposable) - } else if (select.trySelect()) { - // No work has been given: select immediately - block.startCoroutineCancellable(select.completion) - } - } + for (cpu in cpus) { + cpu.start() } } } /** - * The [MutableStateFlow] containing the load of the server. + * Terminate the specified bare-metal machine. 
*/ - private val usageState = MutableStateFlow(0.0) + override fun close() { + isTerminated = true + } /** - * A disposable to prevent resetting the usage state for subsequent calls to onRun. + * Update the usage of the machine. */ - private var usageFlush: DisposableHandle? = null + private fun updateUsage() { + usageState.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency } + } /** - * Cache the [Delay] instance for timing. - * - * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy. - * XXX Note however that this is an ugly hack which may break in the future. + * This method is invoked when one of the CPUs has exited. */ - @OptIn(InternalCoroutinesApi::class) - private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay + private fun onCpuExit(cpu: Int) { + // Check whether all other CPUs have finished + if (cpus.all { it.hasExited }) { + val cont = cont + this.cont = null + cont?.resume(Unit) + } + } + + /** + * This method is invoked when one of the CPUs failed. + */ + private fun onCpuFailure(e: Throwable) { + // Make sure no other tasks will be resumed. + scheduler.cancelAll() + + // In case the flush fails with an exception, immediately propagate to caller, cancelling all other + // tasks. + val cont = cont + this.cont = null + cont?.resumeWithException(e) + } /** - * A slice to be processed. + * A physical CPU of the machine. */ - private inner class SliceWork(val slice: SimExecutionContext.Slice) { + private inner class Cpu(val model: ProcessingUnit, val workload: SimWorkload) { /** - * The duration after which the first processor finishes processing this slice. + * The current command. */ - val minExit: Long + private var currentCommand: CommandWrapper? = null /** - * The duration after which the last processor finishes processing this slice. + * The actual processing speed. */ - val maxExit: Long + var speed: Double = 0.0 + set(value) { + field = value + updateUsage() + } /** - * A flag to indicate that the slice will exceed the deadline. + * A flag to indicate that the CPU is currently processing a command. */ - val exceedsDeadline: Boolean - get() = slice.deadline < maxExit + var isIntermediate: Boolean = false /** - * The total amount of CPU usage. + * A flag to indicate that the CPU has exited. */ - val totalUsage: Double + var hasExited: Boolean = false /** - * A flag to indicate that this slice is empty. + * Process the specified [SimResourceCommand] for this CPU. 
*/ - val isEmpty: Boolean - - init { - var totalUsage = 0.0 - var minExit = Long.MAX_VALUE - var maxExit = 0L - var nonEmpty = false - - // Determine the duration of the first/last CPU to finish - for (i in 0 until min(model.cpus.size, slice.burst.size)) { - val cpu = model.cpus[i] - val usage = min(slice.limit[i], cpu.frequency) - val cpuDuration = ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds - - totalUsage += usage / cpu.frequency - - if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst - minExit = min(minExit, cpuDuration) - maxExit = max(maxExit, cpuDuration) - nonEmpty = true + fun process(command: SimResourceCommand) { + val timestamp = clock.millis() + + val task = when (command) { + is SimResourceCommand.Idle -> { + speed = 0.0 + + val deadline = command.deadline + + require(deadline >= timestamp) { "Deadline already passed" } + + if (deadline != Long.MAX_VALUE) { + scheduler.startSingleTimerTo(this, deadline) { flush() } + } else { + null + } + } + is SimResourceCommand.Consume -> { + val work = command.work + val limit = command.limit + val deadline = command.deadline + + require(deadline >= timestamp) { "Deadline already passed" } + + speed = min(model.frequency, limit) + + // The required duration to process all the work + val finishedAt = timestamp + ceil(work / speed * 1000).toLong() + + scheduler.startSingleTimerTo(this, min(finishedAt, deadline)) { flush() } + } + is SimResourceCommand.Exit -> { + speed = 0.0 + hasExited = true + + onCpuExit(model.id) + + null } } - this.isEmpty = !nonEmpty - this.totalUsage = totalUsage - this.minExit = if (isEmpty) 0 else minExit - this.maxExit = maxExit + assert(currentCommand == null) { "Concurrent access to current command" } + currentCommand = CommandWrapper(timestamp, command) } /** - * Indicate that the work on the slice has started. + * Request the workload for more work. */ - fun start() { - usageState.value = totalUsage / model.cpus.size + private fun next(remainingWork: Double) { + process(workload.onNext(ctx, model.id, remainingWork)) } /** - * Flush the work performed on the slice. + * Start the CPU. */ - fun stop(duration: Long): Boolean { - var hasFinished = true + fun start() { + try { + isIntermediate = true + + process(workload.onStart(ctx, model.id)) + } catch (e: Throwable) { + onCpuFailure(e) + } finally { + isIntermediate = false + } + } - for (i in 0 until min(model.cpus.size, slice.burst.size)) { - val usage = min(slice.limit[i], model.cpus[i].frequency) - val granted = ceil(duration / 1000.0 * usage).toLong() - val res = max(0, slice.burst[i] - granted) - slice.burst[i] = res + /** + * Flush the work performed by the CPU. + */ + fun flush() { + try { + val (timestamp, command) = currentCommand ?: return + + isIntermediate = true + currentCommand = null + + // Cancel the running task and flush the progress + scheduler.cancel(this) + + when (command) { + is SimResourceCommand.Idle -> next(remainingWork = 0.0) + is SimResourceCommand.Consume -> { + val duration = clock.millis() - timestamp + val remainingWork = if (duration > 0L) { + val processed = duration / 1000.0 * speed + max(0.0, command.work - processed) + } else { + 0.0 + } - if (res != 0L) { - hasFinished = false + next(remainingWork) + } + SimResourceCommand.Exit -> throw IllegalStateException() } + } catch (e: Throwable) { + onCpuFailure(e) + } finally { + isIntermediate = false + } + } + + /** + * Interrupt the CPU. 
+ */ + fun interrupt() { + // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead + // to infinite recursion. + if (isIntermediate) { + return } - return hasFinished + flush() } } + + /** + * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. + */ + private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt index 5801fcd5..c7c3d3cc 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.selects.SelectClause0 -import kotlinx.coroutines.selects.select import java.time.Clock /** @@ -43,113 +41,10 @@ public interface SimExecutionContext { public val machine: SimMachineModel /** - * Ask the processor cores to run the specified [slice] and suspend execution until the trigger condition is met as - * specified by [triggerMode]. + * Ask the host machine to interrupt the specified vCPU. * - * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which - * may be zero). These changes may happen anytime during execution of this method and callers should not rely on - * the timing of this change. - * - * @param slice The representation of work to run on the processors. - * @param triggerMode The trigger condition to resume execution. - */ - public suspend fun run(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST): Unit = - select { onRun(slice, triggerMode).invoke {} } - - /** - * Ask the processors cores to run the specified [batch] of work slices and suspend execution until the trigger - * condition is met as specified by [triggerMode]. - * - * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which - * may be zero). These changes may happen anytime during execution of this method and callers should not rely on - * the timing of this change. - * - * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these - * slices with the next slice to be executed. - * - * @param batch The batch of work to run on the processors. - * @param triggerMode The trigger condition to resume execution. - * @param merge The merge function for consecutive slices in case the last slice was not completed within its - * deadline. + * @param cpu The id of the vCPU to interrupt. + * @throws IllegalArgumentException if the identifier points to a non-existing vCPU. */ - public suspend fun run( - batch: Sequence<Slice>, - triggerMode: TriggerMode = TriggerMode.FIRST, - merge: (Slice, Slice) -> Slice = { _, r -> r } - ): Unit = select { onRun(batch, triggerMode, merge).invoke {} } - - /** - * Ask the processor cores to run the specified [slice] and select when the trigger condition is met as specified - * by [triggerMode]. - * - * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which - * may be zero). 
These changes may happen anytime during execution of this method and callers should not rely on - * the timing of this change. - * - * @param slice The representation of work to request from the processors. - * @param triggerMode The trigger condition to resume execution. - */ - public fun onRun(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST): SelectClause0 = - onRun(sequenceOf(slice), triggerMode) - - /** - * Ask the processors cores to run the specified [batch] of work slices and select when the trigger condition is met - * as specified by [triggerMode]. - * - * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which - * may be zero). These changes may happen anytime during execution of this method and callers should not rely on - * the timing of this change. - * - * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these - * slices with the next slice to be executed. - * - * @param batch The batch of work to run on the processors. - * @param triggerMode The trigger condition to resume execution during the **last** slice. - * @param merge The merge function for consecutive slices in case the last slice was not completed within its - * deadline. - */ - public fun onRun( - batch: Sequence<Slice>, - triggerMode: TriggerMode = TriggerMode.FIRST, - merge: (Slice, Slice) -> Slice = { _, r -> r } - ): SelectClause0 - - /** - * A request to the host machine for a slice of CPU time from the processor cores. - * - * Both [burst] and [limit] must be of the same size and in any other case the method will throw an - * [IllegalArgumentException]. - * - * - * @param burst The burst time to request from each of the processor cores. - * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst. - * @param deadline The instant at which this slice needs to be fulfilled. - */ - public class Slice(public val burst: LongArray, public val limit: DoubleArray, public val deadline: Long) { - init { - require(burst.size == limit.size) { "Incompatible array dimensions" } - } - } - - /** - * The modes for triggering a machine exit from the machine. - */ - public enum class TriggerMode { - /** - * A machine exit occurs when either the first processor finishes processing a **non-zero** burst or the - * deadline is reached. - */ - FIRST, - - /** - * A machine exit occurs when either the last processor finishes processing a **non-zero** burst or the deadline - * is reached. - */ - LAST, - - /** - * A machine exit occurs only when the deadline is reached. 
- */ - DEADLINE - } + public fun interrupt(cpu: Int) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt new file mode 100644 index 00000000..5e86d32b --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -0,0 +1,590 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimResourceCommand +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.compute.workload.SimWorkloadBarrier +import java.time.Clock +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single + * [SimBareMetalMachine] concurrently using weighted fair sharing. + * + * @param listener The hypervisor listener to use. + */ +public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor { + + override fun onStart(ctx: SimExecutionContext) { + val model = ctx.machine + this.ctx = ctx + this.commands = Array(model.cpus.size) { SimResourceCommand.Idle() } + this.pCpus = model.cpus.indices.sortedBy { model.cpus[it].frequency }.toIntArray() + this.maxUsage = model.cpus.sumByDouble { it.frequency } + this.barrier = SimWorkloadBarrier(model.cpus.size) + } + + override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { + return commands[cpu] + } + + override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + totalRemainingWork += remainingWork + val isLast = barrier.enter() + + // Flush the progress of the guest after the barrier has been reached. 
+ if (isLast && isDirty) { + isDirty = false + flushGuests() + } + + return if (isDirty) { + // Wait for the scheduler determine the work after the barrier has been reached by all CPUs. + SimResourceCommand.Idle() + } else { + // Indicate that the scheduler needs to run next call. + if (isLast) { + isDirty = true + } + + commands[cpu] + } + } + + override fun canFit(model: SimMachineModel): Boolean = true + + override fun createMachine( + model: SimMachineModel, + performanceInterferenceModel: PerformanceInterferenceModel? + ): SimMachine = SimVm(model, performanceInterferenceModel) + + /** + * The execution context in which the hypervisor runs. + */ + private lateinit var ctx: SimExecutionContext + + /** + * The commands to submit to the underlying host. + */ + private lateinit var commands: Array<SimResourceCommand> + + /** + * The active vCPUs. + */ + private val vcpus: MutableList<VCpu> = mutableListOf() + + /** + * The indices of the physical CPU ordered by their speed. + */ + private lateinit var pCpus: IntArray + + /** + * The maximum amount of work to be performed per second. + */ + private var maxUsage: Double = 0.0 + + /** + * The current load on the hypervisor. + */ + private var load: Double = 0.0 + + /** + * The total amount of remaining work (of all pCPUs). + */ + private var totalRemainingWork: Double = 0.0 + + /** + * The total speed requested by the vCPUs. + */ + private var totalRequestedSpeed = 0.0 + + /** + * The total amount of work requested by the vCPUs. + */ + private var totalRequestedWork = 0.0 + + /** + * The total allocated speed for the vCPUs. + */ + private var totalAllocatedSpeed = 0.0 + + /** + * The total allocated work requested for the vCPUs. + */ + private var totalAllocatedWork = 0.0 + + /** + * The amount of work that could not be performed due to over-committing resources. + */ + private var totalOvercommittedWork = 0.0 + + /** + * The amount of work that was lost due to interference. + */ + private var totalInterferedWork = 0.0 + + /** + * A flag to indicate that the scheduler has submitted work that has not yet been completed. + */ + private var isDirty: Boolean = false + + /** + * The scheduler barrier. + */ + private lateinit var barrier: SimWorkloadBarrier + + /** + * Indicate that the workloads should be re-scheduled. + */ + private fun shouldSchedule() { + isDirty = true + ctx.interruptAll() + } + + /** + * Schedule the work over the physical CPUs. + */ + private fun doSchedule() { + // If there is no work yet, mark all pCPUs as idle. 
+ if (vcpus.isEmpty()) { + commands.fill(SimResourceCommand.Idle()) + ctx.interruptAll() + } + + var duration: Double = Double.MAX_VALUE + var deadline: Long = Long.MAX_VALUE + var availableSpeed = maxUsage + var totalRequestedSpeed = 0.0 + var totalRequestedWork = 0.0 + + // Sort the vCPUs based on their requested usage + // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + vcpus.sort() + + // Divide the available host capacity fairly across the vCPUs using max-min fair sharing + val vcpuIterator = vcpus.listIterator() + var remaining = vcpus.size + while (vcpuIterator.hasNext()) { + val vcpu = vcpuIterator.next() + val availableShare = availableSpeed / remaining-- + + when (val command = vcpu.command) { + is SimResourceCommand.Idle -> { + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + vcpu.actualSpeed = 0.0 + } + is SimResourceCommand.Consume -> { + val grantedSpeed = min(vcpu.allowedSpeed, availableShare) + + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + // Ignore idle computation + if (grantedSpeed <= 0.0 || command.work <= 0.0) { + vcpu.actualSpeed = 0.0 + continue + } + + totalRequestedSpeed += command.limit + totalRequestedWork += command.work + + vcpu.actualSpeed = grantedSpeed + availableSpeed -= grantedSpeed + + // The duration that we want to run is that of the shortest request from a vCPU + duration = min(duration, command.work / grantedSpeed) + } + SimResourceCommand.Exit -> { + // Apparently the vCPU has exited, so remove it from the scheduling queue. + vcpuIterator.remove() + } + } + } + + // Round the duration to milliseconds + duration = ceil(duration * 1000) / 1000 + + assert(deadline >= ctx.clock.millis()) { "Deadline already passed" } + + val totalAllocatedSpeed = maxUsage - availableSpeed + var totalAllocatedWork = 0.0 + availableSpeed = totalAllocatedSpeed + load = totalAllocatedSpeed / maxUsage + + // Divide the requests over the available capacity of the pCPUs fairly + for (i in pCpus) { + val maxCpuUsage = ctx.machine.cpus[i].frequency + val fraction = maxCpuUsage / maxUsage + val grantedSpeed = min(maxCpuUsage, totalAllocatedSpeed * fraction) + val grantedWork = duration * grantedSpeed + + commands[i] = + if (grantedWork > 0.0 && grantedSpeed > 0.0) + SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + + totalAllocatedWork += grantedWork + availableSpeed -= grantedSpeed + } + + this.totalRequestedSpeed = totalRequestedSpeed + this.totalRequestedWork = totalRequestedWork + this.totalAllocatedSpeed = totalAllocatedSpeed + this.totalAllocatedWork = totalAllocatedWork + + ctx.interruptAll() + } + + /** + * Flush the progress of the vCPUs. + */ + private fun flushGuests() { + // Flush all the vCPUs work + for (vcpu in vcpus) { + vcpu.flush(interrupt = false) + } + + // Report metrics + listener?.onSliceFinish( + this, + totalRequestedWork.toLong(), + (totalAllocatedWork - totalRemainingWork).toLong(), + totalOvercommittedWork.toLong(), + totalInterferedWork.toLong(), + totalRequestedSpeed, + totalAllocatedSpeed + ) + totalRemainingWork = 0.0 + totalInterferedWork = 0.0 + totalOvercommittedWork = 0.0 + + // Force all pCPUs to re-schedule their work. + doSchedule() + } + + /** + * Interrupt all host CPUs. 
+ */ + private fun SimExecutionContext.interruptAll() { + for (i in machine.cpus.indices) { + interrupt(i) + } + } + + /** + * A virtual machine running on the hypervisor. + * + * @property model The machine model of the virtual machine. + * @property performanceInterferenceModel The performance interference model to utilize. + */ + private inner class SimVm( + override val model: SimMachineModel, + val performanceInterferenceModel: PerformanceInterferenceModel? = null, + ) : SimMachine { + /** + * A [StateFlow] representing the CPU usage of the simulated machine. + */ + override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) + + /** + * A flag to indicate that the machine is terminated. + */ + private var isTerminated = false + + /** + * The current active workload. + */ + private var cont: Continuation<Unit>? = null + + /** + * The active CPUs of this virtual machine. + */ + private var cpus: List<VCpu> = emptyList() + + /** + * The execution context in which the workload runs. + */ + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = model + + override val clock: Clock + get() = this@SimFairShareHypervisor.ctx.clock + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + override suspend fun run(workload: SimWorkload) { + require(!isTerminated) { "Machine is terminated" } + require(cont == null) { "Run should not be called concurrently" } + + workload.onStart(ctx) + + return suspendCancellableCoroutine { cont -> + this.cont = cont + this.cpus = model.cpus.map { VCpu(this, it, workload) } + + for (cpu in cpus) { + // Register vCPU to scheduler + vcpus.add(cpu) + + cpu.start() + } + + // Re-schedule the work over the pCPUs + shouldSchedule() + } + } + + /** + * Terminate this VM instance. + */ + override fun close() { + isTerminated = true + } + + /** + * Update the usage of the VM. + */ + fun updateUsage() { + usage.value = cpus.sumByDouble { it.actualSpeed } / cpus.sumByDouble { it.model.frequency } + } + + /** + * This method is invoked when one of the CPUs has exited. + */ + fun onCpuExit(cpu: Int) { + // Check whether all other CPUs have finished + if (cpus.all { it.hasExited }) { + val cont = cont + this.cont = null + cont?.resume(Unit) + } + } + + /** + * This method is invoked when one of the CPUs failed. + */ + fun onCpuFailure(e: Throwable) { + // In case the flush fails with an exception, immediately propagate to caller, cancelling all other + // tasks. + val cont = cont + this.cont = null + cont?.resumeWithException(e) + } + } + + /** + * A CPU of the virtual machine. + */ + private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload) : Comparable<VCpu> { + /** + * The latest command processed by the CPU. + */ + var command: SimResourceCommand = SimResourceCommand.Idle() + + /** + * The latest timestamp at which the vCPU was flushed. + */ + var latestFlush: Long = 0 + + /** + * The processing speed that is allowed by the model constraints. + */ + var allowedSpeed: Double = 0.0 + + /** + * The actual processing speed. + */ + var actualSpeed: Double = 0.0 + set(value) { + field = value + vm.updateUsage() + } + + /** + * A flag to indicate that the CPU is currently processing a command. + */ + var isIntermediate: Boolean = false + + /** + * A flag to indicate that the CPU has exited. 
+ */ + val hasExited: Boolean + get() = command is SimResourceCommand.Exit + + /** + * Process the specified [SimResourceCommand] for this CPU. + */ + fun process(command: SimResourceCommand) { + // Assign command as the most recent executed command + this.command = command + + when (command) { + is SimResourceCommand.Idle -> { + require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" } + + allowedSpeed = 0.0 + } + is SimResourceCommand.Consume -> { + require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" } + + allowedSpeed = min(model.frequency, command.limit) + } + is SimResourceCommand.Exit -> { + allowedSpeed = 0.0 + actualSpeed = 0.0 + + vm.onCpuExit(model.id) + } + } + } + + /** + * Start the CPU. + */ + fun start() { + try { + isIntermediate = true + latestFlush = ctx.clock.millis() + + process(workload.onStart(vm.ctx, model.id)) + } catch (e: Throwable) { + fail(e) + } finally { + isIntermediate = false + } + } + + /** + * Flush the work performed by the CPU. + */ + fun flush(interrupt: Boolean) { + val now = ctx.clock.millis() + + // Fast path: if the CPU was already flushed at at the current instant, no need to flush the progress. + if (latestFlush >= now) { + return + } + + try { + isIntermediate = true + when (val command = command) { + is SimResourceCommand.Idle -> { + // Act like nothing has happened in case the vCPU did not reach its deadline or was not + // interrupted by the user. + if (interrupt || command.deadline <= now) { + process(workload.onNext(vm.ctx, model.id, 0.0)) + } + } + is SimResourceCommand.Consume -> { + // Apply performance interference model + val performanceScore = vm.performanceInterferenceModel?.apply(load) ?: 1.0 + + // Compute the remaining amount of work + val remainingWork = if (command.work > 0.0) { + // Compute the fraction of compute time allocated to the VM + val fraction = actualSpeed / totalAllocatedSpeed + + // Compute the work that was actually granted to the VM. + val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction + val processed = processingAvailable * performanceScore + + val interferedWork = processingAvailable - processed + totalInterferedWork += interferedWork + + max(0.0, command.work - processed) + } else { + 0.0 + } + + // Act like nothing has happened in case the vCPU did not finish yet or was not interrupted by + // the user. + if (interrupt || remainingWork == 0.0 || command.deadline <= now) { + if (!interrupt) { + totalOvercommittedWork += remainingWork + } + + process(workload.onNext(vm.ctx, model.id, remainingWork)) + } else { + process(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline)) + } + } + SimResourceCommand.Exit -> + throw IllegalStateException() + } + } catch (e: Throwable) { + fail(e) + } finally { + latestFlush = now + isIntermediate = false + } + } + + /** + * Interrupt the CPU. + */ + fun interrupt() { + // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead + // to infinite recursion. + if (isIntermediate) { + return + } + + flush(interrupt = true) + + // Force the scheduler to re-schedule + shouldSchedule() + } + + /** + * Fail the CPU. 
+ */ + fun fail(e: Throwable) { + command = SimResourceCommand.Exit + vm.onCpuFailure(e) + } + + override fun compareTo(other: VCpu): Int = allowedSpeed.compareTo(other.allowedSpeed) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt deleted file mode 100644 index b88871a5..00000000 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt +++ /dev/null @@ -1,517 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.intrinsics.startCoroutineCancellable -import kotlinx.coroutines.selects.SelectClause0 -import kotlinx.coroutines.selects.SelectInstance -import kotlinx.coroutines.selects.select -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimWorkload -import java.time.Clock -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min - -/** - * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single - * [SimBareMetalMachine] concurrently using weighted fair sharing. - * - * @param coroutineScope The [CoroutineScope] to run the simulated workloads in. - * @param clock The virtual clock to track the simulation time. - */ -@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) -public class SimFairSharedHypervisor( - private val coroutineScope: CoroutineScope, - private val clock: Clock, - private val listener: SimHypervisor.Listener? = null -) : SimHypervisor { - /** - * A flag to indicate the driver is stopped. - */ - private var stopped: Boolean = false - - /** - * The channel for scheduling new CPU requests. - */ - private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED) - - /** - * Create a [SimMachine] instance on which users may run a [SimWorkload]. - * - * @param model The machine to create. 
- */ - override fun createMachine(model: SimMachineModel, performanceInterferenceModel: PerformanceInterferenceModel?): SimMachine { - val vm = VmSession(model, performanceInterferenceModel) - val vmCtx = VmExecutionContext(vm) - - return object : SimMachine { - override val model: SimMachineModel - get() = vmCtx.machine - - override val usage: StateFlow<Double> - get() = vm.usage - - /** - * The current active workload. - */ - private var activeWorkload: SimWorkload? = null - - override suspend fun run(workload: SimWorkload) { - require(activeWorkload == null) { "Run should not be called concurrently" } - - try { - activeWorkload = workload - workload.run(vmCtx) - } finally { - activeWorkload = null - } - } - - override fun toString(): String = "SimVirtualMachine" - } - } - - /** - * Run the scheduling process of the hypervisor. - */ - override suspend fun run(ctx: SimExecutionContext) { - val model = ctx.machine - val maxUsage = model.cpus.sumByDouble { it.frequency } - val pCPUs = model.cpus.indices.sortedBy { model.cpus[it].frequency } - - val vms = mutableSetOf<VmSession>() - val vcpus = mutableListOf<VCpu>() - - val usage = DoubleArray(model.cpus.size) - val burst = LongArray(model.cpus.size) - - fun process(command: SchedulerCommand) { - when (command) { - is SchedulerCommand.Schedule -> { - vms += command.vm - vcpus.addAll(command.vm.vcpus) - } - is SchedulerCommand.Deschedule -> { - vms -= command.vm - vcpus.removeAll(command.vm.vcpus) - } - is SchedulerCommand.Interrupt -> { - } - } - } - - fun processRemaining() { - var command = schedulingQueue.poll() - while (command != null) { - process(command) - command = schedulingQueue.poll() - } - } - - while (!stopped) { - // Wait for a request to be submitted if we have no work yet. - if (vcpus.isEmpty()) { - process(schedulingQueue.receive()) - } - - processRemaining() - - val start = clock.millis() - - var duration: Double = Double.POSITIVE_INFINITY - var deadline: Long = Long.MAX_VALUE - var availableUsage = maxUsage - var totalRequestedUsage = 0.0 - var totalRequestedBurst = 0L - - // Sort the vCPUs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - vcpus.sort() - - // Divide the available host capacity fairly across the vCPUs using max-min fair sharing - for ((i, req) in vcpus.withIndex()) { - val remaining = vcpus.size - i - val availableShare = availableUsage / remaining - val grantedUsage = min(req.limit, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, req.vm.deadline) - - // Ignore empty CPUs - if (grantedUsage <= 0 || req.burst <= 0) { - req.allocatedLimit = 0.0 - continue - } - - totalRequestedUsage += req.limit - totalRequestedBurst += req.burst - - req.allocatedLimit = grantedUsage - availableUsage -= grantedUsage - - // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, req.burst / grantedUsage) - } - - val totalAllocatedUsage = maxUsage - availableUsage - var totalAllocatedBurst = 0L - availableUsage = totalAllocatedUsage - val serverLoad = totalAllocatedUsage / maxUsage - - // / XXX Ceil duration to eliminate rounding issues - duration = ceil(duration) - - // Divide the requests over the available capacity of the pCPUs fairly - for (i in pCPUs) { - val maxCpuUsage = model.cpus[i].frequency - val fraction = maxCpuUsage / maxUsage - val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction) - 
val grantedBurst = ceil(duration * grantedUsage).toLong() - - usage[i] = grantedUsage - burst[i] = grantedBurst - totalAllocatedBurst += grantedBurst - availableUsage -= grantedUsage - } - - // XXX If none of the VMs require any computation, wait until their deadline, otherwise trigger on the - // first vCPU finished. - val triggerMode = - if (totalAllocatedBurst > 0 && totalAllocatedUsage > 0.0) - SimExecutionContext.TriggerMode.FIRST - else - SimExecutionContext.TriggerMode.DEADLINE - - // We run the total burst on the host processor. Note that this call may be cancelled at any moment in - // time, so not all of the burst may be executed. - val isInterrupted = select<Boolean> { - schedulingQueue.onReceive { schedulingQueue.offer(it); true } - ctx.onRun(SimExecutionContext.Slice(burst, usage, deadline), triggerMode) - .invoke { false } - } - - val end = clock.millis() - - // The total requested burst that the VMs wanted to run in the time-frame that we ran. - val totalRequestedSubBurst = - vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum() - val totalRemainder = burst.sum() - val totalGrantedBurst = totalAllocatedBurst - totalRemainder - - // The burst that was lost due to overcommissioning of CPU resources - var totalOvercommissionedBurst = 0L - // The burst that was lost due to interference. - var totalInterferedBurst = 0L - - val vmIterator = vms.iterator() - while (vmIterator.hasNext()) { - val vm = vmIterator.next() - - // Apply performance interference model - val performanceScore = vm.performanceInterferenceModel?.apply(serverLoad) ?: 1.0 - var hasFinished = false - - for (vcpu in vm.vcpus) { - // Compute the fraction of compute time allocated to the VM - val fraction = vcpu.allocatedLimit / totalAllocatedUsage - - // Compute the burst time that the VM was actually granted - val grantedBurst = ceil(totalGrantedBurst * fraction).toLong() - - // The burst that was actually used by the VM - val usedBurst = ceil(grantedBurst * performanceScore).toLong() - - totalInterferedBurst += grantedBurst - usedBurst - - // Compute remaining burst time to be executed for the request - if (vcpu.consume(usedBurst)) { - hasFinished = true - } else if (vm.deadline <= end) { - // Request must have its entire burst consumed or otherwise we have overcommission - // Note that we count the overcommissioned burst if the hypervisor has failed. - totalOvercommissionedBurst += vcpu.burst - } - } - - if (hasFinished || vm.deadline <= end) { - // Mark the VM as finished and deschedule the VMs if needed - if (vm.finish()) { - vmIterator.remove() - vcpus.removeAll(vm.vcpus) - } - } - } - - listener?.onSliceFinish( - this, - totalRequestedBurst, - min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing - totalOvercommissionedBurst, - totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, - min( - totalAllocatedUsage, - totalRequestedUsage - ), // The allocated usage might be slightly higher due to FP rounding - totalRequestedUsage - ) - } - } - - /** - * A scheduling command processed by the scheduler. - */ - private sealed class SchedulerCommand { - /** - * Schedule the specified VM on the hypervisor. - */ - data class Schedule(val vm: VmSession) : SchedulerCommand() - - /** - * De-schedule the specified VM on the hypervisor. - */ - data class Deschedule(val vm: VmSession) : SchedulerCommand() - - /** - * Interrupt the scheduler. 
- */ - object Interrupt : SchedulerCommand() - } - - /** - * A virtual machine running on the hypervisor. - * - * @param ctx The execution context the vCPU runs in. - * @param triggerMode The mode when to trigger the VM exit. - * @param merge The function to merge consecutive slices on spillover. - * @param select The function to select on finish. - */ - @OptIn(InternalCoroutinesApi::class) - private data class VmSession( - val model: SimMachineModel, - val performanceInterferenceModel: PerformanceInterferenceModel? = null, - var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST, - var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r }, - var select: () -> Unit = {} - ) { - /** - * The vCPUs of this virtual machine. - */ - val vcpus: List<VCpu> - - /** - * The slices that the VM wants to run. - */ - var queue: Iterator<SimExecutionContext.Slice> = emptyList<SimExecutionContext.Slice>().iterator() - - /** - * The current active slice. - */ - var activeSlice: SimExecutionContext.Slice? = null - - /** - * The current deadline of the VM. - */ - val deadline: Long - get() = activeSlice?.deadline ?: Long.MAX_VALUE - - /** - * A flag to indicate that the VM is idle. - */ - val isIdle: Boolean - get() = activeSlice == null - - /** - * The usage of the virtual machine. - */ - val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) - - init { - vcpus = model.cpus.mapIndexed { i, model -> VCpu(this, model, i) } - } - - /** - * Schedule the given slices on this vCPU, replacing the existing slices. - */ - fun schedule(slices: Sequence<SimExecutionContext.Slice>) { - queue = slices.iterator() - - if (queue.hasNext()) { - activeSlice = queue.next() - refresh() - } - } - - /** - * Cancel the existing workload on the VM. - */ - fun cancel() { - queue = emptyList<SimExecutionContext.Slice>().iterator() - activeSlice = null - refresh() - } - - /** - * Finish the current slice of the VM. - * - * @return `true` if the vCPUs may be descheduled, `false` otherwise. - */ - fun finish(): Boolean { - val activeSlice = activeSlice ?: return true - - return if (queue.hasNext()) { - val needsMerge = activeSlice.burst.any { it > 0 } - val candidateSlice = queue.next() - val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice - - this.activeSlice = slice - - // Update the vCPU cache - refresh() - - false - } else { - this.activeSlice = null - select() - true - } - } - - /** - * Refresh the vCPU cache. - */ - fun refresh() { - vcpus.forEach { it.refresh() } - usage.value = vcpus.sumByDouble { it.burst / it.limit } / vcpus.size - } - } - - /** - * A virtual CPU that can be scheduled on a physical CPU. - * - * @param vm The VM of which this vCPU is part. - * @param model The model of CPU that this vCPU models. - * @param id The id of the vCPU with respect to the VM. - */ - private data class VCpu( - val vm: VmSession, - val model: ProcessingUnit, - val id: Int - ) : Comparable<VCpu> { - /** - * The current limit on the vCPU. - */ - var limit: Double = 0.0 - - /** - * The limit allocated by the hypervisor. - */ - var allocatedLimit: Double = 0.0 - - /** - * The current burst running on the vCPU. - */ - var burst: Long = 0L - - /** - * Consume the specified burst on this vCPU. 
- */ - fun consume(burst: Long): Boolean { - this.burst = max(0, this.burst - burst) - - // Flush the result to the slice if it exists - vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst) - - val actuallyExists = vm.activeSlice?.burst?.let { id < it.size } ?: false - return actuallyExists && this.burst == 0L - } - - /** - * Refresh the information of this vCPU based on the current slice. - */ - fun refresh() { - limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0 - burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0 - } - - /** - * Compare to another vCPU based on the current load of the vCPU. - */ - override fun compareTo(other: VCpu): Int { - return limit.compareTo(other.limit) - } - - /** - * Create a string representation of the vCPU. - */ - override fun toString(): String = - "vCPU(id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)" - } - - /** - * The execution context in which a VM runs. - * - */ - private inner class VmExecutionContext(val session: VmSession) : - SimExecutionContext, DisposableHandle { - override val machine: SimMachineModel - get() = session.model - - override val clock: Clock - get() = this@SimFairSharedHypervisor.clock - - @OptIn(InternalCoroutinesApi::class) - override fun onRun( - batch: Sequence<SimExecutionContext.Slice>, - triggerMode: SimExecutionContext.TriggerMode, - merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice - ): SelectClause0 = object : SelectClause0 { - @InternalCoroutinesApi - override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { - session.triggerMode = triggerMode - session.merge = merge - session.select = { - if (select.trySelect()) { - block.startCoroutineCancellable(select.completion) - } - } - session.schedule(batch) - // Indicate to the hypervisor that the VM should be re-scheduled - schedulingQueue.offer(SchedulerCommand.Schedule(session)) - select.disposeOnSelect(this@VmExecutionContext) - } - } - - override fun dispose() { - if (!session.isIdle) { - session.cancel() - schedulingQueue.offer(SchedulerCommand.Deschedule(session)) - } - } - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt index fb4cd137..872b5f72 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt @@ -26,14 +26,20 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload /** - * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine] i + * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine] * concurrently. */ public interface SimHypervisor : SimWorkload { /** + * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment. + */ + public fun canFit(model: SimMachineModel): Boolean + + /** * Create a [SimMachine] instance on which users may run a [SimWorkload]. * * @param model The machine to create. + * @param performanceInterferenceModel The performance interference model to use. 
*/ public fun createMachine( model: SimMachineModel, @@ -49,10 +55,10 @@ public interface SimHypervisor : SimWorkload { */ public fun onSliceFinish( hypervisor: SimHypervisor, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, cpuUsage: Double, cpuDemand: Double ) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index f66085af..ea8eeb37 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -22,15 +22,13 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.StateFlow import org.opendc.simulator.compute.workload.SimWorkload /** * A generic machine that is able to run a [SimWorkload]. */ -@OptIn(ExperimentalCoroutinesApi::class) -public interface SimMachine { +public interface SimMachine : AutoCloseable { /** * The model of the machine containing its specifications. */ @@ -45,4 +43,9 @@ public interface SimMachine { * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ public suspend fun run(workload: SimWorkload) + + /** + * Terminate this machine. + */ + public override fun close() } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index 0d2c9374..c22fcc07 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -23,37 +23,46 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimExecutionContext -import kotlin.math.min /** - * A [SimWorkload] that models applications performing a static number of floating point operations ([flops]) on - * a compute resource. + * A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on + * multiple cores of a compute resource. * * @property flops The number of floating point operations to perform for this task in MFLOPs. - * @property cores The number of cores that the image is able to utilize. * @property utilization A model of the CPU utilization of the application. */ public class SimFlopsWorkload( public val flops: Long, - public val cores: Int, public val utilization: Double = 0.8 ) : SimWorkload { init { - require(flops >= 0) { "Negative number of FLOPs" } - require(cores > 0) { "Negative number of cores or no cores" } + require(flops >= 0) { "Negative number of flops" } require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - /** - * Execute the runtime behavior based on a number of floating point operations to execute. 
- */ - override suspend fun run(ctx: SimExecutionContext) { - val cores = min(this.cores, ctx.machine.cpus.size) - val burst = LongArray(cores) { flops / cores } - val maxUsage = DoubleArray(cores) { i -> ctx.machine.cpus[i].frequency * utilization } + override fun onStart(ctx: SimExecutionContext) {} - ctx.run(SimExecutionContext.Slice(burst, maxUsage, Long.MAX_VALUE), triggerMode = SimExecutionContext.TriggerMode.LAST) + override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { + val cores = ctx.machine.cpus.size + val limit = ctx.machine.cpus[cpu].frequency * utilization + val work = flops.toDouble() / cores + + return if (work > 0.0) { + SimResourceCommand.Consume(work, limit) + } else { + SimResourceCommand.Exit + } + } + + override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + return if (remainingWork > 0.0) { + val limit = ctx.machine.cpus[cpu].frequency * utilization + + return SimResourceCommand.Consume(remainingWork, limit) + } else { + SimResourceCommand.Exit + } } - override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,cores=$cores,utilization=$utilization)" + override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)" } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt new file mode 100644 index 00000000..41a5028e --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +/** + * A command that is sent to the host machine. + */ +public sealed class SimResourceCommand { + /** + * A request to the host to process the specified amount of [work] on a vCPU before the specified [deadline]. + * + * @param work The amount of work to process on the CPU. + * @param limit The maximum amount of work to be processed per second. + * @param deadline The instant at which the work needs to be fulfilled. 
+ */ + public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() { + init { + require(work > 0) { "The amount of work must be positive." } + require(limit > 0) { "Limit must be positive." } + } + } + + /** + * An indication to the host that the vCPU will idle until the specified [deadline] or is interrupted. + */ + public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() + + /** + * An indication to the host that the vCPU has finished processing. + */ + public object Exit : SimResourceCommand() +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt new file mode 100644 index 00000000..00ebebce --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +import org.opendc.simulator.compute.SimExecutionContext + +/** + * A [SimWorkload] that models application execution as a single duration. + * + * @property duration The duration of the workload. + * @property utilization The utilization of the application during runtime. 
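// Illustrative sketch (not from the diff): host-side dispatch over the sealed command type above.
// The function name `describe` is hypothetical; only the three variants come from the source.
import org.opendc.simulator.compute.workload.SimResourceCommand

fun describe(cmd: SimResourceCommand): String = when (cmd) {
    is SimResourceCommand.Consume -> "process ${cmd.work} work at up to ${cmd.limit} per second before ${cmd.deadline}"
    is SimResourceCommand.Idle -> "idle until ${cmd.deadline} or until interrupted"
    SimResourceCommand.Exit -> "vCPU has finished"
}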
+ */ +public class SimRuntimeWorkload( + public val duration: Long, + public val utilization: Double = 0.8 +) : SimWorkload { + init { + require(duration >= 0) { "Duration must be non-negative" } + require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } + } + + override fun onStart(ctx: SimExecutionContext) {} + + override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { + val limit = ctx.machine.cpus[cpu].frequency * utilization + val work = (limit / 1000) * duration + return SimResourceCommand.Consume(work, limit) + } + + override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + return if (remainingWork > 0.0) { + val limit = ctx.machine.cpus[cpu].frequency * utilization + SimResourceCommand.Consume(remainingWork, limit) + } else { + SimResourceCommand.Exit + } + } + + override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)" +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 7b1ddf32..deb10b98 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -23,31 +23,64 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimExecutionContext -import kotlin.math.min /** * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource * consumption for some period of time. */ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkload { - override suspend fun run(ctx: SimExecutionContext) { - var offset = ctx.clock.millis() - - val batch = trace.map { fragment -> - val cores = min(fragment.cores, ctx.machine.cpus.size) - val burst = LongArray(cores) { fragment.flops / cores } - val usage = DoubleArray(cores) { fragment.usage / cores } - offset += fragment.duration - SimExecutionContext.Slice(burst, usage, offset) + private var offset = 0L + private val iterator = trace.iterator() + private var fragment: Fragment? = null + private lateinit var barrier: SimWorkloadBarrier + + override fun onStart(ctx: SimExecutionContext) { + barrier = SimWorkloadBarrier(ctx.machine.cpus.size) + fragment = nextFragment() + offset = ctx.clock.millis() + } + + override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { + return onNext(ctx, cpu, 0.0) + } + + override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + val now = ctx.clock.millis() + val fragment = fragment ?: return SimResourceCommand.Exit + val work = (fragment.duration / 1000) * fragment.usage + val deadline = offset + fragment.duration + + assert(deadline >= now) { "Deadline already passed" } + + val cmd = + if (cpu < fragment.cores && work > 0.0) + SimResourceCommand.Consume(work, fragment.usage, deadline) + else + SimResourceCommand.Idle(deadline) + + if (barrier.enter()) { + this.fragment = nextFragment() + this.offset += fragment.duration } - ctx.run(batch) + return cmd } override fun toString(): String = "SimTraceWorkload" /** + * Obtain the next fragment. + */ + private fun nextFragment(): Fragment? 
{ + return if (iterator.hasNext()) { + iterator.next() + } else { + null + } + } + + /** * A fragment of the workload. */ - public data class Fragment(val time: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int) + public data class Fragment(val duration: Long, val usage: Double, val cores: Int) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index 2add8cce..6fc78d56 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -28,14 +28,31 @@ import org.opendc.simulator.compute.SimExecutionContext * A model that characterizes the runtime behavior of some particular workload. * * Workloads are stateful objects that may be paused and resumed at a later moment. As such, be careful when using the - * same [SimWorkload] from multiple contexts as only a single concurrent [run] call is expected. + * same [SimWorkload] from multiple contexts. */ public interface SimWorkload { /** - * Launch the workload in the specified [SimExecutionContext]. + * This method is invoked when the workload is started, before the (virtual) CPUs assigned to the workload will + * start. + */ + public fun onStart(ctx: SimExecutionContext) + + /** + * This method is invoked when a (virtual) CPU assigned to the workload has started. + * + * @param ctx The execution context in which the workload runs. + * @param cpu The index of the (virtual) CPU to start. + * @return The command to perform on the CPU. + */ + public fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand + + /** + * This method is invoked when a (virtual) CPU assigned to the workload was interrupted or reached its deadline. * - * This method should encapsulate and characterize the runtime behavior of the instance resulting from launching - * the workload on some machine, in terms of the resource consumption on the machine. + * @param ctx The execution context in which the workload runs. + * @param cpu The index of the (virtual) CPU to obtain the resource consumption of. + * @param remainingWork The remaining work that was not yet completed. + * @return The next command to perform on the CPU. 
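// Worked example (illustrative, not in the diff) tying the onNext contract above to the trace
// replay in SimTraceWorkload: a fragment of five simulated minutes at 28.0 MHz on one core,
// as used in SimHypervisorTest further below.
import org.opendc.simulator.compute.workload.SimTraceWorkload

val fragment = SimTraceWorkload.Fragment(duration = 300_000, usage = 28.0, cores = 1)
// For CPU 0, onNext issues Consume(work = (300_000 / 1000) * 28.0 = 8_400.0, limit = 28.0,
// deadline = offset + 300_000); CPUs with index >= cores receive Idle(deadline) instead, and the
// SimWorkloadBarrier below advances the trace to the next fragment once every CPU has arrived.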
*/ - public suspend fun run(ctx: SimExecutionContext) + public fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt new file mode 100644 index 00000000..45a299be --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +/** + * The [SimWorkloadBarrier] is a barrier that allows workloads to wait for a select number of CPUs to complete, before + * proceeding its operation. + */ +public class SimWorkloadBarrier(public val parties: Int) { + private var counter = 0 + + /** + * Enter the barrier and determine whether the caller is the last to reach the barrier. + * + * @return `true` if the caller is the last to reach the barrier, `false` otherwise. 
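// Illustrative sketch (not part of this changeset): the barrier trips on every `parties`-th call
// and then resets, which is how SimTraceWorkload advances to the next fragment only after all
// CPUs have finished the current one.
import org.opendc.simulator.compute.workload.SimWorkloadBarrier

fun barrierExample() {
    val barrier = SimWorkloadBarrier(parties = 2)
    check(!barrier.enter()) // CPU 0 reports in: not the last party yet
    check(barrier.enter())  // CPU 1 reports in: last party, the counter resets
    check(!barrier.enter()) // the same barrier can be reused for the next slice
}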
+ */ + public fun enter(): Boolean { + val last = ++counter == parties + if (last) { + counter = 0 + return true + } + return false + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index e7fdd4b2..b8eee4f0 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -26,7 +26,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll @@ -51,7 +51,7 @@ internal class SimHypervisorTest { scope = TestCoroutineScope() clock = DelayControllerClockAdapter(scope) - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) machineModel = SimMachineModel( cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } @@ -59,27 +59,27 @@ internal class SimHypervisorTest { } /** - * Test overcommissioning of a hypervisor. + * Test overcommitting of resources via the hypervisor with a single VM. */ @Test - fun overcommission() { + fun testOvercommittedSingle() { val listener = object : SimHypervisor.Listener { - var totalRequestedBurst = 0L - var totalGrantedBurst = 0L - var totalOvercommissionedBurst = 0L + var totalRequestedWork = 0L + var totalGrantedWork = 0L + var totalOvercommittedWork = 0L override fun onSliceFinish( hypervisor: SimHypervisor, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, cpuUsage: Double, cpuDemand: Double ) { - totalRequestedBurst += requestedBurst - totalGrantedBurst += grantedBurst - totalOvercommissionedBurst += overcommissionedBurst + totalRequestedWork += requestedWork + totalGrantedWork += grantedWork + totalOvercommittedWork += overcommittedWork } } @@ -88,24 +88,84 @@ internal class SimHypervisorTest { val workloadA = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(0, 3500L * duration, duration * 1000, 3500.0, 2), - SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(0, 183L * duration, duration * 1000, 183.0, 2) + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + ), + ) + + val machine = SimBareMetalMachine(scope, clock, machineModel) + val hypervisor = SimFairShareHypervisor(listener) + + launch { + machine.run(hypervisor) + } + + yield() + launch { hypervisor.createMachine(machineModel).run(workloadA) } + } + + scope.advanceUntilIdle() + scope.uncaughtExceptions.forEach { it.printStackTrace() } + + assertAll( + { assertEquals(emptyList<Throwable>(), 
scope.uncaughtExceptions, "No errors") }, + { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, + { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, + { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(1200000, scope.currentTime) } + ) + } + + /** + * Test overcommitting of resources via the hypervisor with two VMs. + */ + @Test + fun testOvercommittedDual() { + val listener = object : SimHypervisor.Listener { + var totalRequestedWork = 0L + var totalGrantedWork = 0L + var totalOvercommittedWork = 0L + + override fun onSliceFinish( + hypervisor: SimHypervisor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + totalRequestedWork += requestedWork + totalGrantedWork += grantedWork + totalOvercommittedWork += overcommittedWork + } + } + + scope.launch { + val duration = 5 * 60L + val workloadA = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) ), ) val workloadB = SimTraceWorkload( sequenceOf( - SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(0, 3100L * duration, duration * 1000, 3100.0, 2), - SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(0, 73L * duration, duration * 1000, 73.0, 2) + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 73.0, 1) ) ) val machine = SimBareMetalMachine(scope, clock, machineModel) - val hypervisor = SimFairSharedHypervisor(scope, clock, listener) + val hypervisor = SimFairShareHypervisor(listener) launch { machine.run(hypervisor) @@ -117,13 +177,14 @@ internal class SimHypervisorTest { } scope.advanceUntilIdle() + scope.uncaughtExceptions.forEach { it.printStackTrace() } assertAll( - { Assertions.assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, - { Assertions.assertEquals(2082000, listener.totalRequestedBurst, "Requested Burst does not match") }, - { Assertions.assertEquals(2013600, listener.totalGrantedBurst, "Granted Burst does not match") }, - { Assertions.assertEquals(60000, listener.totalOvercommissionedBurst, "Overcommissioned Burst does not match") }, - { Assertions.assertEquals(1200001, scope.currentTime) } + { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, + { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, + { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") }, + { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(1200000, scope.currentTime) } ) } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 332ca8e9..1036f1ac 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ 
b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -23,15 +23,20 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.simulator.compute.workload.SimResourceCommand +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter /** @@ -58,7 +63,7 @@ class SimMachineTest { val machine = SimBareMetalMachine(testScope, clock, machineModel) testScope.runBlockingTest { - machine.run(SimFlopsWorkload(2_000, 2, utilization = 1.0)) + machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) // Two cores execute 1000 MFlOps per second (1000 ms) assertEquals(1000, testScope.currentTime) @@ -72,12 +77,83 @@ class SimMachineTest { val machine = SimBareMetalMachine(testScope, clock, machineModel) testScope.runBlockingTest { - machine.run(SimFlopsWorkload(2_000, 2, utilization = 1.0)) - assertEquals(1.0, machine.usage.value) + val res = mutableListOf<Double>() + val job = launch { machine.usage.toList(res) } - // Wait for the usage to reset - delay(1) - assertEquals(0.0, machine.usage.value) + machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) + + job.cancel() + assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" } + } + } + + @Test + fun testInterrupt() { + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) + val machine = SimBareMetalMachine(testScope, clock, machineModel) + + val workload = object : SimWorkload { + override fun onStart(ctx: SimExecutionContext) {} + + override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { + ctx.interrupt(cpu) + return SimResourceCommand.Exit + } + + override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + throw IllegalStateException() + } + } + + assertDoesNotThrow { + testScope.runBlockingTest { machine.run(workload) } + } + } + + @Test + fun testExceptionPropagationOnStart() { + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) + val machine = SimBareMetalMachine(testScope, clock, machineModel) + + val workload = object : SimWorkload { + override fun onStart(ctx: SimExecutionContext) {} + + override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { + throw IllegalStateException() + } + + override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + throw IllegalStateException() + } + } + + assertThrows<IllegalStateException> { + testScope.runBlockingTest { machine.run(workload) } + } + } + + @Test + fun testExceptionPropagationOnNext() { + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) + val machine = SimBareMetalMachine(testScope, clock, machineModel) + + val workload = object : SimWorkload { + override fun onStart(ctx: SimExecutionContext) {} + + override 
fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { + return SimResourceCommand.Consume(1.0, 1.0) + } + + override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + throw IllegalStateException() + } + } + + assertThrows<IllegalStateException> { + testScope.runBlockingTest { machine.run(workload) } } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkloadTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkloadTest.kt index 51bed76c..b3e57453 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkloadTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkloadTest.kt @@ -32,42 +32,28 @@ class SimFlopsWorkloadTest { @Test fun testFlopsNonNegative() { assertThrows<IllegalArgumentException>("FLOPs must be non-negative") { - SimFlopsWorkload(-1, 1) - } - } - - @Test - fun testCoresNonZero() { - assertThrows<IllegalArgumentException>("Cores cannot be zero") { - SimFlopsWorkload(1, 0) - } - } - - @Test - fun testCoresPositive() { - assertThrows<IllegalArgumentException>("Cores cannot be negative") { - SimFlopsWorkload(1, -1) + SimFlopsWorkload(-1) } } @Test fun testUtilizationNonZero() { assertThrows<IllegalArgumentException>("Utilization cannot be zero") { - SimFlopsWorkload(1, 1, 0.0) + SimFlopsWorkload(1, 0.0) } } @Test fun testUtilizationPositive() { assertThrows<IllegalArgumentException>("Utilization cannot be negative") { - SimFlopsWorkload(1, 1, -1.0) + SimFlopsWorkload(1, -1.0) } } @Test fun testUtilizationNotLargerThanOne() { assertThrows<IllegalArgumentException>("Utilization cannot be larger than one") { - SimFlopsWorkload(1, 1, 2.0) + SimFlopsWorkload(1, 2.0) } } } |
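To close, a minimal sketch of how these pieces compose: a bare-metal machine hosts a SimFairShareHypervisor as its workload, a VM is created from the same model, and a SimRuntimeWorkload runs on it. This is not part of the changeset; the scaffolding mirrors the tests above, the import paths of SimMachineModel, SimFairShareHypervisor and SimHypervisor are assumed, and the function name `deploy` is hypothetical.

import java.time.Clock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.SimFairShareHypervisor
import org.opendc.simulator.compute.SimHypervisor
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.workload.SimRuntimeWorkload

fun deploy(scope: CoroutineScope, clock: Clock, machineModel: SimMachineModel, listener: SimHypervisor.Listener) {
    // SimRuntimeWorkload(60_000 ms, utilization 0.8): on a 3200 MHz vCPU the per-second limit is
    // 3200.0 * 0.8 = 2560.0, so onStart issues Consume(work = (2560.0 / 1000) * 60_000 = 153_600.0,
    // limit = 2560.0), which completes after 60 simulated seconds if the VM receives its full demand.
    val workload = SimRuntimeWorkload(duration = 60_000, utilization = 0.8)

    scope.launch {
        val machine = SimBareMetalMachine(scope, clock, machineModel)
        val hypervisor = SimFairShareHypervisor(listener)

        launch { machine.run(hypervisor) } // the hypervisor itself is a SimWorkload hosted on the machine
        yield()
        launch { hypervisor.createMachine(machineModel).run(workload) } // run the workload on a VM
    }
}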
