Diffstat (limited to 'opendc/opendc-compute/src')
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt        |  22
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt  | 130
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt        | 196
3 files changed, 232 insertions(+), 116 deletions(-)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index e0a491c8..663fa5e4 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -28,6 +28,8 @@ import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.services.ServiceKey
+import kotlinx.coroutines.selects.SelectClause0
+import kotlinx.coroutines.selects.select
/**
* Represents the execution context in which a bootable [Image] runs on a [Server].
@@ -62,5 +64,23 @@ public interface ServerContext {
* @param limit The maximum usage in terms of MHz that the processing core may use while running the burst.
* @param deadline The instant at which this request needs to be fulfilled.
*/
- public suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long)
+ public suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
+ select<Unit> { onRun(burst, limit, deadline).invoke {} }
+ }
+
+ /**
+     * Create a select clause that requests the specified burst time from the processor cores and completes once a
+     * processor core finishes processing a **non-zero** burst or the deadline is reached.
+     *
+     * When the clause completes, [burst] will contain the remaining burst length for each of the cores (which may
+     * be zero).
+     *
+     * [burst] and [limit] must have the same size; in any other case the method will throw an
+     * [IllegalArgumentException].
+ *
+ * @param burst The burst time to request from each of the processor cores.
+ * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst.
+ * @param deadline The instant at which this request needs to be fulfilled.
+ */
+ public fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0
}
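
The default `run` implementation above is just a `select` over the new `onRun` clause. The point of exposing `onRun` as a `SelectClause0` is that callers can race a burst request against other events, which is how the hypervisor scheduler in SimpleVirtDriver further below races it against its scheduling queue. A rough usage sketch (the `interrupts` channel is made up for illustration and is not part of the OpenDC API):

    import kotlinx.coroutines.channels.ReceiveChannel
    import kotlinx.coroutines.selects.select

    // Hypothetical helper: run a burst, but return early if an interrupt arrives first.
    suspend fun runOrInterrupt(
        ctx: ServerContext,
        burst: LongArray,
        limit: DoubleArray,
        deadline: Long,
        interrupts: ReceiveChannel<Unit>
    ): Boolean = select {
        // Completes when a core finishes a non-zero burst or the deadline is reached.
        ctx.onRun(burst, limit, deadline).invoke { true }
        // Completes early if an interrupt is received first.
        interrupts.onReceive { false }
    }
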
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 37ae9eb5..e3cb6e35 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -25,9 +25,9 @@
package com.atlarge.opendc.compute.metal.driver
import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.flow.StateFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
@@ -36,7 +36,6 @@ import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.execution.ShutdownException
-import com.atlarge.opendc.compute.core.execution.assertFailure
import com.atlarge.opendc.compute.core.image.EmptyImage
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.metal.Node
@@ -46,18 +45,23 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel
import com.atlarge.opendc.core.power.PowerModel
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
+import kotlinx.coroutines.Delay
+import kotlinx.coroutines.DisposableHandle
+import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.launch
+import kotlinx.coroutines.selects.SelectClause0
+import kotlinx.coroutines.selects.SelectInstance
import java.util.UUID
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
import kotlinx.coroutines.withContext
import java.lang.Exception
+import kotlin.coroutines.ContinuationInterceptor
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
@@ -242,64 +246,78 @@ public class SimpleBareMetalDriver(
setNode(nodeState.value.copy(state = newNodeState, server = server))
}
- private var flush: Job? = null
+ private var flush: DisposableHandle? = null
- override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
+ @OptIn(InternalCoroutinesApi::class)
+ override fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 {
require(burst.size == limit.size) { "Array dimensions do not match" }
assert(!finalized) { "Server instance is already finalized" }
- // If run is called in at the same timestamp as the previous call, cancel the load flush
- flush?.cancel()
- flush = null
-
- val start = simulationContext.clock.millis()
- var duration = max(0, deadline - start)
- var totalUsage = 0.0
-
- // Determine the duration of the first CPU to finish
- for (i in 0 until min(cpus.size, burst.size)) {
- val cpu = cpus[i]
- val usage = min(limit[i], cpu.frequency)
- val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
-
- totalUsage += usage / cpu.frequency
-
- if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
- duration = min(duration, cpuDuration)
+ return object : SelectClause0 {
+ @InternalCoroutinesApi
+ override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
+                // If run is called at the same timestamp as the previous call, cancel the load flush
+ flush?.dispose()
+ flush = null
+
+ val context = select.completion.context
+ val simulationContext = context[SimulationContext]!!
+ val delay = context[ContinuationInterceptor] as Delay
+
+ val start = simulationContext.clock.millis()
+ var duration = max(0, deadline - start)
+ var totalUsage = 0.0
+
+ // Determine the duration of the first CPU to finish
+ for (i in 0 until min(cpus.size, burst.size)) {
+ val cpu = cpus[i]
+ val usage = min(limit[i], cpu.frequency)
+ val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
+
+ totalUsage += usage / cpu.frequency
+
+ if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
+ duration = min(duration, cpuDuration)
+ }
+ }
+
+ if (!unavailable) {
+ usageState.value = totalUsage / cpus.size
+ }
+
+ val action = Runnable {
+                    // TODO: we could replace startCoroutineCancellable with startCoroutineUndispatched,
+                    // but we need a way to know that Delay.invokeOnTimeout used the right thread
+ if (select.trySelect()) {
+ block.startCoroutineCancellable(select.completion) // shall be cancellable while waits for dispatch
+ }
+ }
+
+ val disposable = delay.invokeOnTimeout(duration, action)
+ val flush = DisposableHandle {
+ val end = simulationContext.clock.millis()
+
+                    // Flush the load if we do not receive a new run call at the same timestamp
+ flush = delay.invokeOnTimeout(1, Runnable {
+ usageState.value = 0.0
+ flush = null
+ })
+
+ if (!unavailable) {
+ // Write back the remaining burst time
+ for (i in 0 until min(cpus.size, burst.size)) {
+ val usage = min(limit[i], cpus[i].frequency)
+ val granted = ceil((end - start) / 1000.0 * usage).toLong()
+ burst[i] = max(0, burst[i] - granted)
+ }
+ }
+
+ disposable.dispose()
+ }
+
+ select.disposeOnSelect(flush)
}
}
-
- if (!unavailable) {
- usageState.value = totalUsage / cpus.size
- }
-
- try {
- delay(duration)
- } catch (e: CancellationException) {
- // On non-failure cancellation, we compute and return the remaining burst
- e.assertFailure()
- }
- val end = simulationContext.clock.millis()
-
- // Flush the load if they do not receive a new run call for the same timestamp
- flush = domain.launch(job) {
- delay(1)
- usageState.value = 0.0
- }
- flush!!.invokeOnCompletion {
- flush = null
- }
-
- if (unavailable) {
- return
- }
-
- // Write back the remaining burst time
- for (i in 0 until min(cpus.size, burst.size)) {
- val usage = min(limit[i], cpus[i].frequency)
- val granted = ceil((end - start) / 1000.0 * usage).toLong()
- burst[i] = max(0, burst[i] - granted)
- }
}
}
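
The `onRun` implementation above follows the standard pattern for a hand-rolled `SelectClause0` on the internal API of the kotlinx.coroutines version used here: resolve `Delay` from the selecting coroutine's context, arm a timeout with `invokeOnTimeout`, call `trySelect()` before resuming the select's completion, and register cleanup with `disposeOnSelect`. A minimal sketch of just that skeleton, with the driver's usage and flush bookkeeping stripped out (illustrative only; the internal select API has changed in later coroutine releases):

    import kotlin.coroutines.ContinuationInterceptor
    import kotlinx.coroutines.Delay
    import kotlinx.coroutines.DisposableHandle
    import kotlinx.coroutines.InternalCoroutinesApi
    import kotlinx.coroutines.intrinsics.startCoroutineCancellable
    import kotlinx.coroutines.selects.SelectClause0
    import kotlinx.coroutines.selects.SelectInstance

    // Minimal timer-backed select clause, mirroring the registration pattern in onRun above.
    @OptIn(InternalCoroutinesApi::class)
    fun onTimeout(timeMillis: Long): SelectClause0 = object : SelectClause0 {
        @InternalCoroutinesApi
        override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
            // Resolve the Delay implementation from the selecting coroutine's dispatcher.
            val delay = select.completion.context[ContinuationInterceptor] as Delay
            val action = Runnable {
                // Resume the select only if no other clause has already been selected.
                if (select.trySelect()) {
                    block.startCoroutineCancellable(select.completion)
                }
            }
            // Dispose the timer if another clause wins the race.
            val handle: DisposableHandle = delay.invokeOnTimeout(timeMillis, action)
            select.disposeOnSelect(handle)
        }
    }
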
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index c21a9fc0..4939a624 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -46,7 +46,9 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
+import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
@@ -54,6 +56,12 @@ import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
+import kotlinx.coroutines.selects.SelectClause0
+import kotlinx.coroutines.selects.select
+import kotlinx.coroutines.withContext
+import java.lang.Exception
+import java.util.Objects
+import java.util.TreeSet
import java.util.UUID
import kotlin.math.ceil
import kotlin.math.max
@@ -99,6 +107,17 @@ class SimpleVirtDriver(
performanceModel?.computeIntersectingItems(imagesRunning)
}
}.launchIn(this)
+
+ launch {
+ try {
+ scheduler()
+ } catch (e: Exception) {
+ if (e !is CancellationException) {
+ simulationContext.log.error("Hypervisor scheduler failed", e)
+ }
+ throw e
+ }
+ }
}
override suspend fun spawn(
@@ -128,44 +147,75 @@ class SimpleVirtDriver(
}
/**
- * A flag to indicate the driver is stopped.
+ * A scheduling command processed by the scheduler.
*/
- private var stopped: Boolean = false
+ private sealed class SchedulerCommand {
+ /**
+ * Schedule the specified vCPUs of a single VM.
+ */
+ data class Schedule(val vm: VmServerContext, val requests: Collection<CpuRequest>) : SchedulerCommand()
+
+ /**
+ * Interrupt the scheduler.
+ */
+ object Interrupt : SchedulerCommand()
+ }
/**
- * The set of [VmServerContext] instances that is being scheduled at the moment.
+ * A flag to indicate the driver is stopped.
*/
- private val activeVms = mutableSetOf<VmServerContext>()
+ private var stopped: Boolean = false
/**
- * The deferred run call.
+ * The channel for scheduling new CPU requests.
*/
- private var call: Job? = null
+ private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED)
/**
- * Schedule the vCPUs on the physical CPUs.
+ * The scheduling process of the hypervisor.
*/
- private fun reschedule() {
- flush()
+ private suspend fun scheduler() {
+ val clock = simulationContext.clock
+ val maxUsage = hostContext.cpus.sumByDouble { it.frequency }
+ val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }
+
+ val vms = mutableMapOf<VmServerContext, Collection<CpuRequest>>()
+ val requests = TreeSet<CpuRequest>()
+
+ val usage = DoubleArray(hostContext.cpus.size)
+ val burst = LongArray(hostContext.cpus.size)
+
+ fun process(command: SchedulerCommand) {
+ when (command) {
+ is SchedulerCommand.Schedule -> {
+ vms[command.vm] = command.requests
+ requests.removeAll(command.requests)
+ requests.addAll(command.requests)
+ }
+ }
+ }
- // Do not schedule a call if there is no work to schedule or the driver stopped.
- if (stopped || activeVms.isEmpty()) {
- return
+ fun processRemaining() {
+ var command = schedulingQueue.poll()
+ while (command != null) {
+ process(command)
+ command = schedulingQueue.poll()
+ }
}
- val call = launch {
- val start = simulationContext.clock.millis()
- val vms = activeVms.toSet()
+ while (!stopped) {
+ // Wait for a request to be submitted if we have no work yet.
+ if (requests.isEmpty()) {
+ process(schedulingQueue.receive())
+ }
+
+ processRemaining()
+
+ val start = clock.millis()
var duration: Double = Double.POSITIVE_INFINITY
var deadline: Long = Long.MAX_VALUE
-
- val maxUsage = hostContext.cpus.sumByDouble { it.frequency }
var availableUsage = maxUsage
- val requests = vms.asSequence()
- .flatMap { it.requests.asSequence() }
- .sortedBy { it.limit }
- .toList()
// Divide the available host capacity fairly across the vCPUs using max-min fair sharing
for ((i, req) in requests.withIndex()) {
@@ -177,48 +227,55 @@ class SimpleVirtDriver(
availableUsage -= grantedUsage
// The duration that we want to run is that of the shortest request from a vCPU
- duration = min(duration, req.burst / req.allocatedUsage)
+ duration = min(duration, req.burst / grantedUsage)
deadline = min(deadline, req.vm.deadline)
}
- val usage = DoubleArray(hostContext.cpus.size)
- val burst = LongArray(hostContext.cpus.size)
val totalUsage = maxUsage - availableUsage
+ var totalBurst = 0L
availableUsage = totalUsage
val serverLoad = totalUsage / maxUsage
// Divide the requests over the available capacity of the pCPUs fairly
- for (i in hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }) {
+ for (i in pCPUs) {
val remaining = hostContext.cpus.size - i
val availableShare = availableUsage / remaining
val grantedUsage = min(hostContext.cpus[i].frequency, availableShare)
+ val pBurst = (duration * grantedUsage).toLong()
usage[i] = grantedUsage
- burst[i] = (duration * grantedUsage).toLong()
+ burst[i] = pBurst
+ totalBurst += pBurst
availableUsage -= grantedUsage
}
- val remainder = burst.clone()
// We run the total burst on the host processor. Note that this call may be cancelled at any moment in
// time, so not all of the burst may be executed.
- hostContext.run(remainder, usage, deadline)
- val end = simulationContext.clock.millis()
+ val interrupted = select<Boolean> {
+ schedulingQueue.onReceive { schedulingQueue.offer(it); true }
+ hostContext.onRun(burst, usage, deadline).invoke { false }
+ }
+
+ val end = clock.millis()
// No work was performed
if ((end - start) <= 0) {
- return@launch
+ continue
}
- val totalRemainder = remainder.sum()
- val totalBurst = burst.sum()
+ val totalRemainder = burst.sum()
+
+ val entryIterator = vms.entries.iterator()
+ while (entryIterator.hasNext()) {
+ val (vm, vmRequests) = entryIterator.next()
- for (vm in vms) {
// Apply performance interference model
val performanceModel =
vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
val performanceScore = performanceModel?.apply(serverLoad) ?: 1.0
+ var hasFinished = false
- for ((i, req) in vm.requests.withIndex()) {
+ for ((i, req) in vmRequests.withIndex()) {
// Compute the fraction of compute time allocated to the VM
val fraction = req.allocatedUsage / totalUsage
@@ -231,9 +288,17 @@ class SimpleVirtDriver(
// Compute remaining burst time to be executed for the request
req.burst = max(0, vm.burst[i] - grantedBurst)
vm.burst[i] = req.burst
+
+ if (req.burst <= 0L || req.isCancelled) {
+ hasFinished = true
+ }
}
- if (vm.burst.any { it == 0L } || vm.deadline <= end) {
+ if (hasFinished || vm.deadline <= end) {
+ // Deschedule all requests from this VM
+ entryIterator.remove()
+ requests.removeAll(vmRequests)
+
// Return vCPU `run` call: the requested burst was completed or deadline was exceeded
vm.chan.send(Unit)
}
@@ -248,22 +313,7 @@ class SimpleVirtDriver(
server
)
)
-
- // Make sure we reschedule the remaining amount of work (if we did not obtain the entire request)
- reschedule()
}
- this.call = call
- }
-
- /**
- * Flush the progress of the current active VMs.
- */
- private fun flush() {
- val call = call ?: return // If there is no active call, there is nothing to flush
- // The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its
- // completion.
- call.cancel()
- this.call = null
}
/**
@@ -274,11 +324,35 @@ class SimpleVirtDriver(
val vcpu: ProcessingUnit,
var burst: Long,
val limit: Double
- ) {
+ ) : Comparable<CpuRequest> {
/**
* The usage that was actually granted.
*/
var allocatedUsage: Double = 0.0
+
+ /**
+ * A flag to indicate the request was cancelled.
+ */
+ var isCancelled: Boolean = false
+
+ override fun equals(other: Any?): Boolean = other is CpuRequest && vm == other.vm && vcpu == other.vcpu
+ override fun hashCode(): Int = Objects.hash(vm, vcpu)
+
+ override fun compareTo(other: CpuRequest): Int {
+ var cmp = limit.compareTo(other.limit)
+
+ if (cmp != 0) {
+ return cmp
+ }
+
+ cmp = vm.server.uid.compareTo(other.vm.server.uid)
+
+ if (cmp != 0) {
+ return cmp
+ }
+
+ return vcpu.id.compareTo(other.vcpu.id)
+ }
}
internal inner class VmServerContext(
@@ -287,7 +361,6 @@ class SimpleVirtDriver(
val domain: Domain
) : ServerManagementContext {
private var finalized: Boolean = false
- lateinit var requests: List<CpuRequest>
lateinit var burst: LongArray
var deadline: Long = 0L
var chan = Channel<Unit>(Channel.RENDEZVOUS)
@@ -347,7 +420,7 @@ class SimpleVirtDriver(
this.deadline = deadline
this.burst = burst
- requests = cpus.asSequence()
+ val requests = cpus.asSequence()
.take(burst.size)
.mapIndexed { i, cpu ->
CpuRequest(
@@ -361,16 +434,21 @@ class SimpleVirtDriver(
// Wait until the burst has been run or the coroutine is cancelled
try {
- activeVms += this
- reschedule()
+ schedulingQueue.send(SchedulerCommand.Schedule(this, requests))
chan.receive()
} catch (e: CancellationException) {
- // On cancellation, we compute and return the remaining burst
+ // Deschedule the VM
+ withContext(NonCancellable) {
+ requests.forEach { it.isCancelled = true }
+ schedulingQueue.send(SchedulerCommand.Interrupt)
+ chan.receive()
+ }
+
e.assertFailure()
- } finally {
- activeVms -= this
- reschedule()
}
}
+
+ @OptIn(InternalCoroutinesApi::class)
+ override fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 = TODO()
}
}
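
The allocation step in the scheduler above ("Divide the available host capacity fairly across the vCPUs using max-min fair sharing") relies on the requests being ordered by their limit, which is what the new `CpuRequest.compareTo` gives the `TreeSet`: each request in turn receives an equal share of the capacity that is still available, capped at its own limit, and whatever it leaves unused rolls over to the requests after it. A standalone sketch of that step, using a simplified stand-in for `CpuRequest`:

    // Simplified stand-in for the driver's CpuRequest (illustration only).
    data class Request(val limit: Double, var allocated: Double = 0.0)

    // Max-min fair sharing: visit requests in ascending order of limit and grant each
    // min(limit, equal share of what remains), so unused capacity flows to later requests.
    fun maxMinFairShare(requests: List<Request>, capacity: Double) {
        var available = capacity
        val sorted = requests.sortedBy { it.limit }
        for ((i, req) in sorted.withIndex()) {
            val share = available / (sorted.size - i)
            val granted = minOf(req.limit, share)
            req.allocated = granted
            available -= granted
        }
    }

    // Example: vCPU limits of 1000, 2000 and 4000 MHz on a 4500 MHz host
    // yield allocations of 1000, 1750 and 1750 MHz.
    fun main() {
        val requests = listOf(Request(1000.0), Request(2000.0), Request(4000.0))
        maxMinFairShare(requests, 4500.0)
        println(requests.map { it.allocated })
    }
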