author    Fabian Mastenbroek <mail.fabianm@gmail.com>  2021-01-07 23:59:15 +0100
committer GitHub <noreply@github.com>                  2021-01-07 23:59:15 +0100
commit    42e9a5b5b610f41a03e68f6fc781c54b9402925b (patch)
tree      650e886239e8983812d14f3108fdc895756a17d0 /simulator/opendc-simulator/opendc-simulator-compute/src/main
parent    3441586e4a10fcc6b2f458d16301ae2770d4886a (diff)
parent    9cf24c9a8d3e96a29d9b111081bc3369aadd490d (diff)
Merge pull request #69 from atlarge-research/refactor/workflows-v1
Refactor workflow service to schedule tasks onto VMs
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-compute/src/main')
-rw-r--r--  simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt        |   2
-rw-r--r--  simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt    | 517
-rw-r--r--  simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt              | 489
-rw-r--r--  simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt  |   4
4 files changed, 528 insertions(+), 484 deletions(-)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 4340708f..5e50a676 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -251,7 +251,7 @@ public class SimBareMetalMachine(
this.isEmpty = !nonEmpty
this.totalUsage = totalUsage
- this.minExit = minExit
+ this.minExit = if (isEmpty) 0 else minExit
this.maxExit = maxExit
}
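
Note: the one-line change above guards the published minimum exit time. When no CPU reported any work, the accumulator never updates and would otherwise carry a stale or sentinel value. A minimal sketch of the pattern, assuming minExit is folded with min() starting from Long.MAX_VALUE (the aggregation loop itself is outside this hunk, so the fold below is an illustration, not the actual code):

    // Illustration only: `exits` and the fold are assumptions, not the code in this file.
    fun publishedMinExit(exits: List<Long>): Long {
        var minExit = Long.MAX_VALUE
        var nonEmpty = false
        for (exit in exits) {              // per-CPU exit timestamps (hypothetical)
            minExit = minOf(minExit, exit)
            nonEmpty = true
        }
        val isEmpty = !nonEmpty
        // Without the guard, an empty machine would publish Long.MAX_VALUE here.
        return if (isEmpty) 0 else minExit
    }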
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt
new file mode 100644
index 00000000..b88871a5
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt
@@ -0,0 +1,517 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.StateFlow
+import kotlinx.coroutines.intrinsics.startCoroutineCancellable
+import kotlinx.coroutines.selects.SelectClause0
+import kotlinx.coroutines.selects.SelectInstance
+import kotlinx.coroutines.selects.select
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.time.Clock
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single
+ * [SimBareMetalMachine] concurrently using weighted fair sharing.
+ *
+ * @param coroutineScope The [CoroutineScope] to run the simulated workloads in.
+ * @param clock The virtual clock to track the simulation time.
+ */
+@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
+public class SimFairSharedHypervisor(
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
+ private val listener: SimHypervisor.Listener? = null
+) : SimHypervisor {
+ /**
+ * A flag to indicate the driver is stopped.
+ */
+ private var stopped: Boolean = false
+
+ /**
+ * The channel for scheduling new CPU requests.
+ */
+ private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED)
+
+ /**
+ * Create a [SimMachine] instance on which users may run a [SimWorkload].
+ *
+ * @param model The machine to create.
+ */
+ override fun createMachine(model: SimMachineModel, performanceInterferenceModel: PerformanceInterferenceModel?): SimMachine {
+ val vm = VmSession(model, performanceInterferenceModel)
+ val vmCtx = VmExecutionContext(vm)
+
+ return object : SimMachine {
+ override val model: SimMachineModel
+ get() = vmCtx.machine
+
+ override val usage: StateFlow<Double>
+ get() = vm.usage
+
+ /**
+ * The current active workload.
+ */
+ private var activeWorkload: SimWorkload? = null
+
+ override suspend fun run(workload: SimWorkload) {
+ require(activeWorkload == null) { "Run should not be called concurrently" }
+
+ try {
+ activeWorkload = workload
+ workload.run(vmCtx)
+ } finally {
+ activeWorkload = null
+ }
+ }
+
+ override fun toString(): String = "SimVirtualMachine"
+ }
+ }
+
+ /**
+ * Run the scheduling process of the hypervisor.
+ */
+ override suspend fun run(ctx: SimExecutionContext) {
+ val model = ctx.machine
+ val maxUsage = model.cpus.sumByDouble { it.frequency }
+ val pCPUs = model.cpus.indices.sortedBy { model.cpus[it].frequency }
+
+ val vms = mutableSetOf<VmSession>()
+ val vcpus = mutableListOf<VCpu>()
+
+ val usage = DoubleArray(model.cpus.size)
+ val burst = LongArray(model.cpus.size)
+
+ fun process(command: SchedulerCommand) {
+ when (command) {
+ is SchedulerCommand.Schedule -> {
+ vms += command.vm
+ vcpus.addAll(command.vm.vcpus)
+ }
+ is SchedulerCommand.Deschedule -> {
+ vms -= command.vm
+ vcpus.removeAll(command.vm.vcpus)
+ }
+ is SchedulerCommand.Interrupt -> {
+ }
+ }
+ }
+
+ fun processRemaining() {
+ var command = schedulingQueue.poll()
+ while (command != null) {
+ process(command)
+ command = schedulingQueue.poll()
+ }
+ }
+
+ while (!stopped) {
+ // Wait for a request to be submitted if we have no work yet.
+ if (vcpus.isEmpty()) {
+ process(schedulingQueue.receive())
+ }
+
+ processRemaining()
+
+ val start = clock.millis()
+
+ var duration: Double = Double.POSITIVE_INFINITY
+ var deadline: Long = Long.MAX_VALUE
+ var availableUsage = maxUsage
+ var totalRequestedUsage = 0.0
+ var totalRequestedBurst = 0L
+
+ // Sort the vCPUs based on their requested usage
+ // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
+ vcpus.sort()
+
+ // Divide the available host capacity fairly across the vCPUs using max-min fair sharing
+ for ((i, req) in vcpus.withIndex()) {
+ val remaining = vcpus.size - i
+ val availableShare = availableUsage / remaining
+ val grantedUsage = min(req.limit, availableShare)
+
+ // Take into account the minimum deadline of this slice before we possibly continue
+ deadline = min(deadline, req.vm.deadline)
+
+ // Ignore empty CPUs
+ if (grantedUsage <= 0 || req.burst <= 0) {
+ req.allocatedLimit = 0.0
+ continue
+ }
+
+ totalRequestedUsage += req.limit
+ totalRequestedBurst += req.burst
+
+ req.allocatedLimit = grantedUsage
+ availableUsage -= grantedUsage
+
+ // The duration that we want to run is that of the shortest request from a vCPU
+ duration = min(duration, req.burst / grantedUsage)
+ }
+
+ val totalAllocatedUsage = maxUsage - availableUsage
+ var totalAllocatedBurst = 0L
+ availableUsage = totalAllocatedUsage
+ val serverLoad = totalAllocatedUsage / maxUsage
+
+ // XXX Ceil duration to eliminate rounding issues
+ duration = ceil(duration)
+
+ // Divide the requests over the available capacity of the pCPUs fairly
+ for (i in pCPUs) {
+ val maxCpuUsage = model.cpus[i].frequency
+ val fraction = maxCpuUsage / maxUsage
+ val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction)
+ val grantedBurst = ceil(duration * grantedUsage).toLong()
+
+ usage[i] = grantedUsage
+ burst[i] = grantedBurst
+ totalAllocatedBurst += grantedBurst
+ availableUsage -= grantedUsage
+ }
+
+ // XXX If none of the VMs require any computation, wait until their deadline; otherwise trigger when the
+ // first vCPU finishes.
+ val triggerMode =
+ if (totalAllocatedBurst > 0 && totalAllocatedUsage > 0.0)
+ SimExecutionContext.TriggerMode.FIRST
+ else
+ SimExecutionContext.TriggerMode.DEADLINE
+
+ // We run the total burst on the host processor. Note that this call may be cancelled at any moment in
+ // time, so not all of the burst may be executed.
+ val isInterrupted = select<Boolean> {
+ schedulingQueue.onReceive { schedulingQueue.offer(it); true }
+ ctx.onRun(SimExecutionContext.Slice(burst, usage, deadline), triggerMode)
+ .invoke { false }
+ }
+
+ val end = clock.millis()
+
+ // The total requested burst that the VMs wanted to run in the time-frame that we ran.
+ val totalRequestedSubBurst =
+ vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum()
+ val totalRemainder = burst.sum()
+ val totalGrantedBurst = totalAllocatedBurst - totalRemainder
+
+ // The burst that was lost due to overcommissioning of CPU resources
+ var totalOvercommissionedBurst = 0L
+ // The burst that was lost due to interference.
+ var totalInterferedBurst = 0L
+
+ val vmIterator = vms.iterator()
+ while (vmIterator.hasNext()) {
+ val vm = vmIterator.next()
+
+ // Apply performance interference model
+ val performanceScore = vm.performanceInterferenceModel?.apply(serverLoad) ?: 1.0
+ var hasFinished = false
+
+ for (vcpu in vm.vcpus) {
+ // Compute the fraction of compute time allocated to the VM
+ val fraction = vcpu.allocatedLimit / totalAllocatedUsage
+
+ // Compute the burst time that the VM was actually granted
+ val grantedBurst = ceil(totalGrantedBurst * fraction).toLong()
+
+ // The burst that was actually used by the VM
+ val usedBurst = ceil(grantedBurst * performanceScore).toLong()
+
+ totalInterferedBurst += grantedBurst - usedBurst
+
+ // Compute remaining burst time to be executed for the request
+ if (vcpu.consume(usedBurst)) {
+ hasFinished = true
+ } else if (vm.deadline <= end) {
+ // Request must have its entire burst consumed or otherwise we have overcommission
+ // Note that we count the overcommissioned burst if the hypervisor has failed.
+ totalOvercommissionedBurst += vcpu.burst
+ }
+ }
+
+ if (hasFinished || vm.deadline <= end) {
+ // Mark the VM as finished and deschedule it if needed
+ if (vm.finish()) {
+ vmIterator.remove()
+ vcpus.removeAll(vm.vcpus)
+ }
+ }
+ }
+
+ listener?.onSliceFinish(
+ this,
+ totalRequestedBurst,
+ min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing
+ totalOvercommissionedBurst,
+ totalInterferedBurst, // Might be smaller than zero due to FP rounding errors
+ min(
+ totalAllocatedUsage,
+ totalRequestedUsage
+ ), // The allocated usage might be slightly higher due to FP rounding
+ totalRequestedUsage
+ )
+ }
+ }
+
+ /**
+ * A scheduling command processed by the scheduler.
+ */
+ private sealed class SchedulerCommand {
+ /**
+ * Schedule the specified VM on the hypervisor.
+ */
+ data class Schedule(val vm: VmSession) : SchedulerCommand()
+
+ /**
+ * De-schedule the specified VM on the hypervisor.
+ */
+ data class Deschedule(val vm: VmSession) : SchedulerCommand()
+
+ /**
+ * Interrupt the scheduler.
+ */
+ object Interrupt : SchedulerCommand()
+ }
+
+ /**
+ * A virtual machine running on the hypervisor.
+ *
+ * @param model The machine model of the virtual machine.
+ * @param triggerMode The mode when to trigger the VM exit.
+ * @param merge The function to merge consecutive slices on spillover.
+ * @param select The function to select on finish.
+ */
+ @OptIn(InternalCoroutinesApi::class)
+ private data class VmSession(
+ val model: SimMachineModel,
+ val performanceInterferenceModel: PerformanceInterferenceModel? = null,
+ var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST,
+ var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r },
+ var select: () -> Unit = {}
+ ) {
+ /**
+ * The vCPUs of this virtual machine.
+ */
+ val vcpus: List<VCpu>
+
+ /**
+ * The slices that the VM wants to run.
+ */
+ var queue: Iterator<SimExecutionContext.Slice> = emptyList<SimExecutionContext.Slice>().iterator()
+
+ /**
+ * The current active slice.
+ */
+ var activeSlice: SimExecutionContext.Slice? = null
+
+ /**
+ * The current deadline of the VM.
+ */
+ val deadline: Long
+ get() = activeSlice?.deadline ?: Long.MAX_VALUE
+
+ /**
+ * A flag to indicate that the VM is idle.
+ */
+ val isIdle: Boolean
+ get() = activeSlice == null
+
+ /**
+ * The usage of the virtual machine.
+ */
+ val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
+
+ init {
+ vcpus = model.cpus.mapIndexed { i, model -> VCpu(this, model, i) }
+ }
+
+ /**
+ * Schedule the given slices on this vCPU, replacing the existing slices.
+ */
+ fun schedule(slices: Sequence<SimExecutionContext.Slice>) {
+ queue = slices.iterator()
+
+ if (queue.hasNext()) {
+ activeSlice = queue.next()
+ refresh()
+ }
+ }
+
+ /**
+ * Cancel the existing workload on the VM.
+ */
+ fun cancel() {
+ queue = emptyList<SimExecutionContext.Slice>().iterator()
+ activeSlice = null
+ refresh()
+ }
+
+ /**
+ * Finish the current slice of the VM.
+ *
+ * @return `true` if the vCPUs may be descheduled, `false` otherwise.
+ */
+ fun finish(): Boolean {
+ val activeSlice = activeSlice ?: return true
+
+ return if (queue.hasNext()) {
+ val needsMerge = activeSlice.burst.any { it > 0 }
+ val candidateSlice = queue.next()
+ val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice
+
+ this.activeSlice = slice
+
+ // Update the vCPU cache
+ refresh()
+
+ false
+ } else {
+ this.activeSlice = null
+ select()
+ true
+ }
+ }
+
+ /**
+ * Refresh the vCPU cache.
+ */
+ fun refresh() {
+ vcpus.forEach { it.refresh() }
+ usage.value = vcpus.sumByDouble { it.burst / it.limit } / vcpus.size
+ }
+ }
+
+ /**
+ * A virtual CPU that can be scheduled on a physical CPU.
+ *
+ * @param vm The VM of which this vCPU is part.
+ * @param model The model of CPU that this vCPU models.
+ * @param id The id of the vCPU with respect to the VM.
+ */
+ private data class VCpu(
+ val vm: VmSession,
+ val model: ProcessingUnit,
+ val id: Int
+ ) : Comparable<VCpu> {
+ /**
+ * The current limit on the vCPU.
+ */
+ var limit: Double = 0.0
+
+ /**
+ * The limit allocated by the hypervisor.
+ */
+ var allocatedLimit: Double = 0.0
+
+ /**
+ * The current burst running on the vCPU.
+ */
+ var burst: Long = 0L
+
+ /**
+ * Consume the specified burst on this vCPU.
+ */
+ fun consume(burst: Long): Boolean {
+ this.burst = max(0, this.burst - burst)
+
+ // Flush the result to the slice if it exists
+ vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst)
+
+ val actuallyExists = vm.activeSlice?.burst?.let { id < it.size } ?: false
+ return actuallyExists && this.burst == 0L
+ }
+
+ /**
+ * Refresh the information of this vCPU based on the current slice.
+ */
+ fun refresh() {
+ limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0
+ burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0
+ }
+
+ /**
+ * Compare to another vCPU based on the current load of the vCPU.
+ */
+ override fun compareTo(other: VCpu): Int {
+ return limit.compareTo(other.limit)
+ }
+
+ /**
+ * Create a string representation of the vCPU.
+ */
+ override fun toString(): String =
+ "vCPU(id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)"
+ }
+
+ /**
+ * The execution context in which a VM runs.
+ *
+ */
+ private inner class VmExecutionContext(val session: VmSession) :
+ SimExecutionContext, DisposableHandle {
+ override val machine: SimMachineModel
+ get() = session.model
+
+ override val clock: Clock
+ get() = this@SimFairSharedHypervisor.clock
+
+ @OptIn(InternalCoroutinesApi::class)
+ override fun onRun(
+ batch: Sequence<SimExecutionContext.Slice>,
+ triggerMode: SimExecutionContext.TriggerMode,
+ merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice
+ ): SelectClause0 = object : SelectClause0 {
+ @InternalCoroutinesApi
+ override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
+ session.triggerMode = triggerMode
+ session.merge = merge
+ session.select = {
+ if (select.trySelect()) {
+ block.startCoroutineCancellable(select.completion)
+ }
+ }
+ session.schedule(batch)
+ // Indicate to the hypervisor that the VM should be re-scheduled
+ schedulingQueue.offer(SchedulerCommand.Schedule(session))
+ select.disposeOnSelect(this@VmExecutionContext)
+ }
+ }
+
+ override fun dispose() {
+ if (!session.isIdle) {
+ session.cancel()
+ schedulingQueue.offer(SchedulerCommand.Deschedule(session))
+ }
+ }
+ }
+}
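
The core of the scheduling loop above is the max-min fair sharing step: vCPUs are sorted by requested usage, and each receives the smaller of its own limit and an equal share of the capacity that remains. A self-contained sketch of just that computation (the names below are illustrative, not taken from the patch):

    import kotlin.math.min

    // Returns the granted usage per vCPU, in ascending order of demand.
    fun maxMinFairShare(limits: List<Double>, capacity: Double): List<Double> {
        var available = capacity
        val sorted = limits.sorted()
        val granted = DoubleArray(sorted.size)
        for ((i, limit) in sorted.withIndex()) {
            val share = available / (sorted.size - i) // equal split of what remains
            granted[i] = min(limit, share)            // capped by the vCPU's own demand
            available -= granted[i]
        }
        return granted.toList()
    }

    fun main() {
        // Three vCPUs requesting 1.0, 2.0 and 5.0 GHz on a 6.0 GHz host:
        // the small requests are satisfied in full, the largest gets the remainder.
        println(maxMinFairShare(listOf(1.0, 2.0, 5.0), 6.0)) // [1.0, 2.0, 3.0]
    }

Because small demands are granted first, any unused share flows to the heavier vCPUs, which is why the loop re-sorts the vCPUs every slice before dividing the host capacity.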
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
index 6087227b..fb4cd137 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
@@ -22,267 +22,23 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
-import kotlinx.coroutines.intrinsics.startCoroutineCancellable
-import kotlinx.coroutines.selects.SelectClause0
-import kotlinx.coroutines.selects.SelectInstance
-import kotlinx.coroutines.selects.select
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
-import java.time.Clock
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
/**
- * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine] concurrently.
- *
- * @param coroutineScope The [CoroutineScope] to run the simulated workloads in.
- * @param clock The virtual clock to track the simulation time.
+ * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine]
+ * concurrently.
*/
-@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
-public class SimHypervisor(
- private val coroutineScope: CoroutineScope,
- private val clock: Clock,
- private val listener: Listener? = null
-) : SimWorkload {
- /**
- * A set for tracking the VM context objects.
- */
- private val vms: MutableSet<VmExecutionContext> = mutableSetOf()
-
- /**
- * A flag to indicate the driver is stopped.
- */
- private var stopped: Boolean = false
-
- /**
- * The channel for scheduling new CPU requests.
- */
- private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED)
-
+public interface SimHypervisor : SimWorkload {
/**
* Create a [SimMachine] instance on which users may run a [SimWorkload].
*
* @param model The machine to create.
*/
- public fun createMachine(model: SimMachineModel, performanceInterferenceModel: PerformanceInterferenceModel? = null): SimMachine {
- val vm = VmSession(model, performanceInterferenceModel)
- val vmCtx = VmExecutionContext(vm)
-
- return object : SimMachine {
- override val model: SimMachineModel
- get() = vmCtx.machine
-
- override val usage: StateFlow<Double>
- get() = vm.usage
-
- /**
- * The current active workload.
- */
- private var activeWorkload: SimWorkload? = null
-
- override suspend fun run(workload: SimWorkload) {
- require(activeWorkload == null) { "Run should not be called concurrently" }
-
- try {
- activeWorkload = workload
- workload.run(vmCtx)
- } finally {
- activeWorkload = null
- }
- }
-
- override fun toString(): String = "SimVirtualMachine"
- }
- }
-
- /**
- * Run the scheduling process of the hypervisor.
- */
- override suspend fun run(ctx: SimExecutionContext) {
- val model = ctx.machine
- val maxUsage = model.cpus.sumByDouble { it.frequency }
- val pCPUs = model.cpus.indices.sortedBy { model.cpus[it].frequency }
-
- val vms = mutableSetOf<VmSession>()
- val vcpus = mutableListOf<VCpu>()
-
- val usage = DoubleArray(model.cpus.size)
- val burst = LongArray(model.cpus.size)
-
- fun process(command: SchedulerCommand) {
- when (command) {
- is SchedulerCommand.Schedule -> {
- vms += command.vm
- vcpus.addAll(command.vm.vcpus)
- }
- is SchedulerCommand.Deschedule -> {
- vms -= command.vm
- vcpus.removeAll(command.vm.vcpus)
- }
- is SchedulerCommand.Interrupt -> {
- }
- }
- }
-
- fun processRemaining() {
- var command = schedulingQueue.poll()
- while (command != null) {
- process(command)
- command = schedulingQueue.poll()
- }
- }
-
- while (!stopped) {
- // Wait for a request to be submitted if we have no work yet.
- if (vcpus.isEmpty()) {
- process(schedulingQueue.receive())
- }
-
- processRemaining()
-
- val start = clock.millis()
-
- var duration: Double = Double.POSITIVE_INFINITY
- var deadline: Long = Long.MAX_VALUE
- var availableUsage = maxUsage
- var totalRequestedUsage = 0.0
- var totalRequestedBurst = 0L
-
- // Sort the vCPUs based on their requested usage
- // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
- vcpus.sort()
-
- // Divide the available host capacity fairly across the vCPUs using max-min fair sharing
- for ((i, req) in vcpus.withIndex()) {
- val remaining = vcpus.size - i
- val availableShare = availableUsage / remaining
- val grantedUsage = min(req.limit, availableShare)
-
- // Take into account the minimum deadline of this slice before we possible continue
- deadline = min(deadline, req.vm.deadline)
-
- // Ignore empty CPUs
- if (grantedUsage <= 0 || req.burst <= 0) {
- req.allocatedLimit = 0.0
- continue
- }
-
- totalRequestedUsage += req.limit
- totalRequestedBurst += req.burst
-
- req.allocatedLimit = grantedUsage
- availableUsage -= grantedUsage
-
- // The duration that we want to run is that of the shortest request from a vCPU
- duration = min(duration, req.burst / grantedUsage)
- }
-
- // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs.
- duration = 300.0
-
- val totalAllocatedUsage = maxUsage - availableUsage
- var totalAllocatedBurst = 0L
- availableUsage = totalAllocatedUsage
- val serverLoad = totalAllocatedUsage / maxUsage
-
- // Divide the requests over the available capacity of the pCPUs fairly
- for (i in pCPUs) {
- val maxCpuUsage = model.cpus[i].frequency
- val fraction = maxCpuUsage / maxUsage
- val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction)
- val grantedBurst = ceil(duration * grantedUsage).toLong()
-
- usage[i] = grantedUsage
- burst[i] = grantedBurst
- totalAllocatedBurst += grantedBurst
- availableUsage -= grantedUsage
- }
-
- // We run the total burst on the host processor. Note that this call may be cancelled at any moment in
- // time, so not all of the burst may be executed.
- select<Boolean> {
- schedulingQueue.onReceive { schedulingQueue.offer(it); true }
- ctx.onRun(SimExecutionContext.Slice(burst, usage, deadline), SimExecutionContext.TriggerMode.DEADLINE)
- .invoke { false }
- }
-
- val end = clock.millis()
-
- // No work was performed
- if ((end - start) <= 0) {
- continue
- }
-
- // The total requested burst that the VMs wanted to run in the time-frame that we ran.
- val totalRequestedSubBurst =
- vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum()
- val totalRemainder = burst.sum()
- val totalGrantedBurst = totalAllocatedBurst - totalRemainder
-
- // The burst that was lost due to overcommissioning of CPU resources
- var totalOvercommissionedBurst = 0L
- // The burst that was lost due to interference.
- var totalInterferedBurst = 0L
-
- val vmIterator = vms.iterator()
- while (vmIterator.hasNext()) {
- val vm = vmIterator.next()
-
- // Apply performance interference model
- val performanceScore = vm.performanceInterferenceModel?.apply(serverLoad) ?: 1.0
- var hasFinished = false
-
- for (vcpu in vm.vcpus) {
- // Compute the fraction of compute time allocated to the VM
- val fraction = vcpu.allocatedLimit / totalAllocatedUsage
-
- // Compute the burst time that the VM was actually granted
- val grantedBurst = ceil(totalGrantedBurst * fraction).toLong()
-
- // The burst that was actually used by the VM
- val usedBurst = ceil(grantedBurst * performanceScore).toLong()
-
- totalInterferedBurst += grantedBurst - usedBurst
-
- // Compute remaining burst time to be executed for the request
- if (vcpu.consume(usedBurst)) {
- hasFinished = true
- } else if (vm.deadline <= end) {
- // Request must have its entire burst consumed or otherwise we have overcommission
- // Note that we count the overcommissioned burst if the hypervisor has failed.
- totalOvercommissionedBurst += vcpu.burst
- }
- }
-
- if (hasFinished || vm.deadline <= end) {
- // Mark the VM as finished and deschedule the VMs if needed
- if (vm.finish()) {
- vmIterator.remove()
- vcpus.removeAll(vm.vcpus)
- }
- }
- }
-
- listener?.onSliceFinish(
- this,
- totalRequestedBurst,
- min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing
- totalOvercommissionedBurst,
- totalInterferedBurst, // Might be smaller than zero due to FP rounding errors,
- min(
- totalAllocatedUsage,
- totalRequestedUsage
- ), // The allocated usage might be slightly higher due to FP rounding
- totalRequestedUsage
- )
- }
- }
+ public fun createMachine(
+ model: SimMachineModel,
+ performanceInterferenceModel: PerformanceInterferenceModel? = null
+ ): SimMachine
/**
* Event listener for hypervisor events.
@@ -301,235 +57,4 @@ public class SimHypervisor(
cpuDemand: Double
)
}
-
- /**
- * A scheduling command processed by the scheduler.
- */
- private sealed class SchedulerCommand {
- /**
- * Schedule the specified VM on the hypervisor.
- */
- data class Schedule(val vm: VmSession) : SchedulerCommand()
-
- /**
- * De-schedule the specified VM on the hypervisor.
- */
- data class Deschedule(val vm: VmSession) : SchedulerCommand()
-
- /**
- * Interrupt the scheduler.
- */
- object Interrupt : SchedulerCommand()
- }
-
- /**
- * A virtual machine running on the hypervisor.
- *
- * @param ctx The execution context the vCPU runs in.
- * @param triggerMode The mode when to trigger the VM exit.
- * @param merge The function to merge consecutive slices on spillover.
- * @param select The function to select on finish.
- */
- @OptIn(InternalCoroutinesApi::class)
- private data class VmSession(
- val model: SimMachineModel,
- val performanceInterferenceModel: PerformanceInterferenceModel? = null,
- var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST,
- var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r },
- var select: () -> Unit = {}
- ) {
- /**
- * The vCPUs of this virtual machine.
- */
- val vcpus: List<VCpu>
-
- /**
- * The slices that the VM wants to run.
- */
- var queue: Iterator<SimExecutionContext.Slice> = emptyList<SimExecutionContext.Slice>().iterator()
-
- /**
- * The current active slice.
- */
- var activeSlice: SimExecutionContext.Slice? = null
-
- /**
- * The current deadline of the VM.
- */
- val deadline: Long
- get() = activeSlice?.deadline ?: Long.MAX_VALUE
-
- /**
- * A flag to indicate that the VM is idle.
- */
- val isIdle: Boolean
- get() = activeSlice == null
-
- /**
- * The usage of the virtual machine.
- */
- val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
-
- init {
- vcpus = model.cpus.mapIndexed { i, model -> VCpu(this, model, i) }
- }
-
- /**
- * Schedule the given slices on this vCPU, replacing the existing slices.
- */
- fun schedule(slices: Sequence<SimExecutionContext.Slice>) {
- queue = slices.iterator()
-
- if (queue.hasNext()) {
- activeSlice = queue.next()
- refresh()
- }
- }
-
- /**
- * Cancel the existing workload on the VM.
- */
- fun cancel() {
- queue = emptyList<SimExecutionContext.Slice>().iterator()
- activeSlice = null
- refresh()
- }
-
- /**
- * Finish the current slice of the VM.
- *
- * @return `true` if the vCPUs may be descheduled, `false` otherwise.
- */
- fun finish(): Boolean {
- val activeSlice = activeSlice ?: return true
-
- return if (queue.hasNext()) {
- val needsMerge = activeSlice.burst.any { it > 0 }
- val candidateSlice = queue.next()
- val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice
-
- this.activeSlice = slice
-
- // Update the vCPU cache
- refresh()
-
- false
- } else {
- this.activeSlice = null
- select()
- true
- }
- }
-
- /**
- * Refresh the vCPU cache.
- */
- fun refresh() {
- vcpus.forEach { it.refresh() }
- usage.value = vcpus.sumByDouble { it.burst / it.limit } / vcpus.size
- }
- }
-
- /**
- * A virtual CPU that can be scheduled on a physical CPU.
- *
- * @param vm The VM of which this vCPU is part.
- * @param model The model of CPU that this vCPU models.
- * @param id The id of the vCPU with respect to the VM.
- */
- private data class VCpu(
- val vm: VmSession,
- val model: ProcessingUnit,
- val id: Int
- ) : Comparable<VCpu> {
- /**
- * The current limit on the vCPU.
- */
- var limit: Double = 0.0
-
- /**
- * The limit allocated by the hypervisor.
- */
- var allocatedLimit: Double = 0.0
-
- /**
- * The current burst running on the vCPU.
- */
- var burst: Long = 0L
-
- /**
- * Consume the specified burst on this vCPU.
- */
- fun consume(burst: Long): Boolean {
- this.burst = max(0, this.burst - burst)
-
- // Flush the result to the slice if it exists
- vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst)
-
- return allocatedLimit > 0.0 && this.burst == 0L
- }
-
- /**
- * Refresh the information of this vCPU based on the current slice.
- */
- fun refresh() {
- limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0
- burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0
- }
-
- /**
- * Compare to another vCPU based on the current load of the vCPU.
- */
- override fun compareTo(other: VCpu): Int {
- return limit.compareTo(other.limit)
- }
-
- /**
- * Create a string representation of the vCPU.
- */
- override fun toString(): String =
- "vCPU(id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)"
- }
-
- /**
- * The execution context in which a VM runs.
- *
- */
- private inner class VmExecutionContext(val session: VmSession) :
- SimExecutionContext, DisposableHandle {
- override val machine: SimMachineModel
- get() = session.model
-
- override val clock: Clock
- get() = this@SimHypervisor.clock
-
- @OptIn(InternalCoroutinesApi::class)
- override fun onRun(
- batch: Sequence<SimExecutionContext.Slice>,
- triggerMode: SimExecutionContext.TriggerMode,
- merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice
- ): SelectClause0 = object : SelectClause0 {
- @InternalCoroutinesApi
- override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
- session.triggerMode = triggerMode
- session.merge = merge
- session.select = {
- if (select.trySelect()) {
- block.startCoroutineCancellable(select.completion)
- }
- }
- session.schedule(batch)
- // Indicate to the hypervisor that the VM should be re-scheduled
- schedulingQueue.offer(SchedulerCommand.Schedule(session))
- select.disposeOnSelect(this@VmExecutionContext)
- }
- }
-
- override fun dispose() {
- if (!session.isIdle) {
- session.cancel()
- schedulingQueue.offer(SchedulerCommand.Deschedule(session))
- }
- }
- }
}
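
With the implementation moved out, SimHypervisor becomes a small contract (a SimWorkload plus createMachine), and SimFairSharedHypervisor supplies the scheduling policy behind it. A sketch of how the pieces fit together, assuming a host SimExecutionContext and a SimMachineModel for the VM are already available (neither is constructed in this patch) and with the SimFlopsWorkload parameter types assumed:

    // Assumed to live in org.opendc.simulator.compute so the local types resolve.
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.launch
    import org.opendc.simulator.compute.workload.SimFlopsWorkload

    suspend fun example(hostCtx: SimExecutionContext, vmModel: SimMachineModel) = coroutineScope {
        val hypervisor: SimHypervisor = SimFairSharedHypervisor(this, hostCtx.clock)

        // The hypervisor is itself a SimWorkload: its scheduling loop runs on the host
        // machine until stopped, so it is launched concurrently.
        launch { hypervisor.run(hostCtx) }

        // Carve out a virtual machine through the interface and run a workload on it.
        val vm = hypervisor.createMachine(vmModel)
        vm.run(SimFlopsWorkload(1_000, vmModel.cpus.size))
    }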
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index 918a78bd..0d2c9374 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -39,7 +39,7 @@ public class SimFlopsWorkload(
public val utilization: Double = 0.8
) : SimWorkload {
init {
- require(flops >= 0) { "Negative number of flops" }
+ require(flops >= 0) { "Negative number of FLOPs" }
require(cores > 0) { "Negative number of cores or no cores" }
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
@@ -54,4 +54,6 @@ public class SimFlopsWorkload(
ctx.run(SimExecutionContext.Slice(burst, maxUsage, Long.MAX_VALUE), triggerMode = SimExecutionContext.TriggerMode.LAST)
}
+
+ override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,cores=$cores,utilization=$utilization)"
}
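
The new toString and the corrected "FLOPs" message make workload construction failures and log output easier to read. A small sketch of the validation contract (parameter names taken from the require messages; types assumed):

    fun main() {
        val w = SimFlopsWorkload(1_000, 4) // utilization defaults to 0.8
        println(w) // SimFlopsWorkload(FLOPs=1000,cores=4,utilization=0.8)

        // Violating the documented invariants throws IllegalArgumentException.
        runCatching { SimFlopsWorkload(1_000, 4, utilization = 1.5) }
            .onFailure { println(it.message) } // Utilization must be in (0, 1]
    }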