summaryrefslogtreecommitdiff
path: root/opendc/opendc-compute/src/main
diff options
context:
space:
mode:
authorGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-03-04 13:08:26 +0100
committerGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-03-04 13:08:26 +0100
commitfa7455ac6aaa1e0c34a4218c32423d544373e795 (patch)
treefd3a2b12bf5b3841ded39930ad2d3b0c1336448b /opendc/opendc-compute/src/main
parentac6e6f7c611fa7d10fff5467c4a61af932e4c171 (diff)
parent5f5d54b6f1a96bc595f99f367bea54f1d852ec63 (diff)
Merge branch 'refactor/2.x-vm-improvements' into 'feat/2.x'
Report CPU usage per server instance Closes #51 See merge request opendc/opendc-simulator!34
Diffstat (limited to 'opendc/opendc-compute/src/main')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingNode.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt)24
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingUnit.kt18
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt48
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt21
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt17
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt21
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt69
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt151
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt199
10 files changed, 247 insertions, 323 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingNode.kt
index 7b00d99c..91f5dde9 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingNode.kt
@@ -22,17 +22,19 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.virt.driver.hypervisor
-
-import com.atlarge.opendc.compute.core.Flavor
-import com.atlarge.opendc.compute.core.execution.ProcessorContext
+package com.atlarge.opendc.compute.core
/**
- * A scheduler that assigns virtual CPUs to virtual machines and maps them to physical CPUs.
+ * A processing node/package/socket containing possibly several CPU cores.
+ *
+ * @property vendor The vendor string of the processor node.
+ * @property modelName The name of the processor node.
+ * @property arch The micro-architecture of the processor node.
+ * @property coreCount The number of logical CPUs in the processor node.
*/
-public interface VmScheduler {
- /**
- * Create the virtual CPUs for the specified [flavor].
- */
- fun createVirtualCpus(flavor: Flavor): List<ProcessorContext>
-}
+data class ProcessingNode(
+ val vendor: String,
+ val arch: String,
+ val modelName: String,
+ val coreCount: Int
+)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingUnit.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingUnit.kt
index 945b7061..dbf6d824 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingUnit.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingUnit.kt
@@ -25,18 +25,14 @@
package com.atlarge.opendc.compute.core
/**
- * A processing unit of a compute resource, either virtual or physical.
+ * A single logical compute unit of processor node, either virtual or physical.
*
- * @property vendor The vendor string of the cpu.
- * @property modelName The name of the cpu model.
- * @property arch The architecture of the CPU.
- * @property clockRate The clock speed of the cpu in MHz.
- * @property cores The number of logical cores in the cpu.
+ * @property node The processing node containing the CPU core.
+ * @property id The identifier of the CPU core within the processing node.
+ * @property frequency The clock rate of the CPU.
*/
public data class ProcessingUnit(
- public val vendor: String,
- public val modelName: String,
- public val arch: String,
- public val clockRate: Double,
- public val cores: Int
+ public val node: ProcessingNode,
+ public val id: Int,
+ public val frequency: Double
)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt
deleted file mode 100644
index c081acd5..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.compute.core.execution
-
-import com.atlarge.opendc.compute.core.ProcessingUnit
-
-/**
- * An interface for managing a single processing core (CPU) of a (virtual) machine.
- */
-public interface ProcessorContext {
- /**
- * The information about the processing unit.
- */
- public val info: ProcessingUnit
-
- /**
- * Request the specified burst time from the processor and suspend execution until the processor finishes
- * processing of the requested burst.
- *
- * @param burst The burst time to request from the processor.
- * @param maxUsage The maximum usage in terms of MHz that the processing core may use while running the burst.
- * @param deadline The instant at which this request needs to be fulfilled.
- * @return The remaining burst time in case the method was cancelled or zero if the processor finished running.
- */
- public suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index 53b00aa6..b09a5a7d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.compute.core.execution
+import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.services.AbstractServiceKey
@@ -38,9 +39,9 @@ public interface ServerContext {
public val server: Server
/**
- * A list of available processor context instances to use.
+ * A list of processing units available to use.
*/
- public val cpus: List<ProcessorContext>
+ public val cpus: List<ProcessingUnit>
/**
* Publishes the given [service] with key [serviceKey] in the server's registry.
@@ -48,4 +49,20 @@ public interface ServerContext {
public suspend fun <T : Any> publishService(serviceKey: AbstractServiceKey<T>, service: T) {
server.serviceRegistry[serviceKey] = service
}
+
+ /**
+ * Request the specified burst time from the processor cores and suspend execution until a processor core finishes
+ * processing a **non-zero** burst or until the deadline is reached.
+ *
+ * After the method returns, [burst] will contain the remaining burst length for each of the cores (which may be
+ * zero).
+ *
+ * Both [burst] and [limit] must be of the same size and in any other case the method will throw an
+ * [IllegalArgumentException].
+ *
+ * @param burst The burst time to request from each of the processor cores.
+ * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst.
+ * @param deadline The instant at which this request needs to be fulfilled.
+ */
+ public suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long)
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
index 3576b488..27d8091a 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
@@ -26,9 +26,9 @@ package com.atlarge.opendc.compute.core.image
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.launch
+import kotlinx.coroutines.isActive
import java.util.UUID
+import kotlin.coroutines.coroutineContext
import kotlin.math.min
/**
@@ -42,7 +42,7 @@ import kotlin.math.min
* @property cores The number of cores that the image is able to utilize.
* @property utilization A model of the CPU utilization of the application.
*/
-class FlopsApplicationImage(
+data class FlopsApplicationImage(
public override val uid: UUID,
public override val name: String,
public override val tags: TagContainer,
@@ -61,12 +61,15 @@ class FlopsApplicationImage(
*/
override suspend fun invoke(ctx: ServerContext) {
val cores = min(this.cores, ctx.server.flavor.cpuCount)
- val req = flops / cores
+ val burst = LongArray(cores) { flops / cores }
+ val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization }
- coroutineScope {
- for (cpu in ctx.cpus.take(cores)) {
- launch { cpu.run(req, cpu.info.clockRate * utilization, Long.MAX_VALUE) }
+ while (coroutineContext.isActive) {
+ if (burst.all { it == 0L }) {
+ break
}
+
+ ctx.run(burst, maxUsage, Long.MAX_VALUE)
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index 436f4653..52f9068d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -3,10 +3,10 @@ package com.atlarge.opendc.compute.core.image
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
-import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
+import kotlinx.coroutines.ensureActive
import java.util.UUID
+import kotlin.coroutines.coroutineContext
import kotlin.math.min
class VmImage(
@@ -19,19 +19,20 @@ class VmImage(
) : Image {
override suspend fun invoke(ctx: ServerContext) {
- flopsHistory.forEach { fragment ->
+ for (fragment in flopsHistory) {
+ coroutineContext.ensureActive()
+
if (fragment.flops == 0L) {
delay(fragment.duration)
} else {
val cores = min(this.cores, ctx.server.flavor.cpuCount)
- val req = fragment.flops / cores
- coroutineScope {
- for (cpu in ctx.cpus.take(cores)) {
- val usage = req / (fragment.usage * 1_000_000L)
- launch { cpu.run(req, usage, simulationContext.clock.millis() + fragment.duration) }
- }
- }
+ val burst = LongArray(cores) { fragment.flops / cores }
+ val maxUsage = DoubleArray(cores) { i -> burst[i] / (fragment.usage * 1_000_000L) }
+
+ ctx.run(burst, maxUsage, simulationContext.clock.millis() + fragment.duration)
}
}
}
+
+ override fun toString(): String = "VmImage(uid=$uid, name=$name, cores=$cores, requiredMemory=$requiredMemory)"
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 1adc8652..fcdc9363 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -31,7 +31,6 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.core.execution.ProcessorContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.image.EmptyImage
import com.atlarge.opendc.compute.core.image.Image
@@ -53,14 +52,14 @@ import kotlinx.coroutines.withContext
*
* @param uid The unique identifier of the machine.
* @param name An optional name of the machine.
- * @param cpuNodes The CPU nodes/packages available to the bare metal machine.
+ * @param cpus The CPUs available to the bare metal machine.
* @param memoryUnits The memory units in this machine.
* @param domain The simulation domain the driver runs in.
*/
public class SimpleBareMetalDriver(
uid: UUID,
name: String,
- val cpuNodes: List<ProcessingUnit>,
+ val cpus: List<ProcessingUnit>,
val memoryUnits: List<MemoryUnit>,
private val domain: Domain
) : BareMetalDriver {
@@ -77,7 +76,7 @@ public class SimpleBareMetalDriver(
/**
* The flavor that corresponds to this machine.
*/
- private val flavor = Flavor(cpuNodes.sumBy { it.cores }, memoryUnits.map { it.size }.sum())
+ private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum())
/**
* The job that is running the image.
@@ -138,35 +137,10 @@ public class SimpleBareMetalDriver(
}
}
- private data class ProcessorContextImpl(override val info: ProcessingUnit) : ProcessorContext {
- override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long {
- val start = simulationContext.clock.millis()
- val usage = min(maxUsage, info.clockRate) * 1_000_000 // Usage from MHz to Hz
-
- try {
- val duration = min(
- max(0, deadline - start), // Determine duration between now and deadline
- ceil(burst / usage * 1000).toLong() // Convert from seconds to milliseconds
- )
- delay(duration)
- } catch (_: CancellationException) {
- // On cancellation, we compute and return the remaining burst
- }
- val end = simulationContext.clock.millis()
- val granted = ceil((end - start) / 1000.0 * usage).toLong()
- return max(0, burst - granted)
- }
- }
-
private val serverCtx = object : ServerManagementContext {
private var initialized: Boolean = false
- override val cpus: List<ProcessorContextImpl> = cpuNodes
- .asSequence()
- .flatMap { cpu ->
- generateSequence { ProcessorContextImpl(cpu) }.take(cpu.cores)
- }
- .toList()
+ override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
override var server: Server
get() = node.server!!
@@ -189,8 +163,41 @@ public class SimpleBareMetalDriver(
val previousState = server.state
val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR
server = server.copy(state = state)
- monitor.onUpdate(server, previousState)
initialized = false
+ domain.launch { monitor.onUpdate(server, previousState) }
+ }
+
+ override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
+ require(burst.size == limit.size) { "Array dimensions do not match" }
+
+ val start = simulationContext.clock.millis()
+ var duration = max(0, deadline - start)
+
+ // Determine the duration of the first CPU to finish
+ for (i in 0 until min(cpus.size, burst.size)) {
+ val cpu = cpus[i]
+ val usage = min(limit[i], cpu.frequency) * 1_000_000 // Usage from MHz to Hz
+ val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
+
+ if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
+ duration = min(duration, cpuDuration)
+ }
+ }
+
+ try {
+ delay(duration)
+ } catch (_: CancellationException) {
+ // On cancellation, we compute and return the remaining burst
+ }
+
+ val end = simulationContext.clock.millis()
+
+ // Write back the remaining burst time
+ for (i in 0 until min(cpus.size, burst.size)) {
+ val usage = min(limit[i], cpus[i].frequency) * 1_000_000
+ val granted = ceil((end - start) / 1000.0 * usage).toLong()
+ burst[i] = max(0, burst[i] - granted)
+ }
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
index b7848cf3..8d055953 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
@@ -43,7 +43,7 @@ class HypervisorImage(
override val tags: TagContainer = emptyMap()
override suspend fun invoke(ctx: ServerContext) {
- val driver = HypervisorVirtDriver(ctx, VmSchedulerImpl(ctx, hypervisorMonitor))
+ val driver = HypervisorVirtDriver(ctx, hypervisorMonitor)
ctx.publishService(VirtDriver.Key, driver)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
index e0547dcf..3f358516 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
@@ -28,24 +28,30 @@ import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.core.execution.ProcessorContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor
+import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
+import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
+import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import java.util.UUID
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
/**
* A [VirtDriver] that is backed by a simple hypervisor implementation.
*/
class HypervisorVirtDriver(
private val hostContext: ServerContext,
- private val scheduler: VmScheduler
+ private val monitor: HypervisorMonitor
) : VirtDriver {
/**
* A set for tracking the VM context objects.
@@ -84,11 +90,131 @@ class HypervisorVirtDriver(
monitors.remove(monitor)
}
+ /**
+ * The set of [VmServerContext] instances that is being scheduled at the moment.
+ */
+ private val activeVms = mutableSetOf<VmServerContext>()
+
+ /**
+ * The deferred run call.
+ */
+ private var call: Job? = null
+
+ /**
+ * Schedule the vCPUs on the physical CPUs.
+ */
+ private suspend fun reschedule() {
+ flush()
+
+ // Do not schedule a call if there is no work to schedule
+ if (activeVms.isEmpty()) {
+ return
+ }
+
+ val call = simulationContext.domain.launch {
+ val start = simulationContext.clock.millis()
+ val vms = activeVms.toSet()
+
+ var duration: Long = Long.MAX_VALUE
+ var deadline: Long = Long.MAX_VALUE
+ val usage = DoubleArray(hostContext.cpus.size)
+
+ for (vm in vms) {
+ for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) {
+ val cpu = vm.cpus[i]
+
+ // Limit each vCPU to at most an equal share of the host CPU
+ val actualUsage = min(vm.limit[i], cpu.frequency / vms.size)
+
+ // The duration that we want to run is that of the shortest request from a vCPU
+ duration = min(duration, ceil(vm.requestedBurst[i] / (actualUsage * 1_000_000L)).toLong())
+ deadline = min(deadline, vm.deadline)
+ usage[i] += actualUsage
+ }
+ }
+
+ val burst = LongArray(hostContext.cpus.size)
+
+ for (vm in vms) {
+ for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) {
+ val cpu = vm.cpus[i]
+
+ // Limit each vCPU to at most an equal share of the host CPU
+ val actualUsage = min(vm.limit[i], cpu.frequency / vms.size)
+ val actualBurst = (duration * actualUsage * 1_000_000L).toLong()
+
+ burst[i] += actualBurst
+ }
+ }
+
+ val granted = burst.clone()
+ // We run the total burst on the host processor. Note that this call may be cancelled at any moment in
+ // time, so not all of the burst may be executed.
+ hostContext.run(granted, usage, deadline)
+ val end = simulationContext.clock.millis()
+
+ // No work was performed
+ if ((end - start) <= 0) {
+ return@launch
+ }
+
+ for (vm in vms) {
+ for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) {
+ val cpu = vm.cpus[i]
+
+ // Limit each vCPU to at most an equal share of the host CPU
+ val actualUsage = min(vm.limit[i], cpu.frequency / vms.size)
+ val actualBurst = (duration * actualUsage * 1_000_000L).toLong()
+
+ // Compute the fraction of compute time allocated to the VM
+ val fraction = actualUsage / usage[i]
+
+ // Compute the burst time that the VM was actually granted
+ val grantedBurst = max(0, actualBurst - ceil(burst[i] * fraction).toLong())
+
+ // Compute remaining burst time to be executed for the request
+ vm.requestedBurst[i] = max(0, vm.requestedBurst[i] - grantedBurst)
+ }
+
+ if (vm.requestedBurst.any { it == 0L } || vm.deadline <= end) {
+ // Return vCPU `run` call: the requested burst was completed or deadline was exceeded
+ vm.chan.send(Unit)
+ }
+ }
+
+ for (i in burst.indices) {
+ monitor.onSliceFinish(
+ end,
+ burst[i],
+ granted[i],
+ vms.size,
+ hostContext.server
+ )
+ }
+ }
+ this.call = call
+ call.invokeOnCompletion { this.call = null }
+ }
+
+ /**
+ * Flush the progress of the current active VMs.
+ */
+ private fun flush() {
+ val call = call ?: return // If there is no active call, there is nothing to flush
+ // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its
+ // completion.
+ call.cancel()
+ }
+
internal inner class VmServerContext(
override var server: Server,
val monitor: ServerMonitor,
ctx: SimulationContext
) : ServerManagementContext {
+ lateinit var requestedBurst: LongArray
+ lateinit var limit: DoubleArray
+ var deadline: Long = 0L
+ var chan = Channel<Unit>(Channel.RENDEZVOUS)
private var initialized: Boolean = false
internal val job: Job = ctx.domain.launch {
@@ -101,7 +227,7 @@ class HypervisorVirtDriver(
}
}
- override val cpus: List<ProcessorContext> = scheduler.createVirtualCpus(server.flavor)
+ override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount)
override suspend fun init() {
if (initialized) {
@@ -124,5 +250,24 @@ class HypervisorVirtDriver(
vms.remove(this)
monitors.forEach { it.onUpdate(vms.size, availableMemory) }
}
+
+ override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
+ require(burst.size == limit.size) { "Array dimensions do not match" }
+
+ requestedBurst = burst
+ this.limit = limit
+ this.deadline = deadline
+
+ // Wait until the burst has been run or the coroutine is cancelled
+ try {
+ activeVms += this
+ reschedule()
+ chan.receive()
+ } catch (_: CancellationException) {
+ // On cancellation, we compute and return the remaining burst
+ }
+ activeVms -= this
+ reschedule()
+ }
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt
deleted file mode 100644
index 4cc5ac9e..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.compute.virt.driver.hypervisor
-
-import com.atlarge.odcsim.simulationContext
-import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.core.Flavor
-import com.atlarge.opendc.compute.core.execution.ProcessorContext
-import com.atlarge.opendc.compute.core.execution.ServerContext
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
-import kotlinx.coroutines.CancellationException
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * A basic implementation of the [VmScheduler] interface.
- *
- * @property hostContext The [ServerContext] of the host.
- * @property hypervisorMonitor The [HypervisorMonitor] to inform with hypervisor scheduling events.
- */
-public class VmSchedulerImpl(
- private val hostContext: ServerContext,
- private val hypervisorMonitor: HypervisorMonitor
-) : VmScheduler {
- /**
- * The available physical CPUs to schedule.
- */
- private val cpus = hostContext.cpus.map { HostProcessorContext(it, hostContext, hypervisorMonitor) }
-
- override fun createVirtualCpus(flavor: Flavor): List<ProcessorContext> {
- // TODO At the moment, the first N cores get filled the first. Distribute over all cores instead
- require(flavor.cpuCount <= cpus.size) { "Flavor cannot fit on machine" }
-
- return cpus
- .asSequence()
- .take(flavor.cpuCount)
- .sortedBy { it.vcpus.size }
- .map { VirtualProcessorContext(it) }
- .toList()
- }
-
- /**
- * A wrapper around a host [ProcessorContext] that carries additional information about the vCPUs scheduled on the
- * processor.
- */
- internal class HostProcessorContext(
- delegate: ProcessorContext,
- private val hostContext: ServerContext,
- private val hypervisorMonitor: HypervisorMonitor
- ) : ProcessorContext by delegate {
- /**
- * The set of vCPUs scheduled on this processor.
- */
- var vcpus: MutableSet<VirtualProcessorContext> = mutableSetOf()
-
- /**
- * The deferred run call.
- */
- var call: Job? = null
-
- /**
- * Schedule the vCPUs on the physical CPU.
- */
- suspend fun reschedule() {
- flush()
-
- val vcpus = HashSet(vcpus) // Create snapshot of the vCPUs that were scheduled at this moment
- val call = simulationContext.domain.launch {
- var duration: Long = Long.MAX_VALUE
- var deadline: Long = Long.MAX_VALUE
-
- for (vcpu in vcpus) {
- // Limit each vCPU to at most an equal share of the host CPU
- vcpu.actualUsage = min(vcpu.requestedUsage, info.clockRate / vcpus.size)
-
- // The duration that we want to run is that of the shortest request from a vCPU
- duration = min(duration, ceil(vcpu.requestedBurst / (vcpu.actualUsage * 1_000_000L)).toLong())
- deadline = min(deadline, vcpu.requestedDeadline)
- }
-
- var burst: Long = 0
- var usage: Double = 0.0
-
- for (vcpu in vcpus) {
- vcpu.actualBurst = (duration * vcpu.actualUsage * 1_000_000L).toLong()
- burst += vcpu.actualBurst
- usage += vcpu.actualUsage
- }
-
- // Ignore time slice if no work to request
- if (burst <= 0L) {
- return@launch
- }
-
- // We run the total burst on the host processor. Note that this call may be cancelled at any moment in
- // time, so not all of the burst may be executed.
- val remainder = run(burst, usage, deadline)
- val time = simulationContext.clock.millis()
- val totalGrantedBurst: Long = burst - remainder
-
- // Compute for each vCPU the
- for (vcpu in vcpus) {
- // Compute the fraction of compute time allocated to the VM
- val fraction = vcpu.actualUsage / usage
- // Compute the burst time that the VM was actually granted
- val grantedBurst = max(0, vcpu.actualBurst - ceil(remainder * fraction).toLong())
- // Compute remaining burst time to be executed for the request
- vcpu.requestedBurst = max(0, vcpu.requestedBurst - grantedBurst)
-
- if (vcpu.requestedBurst == 0L || vcpu.requestedDeadline <= time) {
- // Return vCPU `run` call: the requested burst was completed or deadline was exceeded
- vcpu.chan.send(Unit)
- }
- }
-
- hypervisorMonitor.onSliceFinish(
- time,
- burst,
- totalGrantedBurst,
- vcpus.size,
- hostContext.server
- )
- }
-
- this.call = call
- call.invokeOnCompletion { this.call = null }
- }
-
- /**
- * Flush the progress of the current active VMs.
- */
- fun flush() {
- val call = call ?: return // If there is no active call, there is nothing to flush
- // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its
- // completion.
- call.cancel()
- }
- }
-
- /**
- * An implementation of [ProcessorContext] that delegates the work to a physical CPU.
- */
- internal class VirtualProcessorContext(val host: HostProcessorContext) : ProcessorContext {
- var actualBurst: Long = 0
- var actualUsage: Double = 0.0
- var requestedBurst: Long = 0
- var requestedUsage: Double = 0.0
- var requestedDeadline: Long = 0
- var chan = Channel<Unit>(Channel.RENDEZVOUS)
-
- override val info: ProcessingUnit
- get() = host.info
-
- override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long {
- requestedBurst = burst
- requestedUsage = maxUsage
- requestedDeadline = deadline
-
- // Wait until the burst has been run or the coroutine is cancelled
- try {
- host.vcpus.add(this)
- host.reschedule()
- chan.receive()
- } catch (_: CancellationException) {
- // On cancellation, we compute and return the remaining burst
- }
-
- host.vcpus.remove(this)
- host.reschedule()
- return requestedBurst
- }
- }
-}