summaryrefslogtreecommitdiff
path: root/simulator/opendc-simulator/opendc-simulator-compute
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-10-03 16:29:55 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-10-03 16:29:55 +0200
commitc8567a567348e13c341bf1a1ec64ed34ce25815a (patch)
treee3818756de1c14846b57bef27adc3cef51a72279 /simulator/opendc-simulator/opendc-simulator-compute
parentac7016c4c5f15bf20b21e7d34e93d8b963aab231 (diff)
Implement VirtDriver using opendc-simulator-compute module
This change adds an implementation of the VirtDriver interface that uses the functionality provided by the opendc-simulator-compute module.
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-compute')
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt281
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt529
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt236
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt3
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt130
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt6
6 files changed, 955 insertions, 230 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
new file mode 100644
index 00000000..c6d5bdd1
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -0,0 +1,281 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.StateFlow
+import kotlinx.coroutines.intrinsics.startCoroutineCancellable
+import kotlinx.coroutines.selects.SelectClause0
+import kotlinx.coroutines.selects.SelectInstance
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.lang.Runnable
+import java.time.Clock
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A simulated bare-metal machine that is able to run a single workload.
+ *
+ * A [SimBareMetalMachine] is a stateful object and you should be careful when operating this object concurrently. For
+ * example, the class expects only a single concurrent call to [run].
+ *
+ * @param coroutineScope The [CoroutineScope] to run the simulated workload in.
+ * @param clock The virtual clock to track the simulation time.
+ * @param model The machine model to simulate.
+ */
+@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
+public class SimBareMetalMachine(
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
+ override val model: SimMachineModel
+) : SimMachine {
+ /**
+ * A [StateFlow] representing the CPU usage of the simulated machine.
+ */
+ override val usage: StateFlow<Double>
+ get() = usageState
+
+ /**
+ * The current active workload.
+ */
+ private var activeWorkload: SimWorkload? = null
+
+    /**
+     * Executes [workload] on this machine, suspending the caller until the workload returns.
+     *
+     * Only one workload may be active at a time: [activeWorkload] is set for the duration of the call and cleared
+     * again when the workload returns or throws.
+     *
+     * @param workload The workload to run within the execution context of this machine.
+     * @throws IllegalArgumentException if another workload is still active on this machine.
+     */
+    override suspend fun run(workload: SimWorkload) {
+        require(activeWorkload == null) { "Run should not be called concurrently" }
+
+        // Claim the machine first; the guarded region below only covers the workload itself.
+        activeWorkload = workload
+        try {
+            workload.run(ctx)
+        } finally {
+            // Release the machine regardless of how the workload ended
+            activeWorkload = null
+        }
+    }
+
+ /**
+ * The execution context in which the workload runs.
+ *
+ * [onRun] returns a select clause. Once registered, the clause runs the slices of the batch one after another,
+ * timing each slice with the cached [delay]. After the last slice has run, the clause tries to select itself and
+ * starts the associated block. When the select instance is selected (by this clause or by another one), the
+ * registered handle flushes the slice that is still in progress (if any), disposes the pending timer and
+ * schedules a reset of the usage to zero.
+ *
+ * NOTE(review): the slice that did not finish is passed as the second argument of `merge` here, whereas
+ * [SimHypervisor] passes the unfinished slice as the first argument. Confirm which order `merge` expects.
+ */
+ private val ctx = object : SimExecutionContext {
+ override val machine: SimMachineModel
+ get() = this@SimBareMetalMachine.model
+
+ override val clock: Clock
+ get() = this@SimBareMetalMachine.clock
+
+ override fun onRun(
+ batch: Sequence<SimExecutionContext.Slice>,
+ triggerMode: SimExecutionContext.TriggerMode,
+ merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice
+ ): SelectClause0 {
+ return object : SelectClause0 {
+ @InternalCoroutinesApi
+ override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
+ // Do not reset the usage state: we will set it ourselves. This cancels a reset scheduled by a previous call.
+ usageFlush?.dispose()
+ usageFlush = null
+
+ val queue = batch.iterator()
+ var start = Long.MIN_VALUE // Start time of the slice that is currently running
+ var currentWork: SliceWork? = null // Non-null only while a slice is in progress
+ var currentDisposable: DisposableHandle? = null // Handle of the timer that ends the current slice
+
+ fun schedule(slice: SimExecutionContext.Slice) {
+ start = clock.millis()
+
+ val isLastSlice = !queue.hasNext()
+ val work = SliceWork(slice)
+ val candidateDuration = when (triggerMode) {
+ SimExecutionContext.TriggerMode.FIRST -> work.minExit
+ SimExecutionContext.TriggerMode.LAST -> work.maxExit
+ SimExecutionContext.TriggerMode.DEADLINE -> slice.deadline - start
+ }
+
+ // Check whether the deadline is exceeded during the run of the slice: never run past the deadline.
+ val duration = min(candidateDuration, slice.deadline - start)
+
+ val action = Runnable {
+ currentWork = null // The slice is over: nothing left to flush when the select is disposed
+
+ // Flush all the work that was performed; the result tells whether no burst remains in the slice
+ val hasFinished = work.stop(duration)
+
+ if (!isLastSlice) {
+ val candidateSlice = queue.next()
+ val nextSlice =
+ // If the previous slice still has burst left, merge it with the next candidate slice
+ if (hasFinished)
+ candidateSlice
+ else
+ merge(candidateSlice, slice)
+ schedule(nextSlice)
+ } else if (select.trySelect()) {
+ block.startCoroutineCancellable(select.completion)
+ }
+ }
+
+ // Schedule the flush after the entire slice has finished
+ currentDisposable = delay.invokeOnTimeout(duration, action)
+
+ // Start the slice work (this publishes the usage of the slice)
+ currentWork = work
+ work.start()
+ }
+
+ // Schedule the first work
+ if (queue.hasNext()) {
+ schedule(queue.next())
+
+ // A DisposableHandle to flush the work once the select instance has been selected
+ val disposable = DisposableHandle {
+ val end = clock.millis()
+ val duration = end - start // Time spent on the slice that was running when the select completed
+
+ currentWork?.stop(duration)
+ currentDisposable?.dispose()
+
+ // Schedule a reset of the machine usage since the call is returning; a subsequent onRun disposes it
+ usageFlush = delay.invokeOnTimeout(1) {
+ usageState.value = 0.0
+ usageFlush = null
+ }
+ }
+
+ select.disposeOnSelect(disposable)
+ } else if (select.trySelect()) {
+ // No work has been given: select immediately
+ block.startCoroutineCancellable(select.completion)
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * The [MutableStateFlow] containing the load of the server.
+ */
+ private val usageState = MutableStateFlow(0.0)
+
+ /**
+ * A disposable to prevent resetting the usage state for subsequent calls to onRun.
+ */
+ private var usageFlush: DisposableHandle? = null
+
+ /**
+ * Cache the [Delay] instance for timing.
+ *
+ * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy.
+ * XXX Note however that this is an ugly hack which may break in the future.
+ */
+ @OptIn(InternalCoroutinesApi::class)
+ private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay
+
+ /**
+ * A slice to be processed.
+ *
+ * On construction, the per-processor run time of the slice is derived from its burst and limit (the limit is
+ * capped at the frequency of the processor). Only the first `min(model.cpus.size, slice.burst.size)` entries of
+ * the slice are considered. [start] publishes the usage of the slice and [stop] writes the remaining burst back
+ * into the slice.
+ *
+ * @param slice The slice to run; its burst array is modified in place by [stop].
+ */
+ private inner class SliceWork(val slice: SimExecutionContext.Slice) {
+ /**
+ * The duration after which the first processor finishes processing this slice.
+ *
+ * Expressed in milliseconds. Remains [Long.MAX_VALUE] when no processor has a non-zero duration.
+ */
+ val minExit: Long
+
+ /**
+ * The duration after which the last processor finishes processing this slice.
+ *
+ * Expressed in milliseconds. Remains zero when no processor has a non-zero duration.
+ */
+ val maxExit: Long
+
+ /**
+ * A flag to indicate that the slice will exceed the deadline.
+ *
+ * NOTE(review): the deadline is treated as a point in time by the caller (`slice.deadline - start`), while
+ * [maxExit] is a duration. Confirm that comparing the two directly is intended.
+ */
+ val exceedsDeadline: Boolean
+ get() = slice.deadline < maxExit
+
+ /**
+ * The total amount of CPU usage.
+ *
+ * This is the sum over the considered processors of the granted usage divided by the processor frequency.
+ */
+ val totalUsage: Double
+
+ /**
+ * A flag to indicate that this slice is empty.
+ *
+ * A slice is empty when none of the considered processors has a non-zero duration.
+ */
+ val isEmpty: Boolean
+
+ init {
+ var totalUsage = 0.0
+ var minExit = Long.MAX_VALUE
+ var maxExit = 0L
+ var nonEmpty = false
+
+ // Determine the duration of the first/last CPU to finish
+ for (i in 0 until min(model.cpus.size, slice.burst.size)) {
+ val cpu = model.cpus[i]
+ val usage = min(slice.limit[i], cpu.frequency) // A processor cannot run faster than its frequency
+ val cpuDuration = ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
+
+ totalUsage += usage / cpu.frequency
+
+ if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
+ minExit = min(minExit, cpuDuration)
+ maxExit = max(maxExit, cpuDuration)
+ nonEmpty = true
+ }
+ }
+
+ this.isEmpty = !nonEmpty
+ this.totalUsage = totalUsage
+ this.minExit = minExit
+ this.maxExit = maxExit
+ }
+
+ /**
+ * Indicate that the work on the slice has started.
+ *
+ * Publishes the average usage over all processors of the machine model to the usage state.
+ */
+ fun start() {
+ usageState.value = totalUsage / model.cpus.size
+ }
+
+ /**
+ * Flush the work performed on the slice.
+ *
+ * @param duration The time in milliseconds that the slice has been running.
+ * @return `true` if no burst remains on any of the considered processors, `false` otherwise.
+ */
+ fun stop(duration: Long): Boolean {
+ var hasFinished = true
+
+ for (i in 0 until min(model.cpus.size, slice.burst.size)) {
+ val usage = min(slice.limit[i], model.cpus[i].frequency)
+ val granted = ceil(duration / 1000.0 * usage).toLong() // Work performed during the elapsed time
+ val res = max(0, slice.burst[i] - granted) // The remaining burst never drops below zero
+ slice.burst[i] = res
+
+ if (res != 0L) {
+ hasFinished = false
+ }
+ }
+
+ return hasFinished
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
new file mode 100644
index 00000000..7c2cfbe3
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
@@ -0,0 +1,529 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.StateFlow
+import kotlinx.coroutines.intrinsics.startCoroutineCancellable
+import kotlinx.coroutines.selects.SelectClause0
+import kotlinx.coroutines.selects.SelectInstance
+import kotlinx.coroutines.selects.select
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.time.Clock
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine] concurrently.
+ *
+ * @param coroutineScope The [CoroutineScope] to run the simulated workloads in.
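+ * @param clock The virtual clock to track the simulation time.
+ * @param listener An optional [Listener] that is notified each time a scheduled slice has finished.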
+ */
+@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
+public class SimHypervisor(
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
+ private val listener: Listener? = null
+) : SimWorkload {
+ /**
+ * A set for tracking the VM context objects.
+ */
+ private val vms: MutableSet<VmExecutionContext> = mutableSetOf()
+
+ /**
+ * A flag to indicate the driver is stopped.
+ */
+ private var stopped: Boolean = false
+
+ /**
+ * The channel for scheduling new CPU requests.
+ */
+ private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED)
+
+    /**
+     * Creates a virtual [SimMachine] that is hosted by this hypervisor.
+     *
+     * The returned machine runs its workload inside a dedicated VM execution context and reports the usage of
+     * the session that belongs to that context.
+     *
+     * @param model The machine to create.
+     * @return A machine that accepts a single [SimWorkload] at a time.
+     */
+    public fun createMachine(model: SimMachineModel): SimMachine {
+        val context = VmExecutionContext(model)
+
+        return object : SimMachine {
+            /**
+             * The workload that currently occupies the machine, or `null` when the machine is free.
+             */
+            private var activeWorkload: SimWorkload? = null
+
+            override val model: SimMachineModel
+                get() = context.machine
+
+            override val usage: StateFlow<Double>
+                get() = context.session.usage
+
+            override suspend fun run(workload: SimWorkload) {
+                require(activeWorkload == null) { "Run should not be called concurrently" }
+
+                // Claim the machine, then release it again no matter how the workload ends
+                activeWorkload = workload
+                try {
+                    workload.run(context)
+                } finally {
+                    activeWorkload = null
+                }
+            }
+
+            override fun toString(): String {
+                return "SimVirtualMachine"
+            }
+        }
+    }
+
+    /**
+     * Run the scheduling process of the hypervisor.
+     *
+     * While [stopped] is `false`, every iteration of the loop:
+     *  1. processes the pending scheduling commands (waiting for one if no vCPU is scheduled),
+     *  2. divides the host capacity over the vCPUs using max-min fair sharing,
+     *  3. spreads the allocated usage over the physical CPUs proportionally to their frequency,
+     *  4. runs the resulting slice on the host until its deadline or until a new command arrives, and
+     *  5. attributes the work that was performed to the vCPUs and reports it to the [listener].
+     *
+     * A command that interrupts a running slice is processed at the start of the next iteration, ahead of the
+     * commands that were submitted after it, so that commands are always applied in submission order.
+     *
+     * @param ctx The execution context of the host machine on which the hypervisor runs.
+     */
+    override suspend fun run(ctx: SimExecutionContext) {
+        val maxUsage = ctx.machine.cpus.sumByDouble { it.frequency }
+        val pCPUs = ctx.machine.cpus.indices.sortedBy { ctx.machine.cpus[it].frequency }
+
+        val vms = mutableSetOf<VmSession>()
+        val vcpus = mutableListOf<VCpu>()
+
+        val usage = DoubleArray(ctx.machine.cpus.size)
+        val burst = LongArray(ctx.machine.cpus.size)
+
+        // The command that interrupted the previous slice. It was taken from the head of the queue, so it must be
+        // processed before anything that is still queued. Re-offering it would move it behind later commands.
+        var pendingCommand: SchedulerCommand? = null
+
+        fun process(command: SchedulerCommand) {
+            when (command) {
+                is SchedulerCommand.Schedule -> {
+                    vms += command.vm
+                    vcpus.addAll(command.vm.vcpus)
+                }
+                is SchedulerCommand.Deschedule -> {
+                    vms -= command.vm
+                    vcpus.removeAll(command.vm.vcpus)
+                }
+                is SchedulerCommand.Interrupt -> {
+                    // Nothing to update: the command only serves to wake up the scheduler
+                }
+            }
+        }
+
+        fun processRemaining() {
+            var command = schedulingQueue.poll()
+            while (command != null) {
+                process(command)
+                command = schedulingQueue.poll()
+            }
+        }
+
+        while (!stopped) {
+            val interrupting = pendingCommand
+            if (interrupting != null) {
+                // Resume with the command that interrupted the previous slice
+                pendingCommand = null
+                process(interrupting)
+            } else if (vcpus.isEmpty()) {
+                // Wait for a request to be submitted if we have no work yet.
+                process(schedulingQueue.receive())
+            }
+
+            processRemaining()
+
+            val start = clock.millis()
+
+            var duration: Double = Double.POSITIVE_INFINITY
+            var deadline: Long = Long.MAX_VALUE
+            var availableUsage = maxUsage
+            var totalRequestedUsage = 0.0
+            var totalRequestedBurst = 0L
+
+            // Sort the vCPUs based on their requested usage
+            // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
+            vcpus.sort()
+
+            // Divide the available host capacity fairly across the vCPUs using max-min fair sharing
+            for ((i, req) in vcpus.withIndex()) {
+                val remaining = vcpus.size - i
+                val availableShare = availableUsage / remaining
+                val grantedUsage = min(req.limit, availableShare)
+
+                // Take into account the minimum deadline of this slice before we possibly continue
+                deadline = min(deadline, req.vm.deadline)
+
+                // Ignore empty CPUs
+                if (grantedUsage <= 0 || req.burst <= 0) {
+                    req.allocatedLimit = 0.0
+                    continue
+                }
+
+                totalRequestedUsage += req.limit
+                totalRequestedBurst += req.burst
+
+                req.allocatedLimit = grantedUsage
+                availableUsage -= grantedUsage
+
+                // The duration that we want to run is that of the shortest request from a vCPU
+                duration = min(duration, req.burst / grantedUsage)
+            }
+
+            val totalAllocatedUsage = maxUsage - availableUsage
+            var totalAllocatedBurst = 0L
+            availableUsage = totalAllocatedUsage
+
+            // Divide the requests over the available capacity of the pCPUs fairly
+            for (i in pCPUs) {
+                val maxCpuUsage = ctx.machine.cpus[i].frequency
+                val fraction = maxCpuUsage / maxUsage
+                val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction)
+                val grantedBurst = ceil(duration * grantedUsage).toLong()
+
+                usage[i] = grantedUsage
+                burst[i] = grantedBurst
+                totalAllocatedBurst += grantedBurst
+                availableUsage -= grantedUsage
+            }
+
+            // We run the total burst on the host processor. Note that this call may be cancelled at any moment in
+            // time, so not all of the burst may be executed.
+            select<Boolean> {
+                schedulingQueue.onReceive {
+                    // Keep the command aside instead of re-offering it, which would reorder the queue
+                    pendingCommand = it
+                    true
+                }
+                ctx.onRun(SimExecutionContext.Slice(burst, usage, deadline), SimExecutionContext.TriggerMode.DEADLINE)
+                    .invoke { false }
+            }
+
+            val end = clock.millis()
+
+            // No work was performed
+            if ((end - start) <= 0) {
+                continue
+            }
+
+            // The total requested burst that the VMs wanted to run in the time-frame that we ran.
+            val totalRequestedSubBurst =
+                vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum()
+            // The host context has written the burst that was not executed back into the burst array
+            val totalRemainder = burst.sum()
+            val totalGrantedBurst = totalAllocatedBurst - totalRemainder
+
+            // The burst that was lost due to overcommissioning of CPU resources
+            var totalOvercommissionedBurst = 0L
+            // The burst that was lost due to interference.
+            var totalInterferedBurst = 0L
+
+            val vmIterator = vms.iterator()
+            while (vmIterator.hasNext()) {
+                val vm = vmIterator.next()
+
+                // Apply performance interference model
+                val performanceScore = 1.0 // TODO Performance interference
+                var hasFinished = false
+
+                for (vcpu in vm.vcpus) {
+                    // Compute the fraction of compute time allocated to the VM
+                    val fraction = vcpu.allocatedLimit / totalAllocatedUsage
+
+                    // Compute the burst time that the VM was actually granted
+                    val grantedBurst = ceil(totalGrantedBurst * fraction).toLong()
+
+                    // The burst that was actually used by the VM
+                    val usedBurst = ceil(grantedBurst * performanceScore).toLong()
+
+                    totalInterferedBurst += grantedBurst - usedBurst
+
+                    // Compute remaining burst time to be executed for the request
+                    if (vcpu.consume(usedBurst)) {
+                        hasFinished = true
+                    } else if (vm.deadline <= end) {
+                        // Request must have its entire burst consumed or otherwise we have overcommission
+                        // Note that we count the overcommissioned burst if the hypervisor has failed.
+                        totalOvercommissionedBurst += vcpu.burst
+                    }
+                }
+
+                if (hasFinished || vm.deadline <= end) {
+                    // Mark the VM as finished and deschedule the VMs if needed
+                    if (vm.finish()) {
+                        vmIterator.remove()
+                        vcpus.removeAll(vm.vcpus)
+                    }
+                }
+            }
+
+            listener?.onSliceFinish(
+                this,
+                totalRequestedBurst,
+                min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing
+                totalOvercommissionedBurst,
+                totalInterferedBurst, // Might be smaller than zero due to FP rounding errors,
+                min(
+                    totalAllocatedUsage,
+                    totalRequestedUsage
+                ), // The allocated usage might be slightly higher due to FP rounding
+                totalRequestedUsage
+            )
+        }
+    }
+
+ /**
+ * Event listener for hypervisor events.
+ */
+ public interface Listener {
+ /**
+ * This method is invoked when a slice is finished.
+ *
+ * The descriptions below reflect the values that [SimHypervisor.run] passes to this method.
+ *
+ * @param hypervisor The hypervisor that ran the slice.
+ * @param requestedBurst The total burst of the vCPUs that received capacity in the slice.
+ * @param grantedBurst The burst that was executed, capped at the burst requested for the time-frame that ran.
+ * @param overcommissionedBurst The burst that was left over on vCPUs whose deadline has passed.
+ * @param interferedBurst The burst that was granted but not used by the VMs.
+ * @param cpuUsage The CPU usage that was allocated, capped at [cpuDemand].
+ * @param cpuDemand The total usage limit requested by the vCPUs that received capacity.
+ */
+ public fun onSliceFinish(
+ hypervisor: SimHypervisor,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ )
+ }
+
+ /**
+ * A scheduling command processed by the scheduler.
+ *
+ * Commands are submitted through the scheduling queue and applied by the scheduling loop of the hypervisor.
+ */
+ private sealed class SchedulerCommand {
+ /**
+ * Schedule the specified VM on the hypervisor.
+ *
+ * Processing this command adds the VM and all of its vCPUs to the set of scheduled entities.
+ */
+ data class Schedule(val vm: VmSession) : SchedulerCommand()
+
+ /**
+ * De-schedule the specified VM on the hypervisor.
+ *
+ * Processing this command removes the VM and all of its vCPUs from the set of scheduled entities.
+ */
+ data class Deschedule(val vm: VmSession) : SchedulerCommand()
+
+ /**
+ * Interrupt the scheduler.
+ *
+ * The command carries no payload and the scheduling loop performs no state change when processing it.
+ */
+ object Interrupt : SchedulerCommand()
+ }
+
+    /**
+     * A virtual machine running on the hypervisor.
+     *
+     * NOTE(review): this is a data class with mutable constructor properties, so its `hashCode` changes whenever
+     * [triggerMode], [merge] or [select] is reassigned. Confirm that sessions are never reconfigured while they are
+     * stored in a hash-based collection.
+     *
+     * @param model The machine model of the VM; one [VCpu] is created for every CPU of the model.
+     * @param triggerMode The mode when to trigger the VM exit.
+     * @param merge The function to merge consecutive slices on spillover.
+     * @param select The function to select on finish.
+     */
+    @OptIn(InternalCoroutinesApi::class)
+    private data class VmSession(
+        val model: SimMachineModel,
+        var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST,
+        var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r },
+        var select: () -> Unit = {}
+    ) {
+        /**
+         * The vCPUs of this virtual machine.
+         */
+        val vcpus: List<VCpu>
+
+        /**
+         * The slices that the VM wants to run.
+         */
+        var queue: Iterator<SimExecutionContext.Slice> = emptyList<SimExecutionContext.Slice>().iterator()
+
+        /**
+         * The current active slice.
+         */
+        var activeSlice: SimExecutionContext.Slice? = null
+
+        /**
+         * The current deadline of the VM, or [Long.MAX_VALUE] if the VM has no active slice.
+         */
+        val deadline: Long
+            get() = activeSlice?.deadline ?: Long.MAX_VALUE
+
+        /**
+         * A flag to indicate that the VM is idle, i.e. that it has no active slice.
+         */
+        val isIdle: Boolean
+            get() = activeSlice == null
+
+        /**
+         * The usage of the virtual machine.
+         */
+        val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
+
+        init {
+            vcpus = model.cpus.mapIndexed { i, cpu -> VCpu(this, cpu, i) }
+        }
+
+        /**
+         * Schedule the given slices on this VM, replacing the existing slices.
+         *
+         * The first slice (if any) becomes the active slice. An empty sequence leaves the active slice untouched.
+         *
+         * @param slices The slices to run on this VM.
+         */
+        fun schedule(slices: Sequence<SimExecutionContext.Slice>) {
+            queue = slices.iterator()
+
+            if (queue.hasNext()) {
+                activeSlice = queue.next()
+                refresh()
+            }
+        }
+
+        /**
+         * Cancel the existing workload on the VM.
+         *
+         * Drops the queued slices and the active slice, after which the vCPUs and the usage are refreshed.
+         */
+        fun cancel() {
+            queue = emptyList<SimExecutionContext.Slice>().iterator()
+            activeSlice = null
+            refresh()
+        }
+
+        /**
+         * Finish the current slice of the VM.
+         *
+         * If another slice is queued it becomes the active slice, merged with the current one when the current
+         * slice still has burst left. Otherwise the VM becomes idle and [select] is invoked.
+         *
+         * @return `true` if the vCPUs may be descheduled, `false` otherwise.
+         */
+        fun finish(): Boolean {
+            val activeSlice = activeSlice ?: return true
+
+            return if (queue.hasNext()) {
+                val needsMerge = activeSlice.burst.any { it > 0 }
+                val candidateSlice = queue.next()
+                val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice
+
+                this.activeSlice = slice
+
+                // Update the vCPU cache
+                refresh()
+
+                false
+            } else {
+                this.activeSlice = null
+                select()
+                true
+            }
+        }
+
+        /**
+         * Refresh the vCPU cache and publish the resulting usage.
+         *
+         * A vCPU without a positive limit (for example after [cancel], or when the active slice has fewer entries
+         * than the VM has vCPUs) contributes zero. Dividing by its limit would otherwise publish NaN or infinity.
+         */
+        fun refresh() {
+            vcpus.forEach { it.refresh() }
+            usage.value =
+                if (vcpus.isEmpty())
+                    0.0
+                else
+                    vcpus.sumByDouble { if (it.limit > 0.0) it.burst / it.limit else 0.0 } / vcpus.size
+        }
+    }
+
+    /**
+     * A virtual CPU that the hypervisor maps onto the physical CPUs of the host.
+     *
+     * @param vm The VM of which this vCPU is part.
+     * @param model The model of CPU that this vCPU models.
+     * @param id The id of the vCPU with respect to the VM.
+     */
+    private data class VCpu(
+        val vm: VmSession,
+        val model: ProcessingUnit,
+        val id: Int
+    ) : Comparable<VCpu> {
+        /**
+         * The limit that the vCPU currently requests.
+         */
+        var limit: Double = 0.0
+
+        /**
+         * The share of the limit that the hypervisor has allocated.
+         */
+        var allocatedLimit: Double = 0.0
+
+        /**
+         * The burst that is still to be run on the vCPU.
+         */
+        var burst: Long = 0L
+
+        /**
+         * Subtracts [burst] from the remaining burst of this vCPU, never going below zero, and mirrors the result
+         * into the active slice of the VM when that slice has an entry for this vCPU.
+         *
+         * @param burst The burst that has been executed.
+         * @return `true` if the vCPU had capacity allocated and no burst remains, `false` otherwise.
+         */
+        fun consume(burst: Long): Boolean {
+            val remaining = max(0, this.burst - burst)
+            this.burst = remaining
+
+            // Flush the result to the slice if it exists
+            val sliceBurst = vm.activeSlice?.burst
+            if (sliceBurst != null && id < sliceBurst.size) {
+                sliceBurst[id] = remaining
+            }
+
+            return remaining == 0L && allocatedLimit > 0.0
+        }
+
+        /**
+         * Reloads the limit and burst of this vCPU from the active slice of the VM. Both fall back to zero when
+         * there is no active slice or when the slice has no entry for this vCPU.
+         */
+        fun refresh() {
+            val limits = vm.activeSlice?.limit
+            limit = if (limits != null && id < limits.size) limits[id] else 0.0
+
+            val bursts = vm.activeSlice?.burst
+            burst = if (bursts != null && id < bursts.size) bursts[id] else 0L
+        }
+
+        /**
+         * Orders vCPUs by their current limit.
+         */
+        override fun compareTo(other: VCpu): Int = limit.compareTo(other.limit)
+
+        /**
+         * Create a string representation of the vCPU.
+         */
+        override fun toString(): String {
+            return "vCPU(id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)"
+        }
+    }
+
+    /**
+     * The execution context in which a VM runs.
+     *
+     * The context owns the [VmSession] of the VM. Registering the clause returned by [onRun] configures the
+     * session, hands the batch to it and asks the hypervisor to schedule the VM. The context is also the handle
+     * that is disposed when the select instance is selected: if the VM is still running at that point, its
+     * workload is cancelled and the VM is descheduled.
+     *
+     * @param machine The machine model of the VM.
+     */
+    private inner class VmExecutionContext(override val machine: SimMachineModel) :
+        SimExecutionContext,
+        DisposableHandle {
+        val session: VmSession = VmSession(machine)
+
+        override val clock: Clock
+            get() = this@SimHypervisor.clock
+
+        @OptIn(InternalCoroutinesApi::class)
+        override fun onRun(
+            batch: Sequence<SimExecutionContext.Slice>,
+            triggerMode: SimExecutionContext.TriggerMode,
+            merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice
+        ): SelectClause0 = object : SelectClause0 {
+            @InternalCoroutinesApi
+            override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
+                session.triggerMode = triggerMode
+                session.merge = merge
+                session.select = {
+                    if (select.trySelect()) {
+                        block.startCoroutineCancellable(select.completion)
+                    }
+                }
+                session.schedule(batch)
+
+                if (session.isIdle) {
+                    // No work has been given: select immediately, like SimBareMetalMachine does. Scheduling an
+                    // idle session would never finish a slice and therefore never resume the caller.
+                    session.select()
+                    return
+                }
+
+                // Indicate to the hypervisor that the VM should be re-scheduled
+                schedulingQueue.offer(SchedulerCommand.Schedule(session))
+                select.disposeOnSelect(this@VmExecutionContext)
+            }
+        }
+
+        /**
+         * Cancels the workload of the VM and requests it to be descheduled, unless the VM is already idle.
+         */
+        override fun dispose() {
+            if (!session.isIdle) {
+                session.cancel()
+                schedulingQueue.offer(SchedulerCommand.Deschedule(session))
+            }
+        }
+    }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
index df74a5f1..f66085af 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
@@ -22,245 +22,27 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.StateFlow
-import kotlinx.coroutines.intrinsics.startCoroutineCancellable
-import kotlinx.coroutines.selects.SelectClause0
-import kotlinx.coroutines.selects.SelectInstance
import org.opendc.simulator.compute.workload.SimWorkload
-import java.lang.Runnable
-import java.time.Clock
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
/**
- * A simulated bare-metal machine that is able to run a single workload.
- *
- * @param coroutineScope The [CoroutineScope] to run the simulated workload in.
- * @param clock The virtual clock to track the simulation time.
- * @param model The machine model to simulate.
+ * A generic machine that is able to run a [SimWorkload].
*/
-@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
-public class SimMachine(
- private val coroutineScope: CoroutineScope,
- private val clock: Clock,
+@OptIn(ExperimentalCoroutinesApi::class)
+public interface SimMachine {
+ /**
+ * The model of the machine containing its specifications.
+ */
public val model: SimMachineModel
-) {
+
/**
* A [StateFlow] representing the CPU usage of the simulated machine.
*/
public val usage: StateFlow<Double>
- get() = usageState
/**
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- public suspend fun run(workload: SimWorkload) {
- workload.run(ctx)
- }
-
- /**
- * The execution context in which the workload runs.
- */
- private val ctx = object : SimExecutionContext {
- override val machine: SimMachineModel
- get() = this@SimMachine.model
-
- override val clock: Clock
- get() = this@SimMachine.clock
-
- override fun onRun(
- batch: Sequence<SimExecutionContext.Slice>,
- triggerMode: SimExecutionContext.TriggerMode,
- merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice
- ): SelectClause0 {
- return object : SelectClause0 {
- @InternalCoroutinesApi
- override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
- // Do not reset the usage state: we will set it ourselves
- usageFlush?.dispose()
- usageFlush = null
-
- val queue = batch.iterator()
- var start = Long.MIN_VALUE
- var currentWork: SliceWork? = null
- var currentDisposable: DisposableHandle? = null
-
- fun schedule(slice: SimExecutionContext.Slice) {
- start = clock.millis()
-
- val isLastSlice = !queue.hasNext()
- val work = SliceWork(slice)
- val candidateDuration = when (triggerMode) {
- SimExecutionContext.TriggerMode.FIRST -> work.minExit
- SimExecutionContext.TriggerMode.LAST -> work.maxExit
- SimExecutionContext.TriggerMode.DEADLINE -> slice.deadline - start
- }
-
- // Check whether the deadline is exceeded during the run of the slice.
- val duration = min(candidateDuration, slice.deadline - start)
-
- val action = Runnable {
- currentWork = null
-
- // Flush all the work that was performed
- val hasFinished = work.stop(duration)
-
- if (!isLastSlice) {
- val candidateSlice = queue.next()
- val nextSlice =
- // If our previous slice exceeds its deadline, merge it with the next candidate slice
- if (hasFinished)
- candidateSlice
- else
- merge(candidateSlice, slice)
- schedule(nextSlice)
- } else if (select.trySelect()) {
- block.startCoroutineCancellable(select.completion)
- }
- }
-
- // Schedule the flush after the entire slice has finished
- currentDisposable = delay.invokeOnTimeout(duration, action)
-
- // Start the slice work
- currentWork = work
- work.start()
- }
-
- // Schedule the first work
- if (queue.hasNext()) {
- schedule(queue.next())
-
- // A DisposableHandle to flush the work in case the call is cancelled
- val disposable = DisposableHandle {
- val end = clock.millis()
- val duration = end - start
-
- currentWork?.stop(duration)
- currentDisposable?.dispose()
-
- // Schedule reset the usage of the machine since the call is returning
- usageFlush = delay.invokeOnTimeout(1) {
- usageState.value = 0.0
- usageFlush = null
- }
- }
-
- select.disposeOnSelect(disposable)
- } else if (select.trySelect()) {
- // No work has been given: select immediately
- block.startCoroutineCancellable(select.completion)
- }
- }
- }
- }
- }
-
- /**
- * The [MutableStateFlow] containing the load of the server.
- */
- private val usageState = MutableStateFlow(0.0)
-
- /**
- * A disposable to prevent resetting the usage state for subsequent calls to onRun.
- */
- private var usageFlush: DisposableHandle? = null
-
- /**
- * Cache the [Delay] instance for timing.
- *
- * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy.
- * XXX Note however that this is an ugly hack which may break in the future.
- */
- @OptIn(InternalCoroutinesApi::class)
- private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay
-
- /**
- * A slice to be processed.
- */
- private inner class SliceWork(val slice: SimExecutionContext.Slice) {
- /**
- * The duration after which the first processor finishes processing this slice.
- */
- val minExit: Long
-
- /**
- * The duration after which the last processor finishes processing this slice.
- */
- val maxExit: Long
-
- /**
- * A flag to indicate that the slice will exceed the deadline.
- */
- val exceedsDeadline: Boolean
- get() = slice.deadline < maxExit
-
- /**
- * The total amount of CPU usage.
- */
- val totalUsage: Double
-
- /**
- * A flag to indicate that this slice is empty.
- */
- val isEmpty: Boolean
-
- init {
- var totalUsage = 0.0
- var minExit = Long.MAX_VALUE
- var maxExit = 0L
- var nonEmpty = false
-
- // Determine the duration of the first/last CPU to finish
- for (i in 0 until min(model.cpus.size, slice.burst.size)) {
- val cpu = model.cpus[i]
- val usage = min(slice.limit[i], cpu.frequency)
- val cpuDuration = ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
-
- totalUsage += usage / cpu.frequency
-
- if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
- minExit = min(minExit, cpuDuration)
- maxExit = max(maxExit, cpuDuration)
- nonEmpty = true
- }
- }
-
- this.isEmpty = !nonEmpty
- this.totalUsage = totalUsage
- this.minExit = minExit
- this.maxExit = maxExit
- }
-
- /**
- * Indicate that the work on the slice has started.
- */
- fun start() {
- usageState.value = totalUsage / model.cpus.size
- }
-
- /**
- * Flush the work performed on the slice.
- */
- fun stop(duration: Long): Boolean {
- var hasFinished = true
-
- for (i in 0 until min(model.cpus.size, slice.burst.size)) {
- val usage = min(slice.limit[i], model.cpus[i].frequency)
- val granted = ceil(duration / 1000.0 * usage).toLong()
- val res = max(0, slice.burst[i] - granted)
- slice.burst[i] = res
-
- if (res != 0L) {
- hasFinished = false
- }
- }
-
- return hasFinished
- }
- }
+ public suspend fun run(workload: SimWorkload)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
index 0ef4130e..2add8cce 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
@@ -26,6 +26,9 @@ import org.opendc.simulator.compute.SimExecutionContext
/**
* A model that characterizes the runtime behavior of some particular workload.
+ *
+ * Workloads are stateful objects that may be paused and resumed at a later moment. As such, be careful when sharing the
+ * same [SimWorkload] between multiple contexts: at most one [run] call is expected to be in progress at any time.
*/
public interface SimWorkload {
/**
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
new file mode 100644
index 00000000..b9cd1b06
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import java.time.Clock
+import java.util.*
+
+/**
+ * Test suite for the [SimHypervisor] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimHypervisorTest {
+ private lateinit var scope: TestCoroutineScope
+ private lateinit var clock: Clock
+ private lateinit var machineModel: SimMachineModel
+
+ @BeforeEach
+ fun setUp() {
+ scope = TestCoroutineScope()
+ clock = DelayControllerClockAdapter(scope)
+
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+ machineModel = SimMachineModel(
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ )
+ }
+
+ /**
+ * Test overcommissioning of a hypervisor.
+ */
+ @Test
+ fun overcommission() {
+ val listener = object : SimHypervisor.Listener {
+ var totalRequestedBurst = 0L
+ var totalGrantedBurst = 0L
+ var totalOvercommissionedBurst = 0L
+
+ override fun onSliceFinish(
+ hypervisor: SimHypervisor,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ totalRequestedBurst += requestedBurst
+ totalGrantedBurst += grantedBurst
+ totalOvercommissionedBurst += overcommissionedBurst
+ }
+ }
+
+ scope.launch {
+ val duration = 5 * 60L
+ val workloadA =
+ SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(0, 3500L * duration, duration * 1000, 3500.0, 2),
+ SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(0, 183L * duration, duration * 1000, 183.0, 2)
+ ),
+ )
+ val workloadB =
+ SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(0, 28L * duration, duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(0, 3100L * duration, duration * 1000, 3100.0, 2),
+ SimTraceWorkload.Fragment(0, 0, duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(0, 73L * duration, duration * 1000, 73.0, 2)
+ )
+ )
+
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimHypervisor(scope, clock, listener)
+
+ launch {
+ machine.run(hypervisor)
+ }
+
+ yield()
+ launch { hypervisor.createMachine(machineModel).run(workloadA) }
+ launch { hypervisor.createMachine(machineModel).run(workloadB) }
+ }
+
+ scope.advanceUntilIdle()
+
+ assertAll(
+ { Assertions.assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
+ { Assertions.assertEquals(2073600, listener.totalRequestedBurst, "Requested Burst does not match") },
+ { Assertions.assertEquals(2013600, listener.totalGrantedBurst, "Granted Burst does not match") },
+ { Assertions.assertEquals(60000, listener.totalOvercommissionedBurst, "Overcommissioned Burst does not match") },
+ { Assertions.assertEquals(1200001, scope.currentTime) }
+ )
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index f6fb8e04..332ca8e9 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -35,7 +35,7 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
/**
- * Test suite for the [SimMachine] class.
+ * Test suite for the [SimBareMetalMachine] class.
*/
@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineTest {
@@ -55,7 +55,7 @@ class SimMachineTest {
fun testFlopsWorkload() {
val testScope = TestCoroutineScope()
val clock = DelayControllerClockAdapter(testScope)
- val machine = SimMachine(testScope, clock, machineModel)
+ val machine = SimBareMetalMachine(testScope, clock, machineModel)
testScope.runBlockingTest {
machine.run(SimFlopsWorkload(2_000, 2, utilization = 1.0))
@@ -69,7 +69,7 @@ class SimMachineTest {
fun testUsage() {
val testScope = TestCoroutineScope()
val clock = DelayControllerClockAdapter(testScope)
- val machine = SimMachine(testScope, clock, machineModel)
+ val machine = SimBareMetalMachine(testScope, clock, machineModel)
testScope.runBlockingTest {
machine.run(SimFlopsWorkload(2_000, 2, utilization = 1.0))