Diffstat (limited to 'opendc')
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt  |  48
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt  |  16
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt  |  15
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt  |  19
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt  |  63
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt  |   2
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt  | 135
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt  |  38
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt  | 199
-rw-r--r--  opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt  |   6
-rw-r--r--  opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt  |   7
11 files changed, 211 insertions, 337 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt
deleted file mode 100644
index c081acd5..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.compute.core.execution
-
-import com.atlarge.opendc.compute.core.ProcessingUnit
-
-/**
- * An interface for managing a single processing core (CPU) of a (virtual) machine.
- */
-public interface ProcessorContext {
- /**
- * The information about the processing unit.
- */
- public val info: ProcessingUnit
-
- /**
- * Request the specified burst time from the processor and suspend execution until the processor finishes
- * processing of the requested burst.
- *
- * @param burst The burst time to request from the processor.
- * @param maxUsage The maximum usage in terms of MHz that the processing core may use while running the burst.
- * @param deadline The instant at which this request needs to be fulfilled.
- * @return The remaining burst time in case the method was cancelled or zero if the processor finished running.
- */
- public suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index 53b00aa6..3a804f51 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.compute.core.execution
+import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.services.AbstractServiceKey
@@ -38,9 +39,9 @@ public interface ServerContext {
public val server: Server
/**
- * A list of available processor context instances to use.
+ * A list of processing units available to use.
*/
- public val cpus: List<ProcessorContext>
+ public val cpus: List<ProcessingUnit>
/**
* Publishes the given [service] with key [serviceKey] in the server's registry.
@@ -48,4 +49,15 @@ public interface ServerContext {
public suspend fun <T : Any> publishService(serviceKey: AbstractServiceKey<T>, service: T) {
server.serviceRegistry[serviceKey] = service
}
+
+ /**
+ * Request the specified burst time from the processor cores and suspend execution until a processor core finishes
+ * processing a **non-zero** burst or until the deadline is reached.
+ *
+ * @param burst The burst time to request from each of the processor cores.
+ * @param maxUsage The maximum usage in terms of MHz that the processing core may use while running the burst.
+ * @param deadline The instant at which this request needs to be fulfilled.
+ * @return The remaining burst time of the processor cores.
+ */
+ public suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray
}
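
For orientation, a minimal sketch (not part of this commit) of how a caller is expected to use the new array-based contract: issue one burst per core, then re-issue whatever remainder comes back until every core reports zero. RunContract is a stand-in for ServerContext, reduced to the single method added here.

interface RunContract {
    // Mirrors the signature added to ServerContext in this commit.
    suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray
}

// Drive the contract until all per-core bursts are exhausted.
suspend fun drain(ctx: RunContract, flops: Long, cores: Int, maxUsageMhz: Double) {
    var burst = LongArray(cores) { flops / cores }
    val maxUsage = DoubleArray(cores) { maxUsageMhz }
    while (burst.any { it != 0L }) {
        burst = ctx.run(burst, maxUsage, Long.MAX_VALUE)
    }
}
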
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
index de429d41..d2d35db9 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
@@ -26,9 +26,9 @@ package com.atlarge.opendc.compute.core.image
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.launch
+import kotlinx.coroutines.isActive
import java.util.UUID
+import kotlin.coroutines.coroutineContext
import kotlin.math.min
/**
@@ -61,12 +61,15 @@ class FlopsApplicationImage(
*/
override suspend fun invoke(ctx: ServerContext) {
val cores = min(this.cores, ctx.server.flavor.cpuCount)
- val req = flops / cores
+ var burst = LongArray(cores) { flops / cores }
+ val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization }
- coroutineScope {
- for (cpu in ctx.cpus.take(cores)) {
- launch { cpu.run(req, cpu.info.frequency * utilization, Long.MAX_VALUE) }
+ while (coroutineContext.isActive) {
+ if (burst.all { it == 0L }) {
+ break
}
+
+ burst = ctx.run(burst, maxUsage, Long.MAX_VALUE)
}
}
}
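
As a quick sanity check of the request sizes this loop produces, a standalone snippet using the flops count and clock speed from SimpleBareMetalDriverTest further down; the utilization value is an assumption for illustration only.

import kotlin.math.ceil

fun main() {
    val flops = 1_000_000_000L
    val cores = 2
    val frequencyMhz = 2400.0
    val utilization = 0.8 // assumed for illustration

    val burstPerCore = flops / cores              // 500_000_000 cycles per core
    val maxUsageMhz = frequencyMhz * utilization  // 1920 MHz, as computed in invoke()
    val usageHz = maxUsageMhz * 1_000_000         // MHz-to-Hz conversion done by the driver
    val sliceMs = ceil(burstPerCore / usageHz * 1000).toLong()
    println("burst=$burstPerCore maxUsage=${maxUsageMhz}MHz expected slice=${sliceMs}ms") // 261 ms
}
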
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index 436f4653..7c4fe839 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -3,10 +3,10 @@ package com.atlarge.opendc.compute.core.image
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
-import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
+import kotlinx.coroutines.ensureActive
import java.util.UUID
+import kotlin.coroutines.coroutineContext
import kotlin.math.min
class VmImage(
@@ -19,18 +19,17 @@ class VmImage(
) : Image {
override suspend fun invoke(ctx: ServerContext) {
- flopsHistory.forEach { fragment ->
+ for (fragment in flopsHistory) {
+ coroutineContext.ensureActive()
+
if (fragment.flops == 0L) {
delay(fragment.duration)
} else {
val cores = min(this.cores, ctx.server.flavor.cpuCount)
- val req = fragment.flops / cores
- coroutineScope {
- for (cpu in ctx.cpus.take(cores)) {
- val usage = req / (fragment.usage * 1_000_000L)
- launch { cpu.run(req, usage, simulationContext.clock.millis() + fragment.duration) }
- }
- }
+ val burst = LongArray(cores) { fragment.flops / cores }
+ val maxUsage = DoubleArray(cores) { i -> burst[i] / (fragment.usage * 1_000_000L) }
+
+ ctx.run(burst, maxUsage, simulationContext.clock.millis() + fragment.duration)
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index e1b7b178..d86e04ff 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -31,7 +31,6 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.core.execution.ProcessorContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.image.EmptyImage
import com.atlarge.opendc.compute.core.image.Image
@@ -138,32 +137,10 @@ public class SimpleBareMetalDriver(
}
}
- private data class ProcessorContextImpl(override val info: ProcessingUnit) : ProcessorContext {
- override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long {
- val start = simulationContext.clock.millis()
- val usage = min(maxUsage, info.frequency) * 1_000_000 // Usage from MHz to Hz
-
- try {
- val duration = min(
- max(0, deadline - start), // Determine duration between now and deadline
- ceil(burst / usage * 1000).toLong() // Convert from seconds to milliseconds
- )
- delay(duration)
- } catch (_: CancellationException) {
- // On cancellation, we compute and return the remaining burst
- }
- val end = simulationContext.clock.millis()
- val granted = ceil((end - start) / 1000.0 * usage).toLong()
- return max(0, burst - granted)
- }
- }
-
private val serverCtx = object : ServerManagementContext {
private var initialized: Boolean = false
- override val cpus: List<ProcessorContextImpl> = this@SimpleBareMetalDriver.cpus
- .map { ProcessorContextImpl(it) }
- .toList()
+ override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
override var server: Server
get() = node.server!!
@@ -186,8 +163,44 @@ public class SimpleBareMetalDriver(
val previousState = server.state
val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR
server = server.copy(state = state)
- monitor.onUpdate(server, previousState)
initialized = false
+ domain.launch { monitor.onUpdate(server, previousState) }
+ }
+
+ override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray {
+ val start = simulationContext.clock.millis()
+ var duration = max(0, deadline - start)
+
+ for (i in cpus.indices) {
+ if (i >= burst.size || i >= maxUsage.size) {
+ continue
+ }
+
+ val cpu = cpus[i]
+ val usage = min(maxUsage[i], cpu.frequency) * 1_000_000 // Usage from MHz to Hz
+ val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
+
+ if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
+ duration = min(duration, cpuDuration)
+ }
+ }
+
+ try {
+ delay(duration)
+ } catch (_: CancellationException) {
+ // On cancellation, we compute and return the remaining burst
+ }
+
+ val end = simulationContext.clock.millis()
+ return LongArray(cpus.size) { i ->
+ if (i < burst.size && i < maxUsage.size) {
+ val usage = min(maxUsage[i], cpus[i].frequency) * 1_000_000
+ val granted = ceil((end - start) / 1000.0 * usage).toLong()
+ max(0, burst[i] - granted)
+ } else {
+ 0
+ }
+ }
}
}
}
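
To make the slice accounting above concrete, a self-contained trace of the same formulas with illustrative numbers: the slice length is set by the shortest non-zero per-core burst, and each core is then charged elapsed time times its usage.

import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min

fun main() {
    val frequencyMhz = 2400.0
    val burst = longArrayOf(600_000_000, 1_200_000_000)
    val maxUsage = doubleArrayOf(2400.0, 2400.0)

    // Slice length: the shortest non-zero per-core burst at its granted speed.
    var durationMs = Long.MAX_VALUE
    for (i in burst.indices) {
        val usageHz = min(maxUsage[i], frequencyMhz) * 1_000_000
        val cpuDurationMs = ceil(burst[i] / usageHz * 1000).toLong()
        if (cpuDurationMs != 0L) durationMs = min(durationMs, cpuDurationMs)
    }
    println("slice = $durationMs ms") // 250 ms, set by core 0

    // Remaining burst: each core is charged elapsed-time * usage cycles.
    val remaining = LongArray(burst.size) { i ->
        val usageHz = min(maxUsage[i], frequencyMhz) * 1_000_000
        max(0, burst[i] - ceil(durationMs / 1000.0 * usageHz).toLong())
    }
    println(remaining.toList()) // [0, 600000000]: core 1 keeps half of its burst
}
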
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
index b7848cf3..8d055953 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
@@ -43,7 +43,7 @@ class HypervisorImage(
override val tags: TagContainer = emptyMap()
override suspend fun invoke(ctx: ServerContext) {
- val driver = HypervisorVirtDriver(ctx, VmSchedulerImpl(ctx, hypervisorMonitor))
+ val driver = HypervisorVirtDriver(ctx, hypervisorMonitor)
ctx.publishService(VirtDriver.Key, driver)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
index e0547dcf..87c4f073 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
@@ -28,24 +28,30 @@ import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.core.execution.ProcessorContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor
+import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
+import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
+import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import java.util.UUID
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
/**
* A [VirtDriver] that is backed by a simple hypervisor implementation.
*/
class HypervisorVirtDriver(
private val hostContext: ServerContext,
- private val scheduler: VmScheduler
+ private val monitor: HypervisorMonitor
) : VirtDriver {
/**
* A set for tracking the VM context objects.
@@ -84,11 +90,114 @@ class HypervisorVirtDriver(
monitors.remove(monitor)
}
+ /**
+ * The set of [VmServerContext] instances that are being scheduled at the moment.
+ */
+ private val activeVms = mutableSetOf<VmServerContext>()
+
+ /**
+ * The deferred run call.
+ */
+ private var call: Job? = null
+
+ /**
+ * Schedule the vCPUs on the physical CPUs.
+ */
+ private suspend fun reschedule() {
+ flush()
+ val call = simulationContext.domain.launch {
+ val start = simulationContext.clock.millis()
+ val vms = activeVms.toSet()
+
+ var duration: Long = Long.MAX_VALUE
+ var deadline: Long = Long.MAX_VALUE
+ val usage = DoubleArray(hostContext.cpus.size)
+
+ for (vm in vms) {
+ for (i in vm.cpus.indices) {
+ val cpu = vm.cpus[i]
+
+ // Limit each vCPU to at most an equal share of the host CPU
+ val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size)
+
+ // The duration that we want to run is that of the shortest request from a vCPU
+ duration = min(duration, ceil(vm.requestedBurst[i] / (actualUsage * 1_000_000L)).toLong())
+ deadline = min(deadline, vm.requestedDeadline)
+ usage[i] += actualUsage
+ }
+ }
+
+ val burst = LongArray(hostContext.cpus.size)
+
+ for (vm in vms) {
+ for (i in vm.cpus.indices) {
+ val cpu = vm.cpus[i]
+
+ // Limit each vCPU to at most an equal share of the host CPU
+ val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size)
+ val actualBurst = (duration * actualUsage * 1_000_000L).toLong()
+
+ burst[i] += actualBurst
+ }
+ }
+
+ // We run the total burst on the host processor. Note that this call may be cancelled at any moment in
+ // time, so not all of the burst may be executed.
+ val remainder = hostContext.run(burst, usage, deadline)
+ val end = simulationContext.clock.millis()
+
+ // No work was performed
+ if ((end - start) <= 0) {
+ return@launch
+ }
+
+ for (vm in vms) {
+ for (i in vm.cpus.indices) {
+ val cpu = vm.cpus[i]
+
+ // Limit each vCPU to at most an equal share of the host CPU
+ val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size)
+
+ // Compute the fraction of compute time allocated to the VM
+ val fraction = actualUsage / usage[i]
+
+ // Compute the burst time that the VM was actually granted
+ val grantedBurst = max(0, burst[i] - ceil(remainder[i] * fraction).toLong())
+
+ // Compute remaining burst time to be executed for the request
+ vm.requestedBurst[i] = max(0, vm.requestedBurst[i] - grantedBurst)
+ }
+
+ if (vm.requestedBurst.any { it == 0L } || vm.requestedDeadline <= end) {
+ // Return from the vCPU `run` call: the requested burst was completed or the deadline was exceeded
+ vm.chan.send(Unit)
+ }
+ }
+ }
+
+ this.call = call
+ call.invokeOnCompletion { this.call = null }
+ }
+
+ /**
+ * Flush the progress of the current active VMs.
+ */
+ private fun flush() {
+ val call = call ?: return // If there is no active call, there is nothing to flush
+ // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its
+ // completion.
+ call.cancel()
+ }
+
internal inner class VmServerContext(
override var server: Server,
val monitor: ServerMonitor,
ctx: SimulationContext
) : ServerManagementContext {
+ val requestedBurst: LongArray = LongArray(server.flavor.cpuCount)
+ val requestedUsage: DoubleArray = DoubleArray(server.flavor.cpuCount)
+ var requestedDeadline: Long = 0L
+ var chan = Channel<Unit>(Channel.RENDEZVOUS)
private var initialized: Boolean = false
internal val job: Job = ctx.domain.launch {
@@ -101,7 +210,7 @@ class HypervisorVirtDriver(
}
}
- override val cpus: List<ProcessorContext> = scheduler.createVirtualCpus(server.flavor)
+ override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount)
override suspend fun init() {
if (initialized) {
@@ -124,5 +233,25 @@ class HypervisorVirtDriver(
vms.remove(this)
monitors.forEach { it.onUpdate(vms.size, availableMemory) }
}
+
+ override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray {
+ for (i in cpus.indices) {
+ requestedBurst[i] = if (i < burst.size) burst[i] else 0
+ requestedUsage[i] = if (i < maxUsage.size) maxUsage[i] else 0.0
+ }
+ requestedDeadline = deadline
+
+ // Wait until the burst has been run or the coroutine is cancelled
+ try {
+ activeVms += this
+ reschedule()
+ chan.receive()
+ } catch (_: CancellationException) {
+ // On cancellation, we compute and return the remaining burst
+ }
+ activeVms -= this
+ reschedule()
+ return requestedBurst
+ }
}
}
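
The fair-share arithmetic that replaces VmSchedulerImpl can be traced by hand. The sketch below borrows the numbers from HypervisorTest at the bottom of this diff (two single-vCPU VMs with 1 and 2 GFLOP workloads on a 2000 MHz host core); it assumes each VM requests the full core frequency and that the host runs the slice to completion without cancellation.

import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min

fun main() {
    val hostMhz = 2000.0
    val requestedBurst = longArrayOf(1_000_000_000, 2_000_000_000) // VM A, VM B
    val requestedUsage = doubleArrayOf(2000.0, 2000.0)
    val vms = requestedBurst.size

    // Each vCPU is capped at an equal share of the host core.
    val actualUsage = DoubleArray(vms) { min(requestedUsage[it], hostMhz / vms) } // 1000 MHz each

    // Slice length (seconds) is set by the shortest request at its granted share.
    val duration = requestedBurst.indices
        .minOf { ceil(requestedBurst[it] / (actualUsage[it] * 1_000_000L)).toLong() } // 1 s

    // Cycles handed to each VM in this slice, assuming no remainder came back from the host.
    val granted = LongArray(vms) { (duration * actualUsage[it] * 1_000_000L).toLong() } // 1e9 each
    val remaining = LongArray(vms) { max(0, requestedBurst[it] - granted[it]) }
    println(remaining.toList()) // [0, 1000000000]: VM A completes, VM B is rescheduled alone
}
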
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt
deleted file mode 100644
index 7b00d99c..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.compute.virt.driver.hypervisor
-
-import com.atlarge.opendc.compute.core.Flavor
-import com.atlarge.opendc.compute.core.execution.ProcessorContext
-
-/**
- * A scheduler that assigns virtual CPUs to virtual machines and maps them to physical CPUs.
- */
-public interface VmScheduler {
- /**
- * Create the virtual CPUs for the specified [flavor].
- */
- fun createVirtualCpus(flavor: Flavor): List<ProcessorContext>
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt
deleted file mode 100644
index f232d695..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.compute.virt.driver.hypervisor
-
-import com.atlarge.odcsim.simulationContext
-import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.core.Flavor
-import com.atlarge.opendc.compute.core.execution.ProcessorContext
-import com.atlarge.opendc.compute.core.execution.ServerContext
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
-import kotlinx.coroutines.CancellationException
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * A basic implementation of the [VmScheduler] interface.
- *
- * @property hostContext The [ServerContext] of the host.
- * @property hypervisorMonitor The [HypervisorMonitor] to inform with hypervisor scheduling events.
- */
-public class VmSchedulerImpl(
- private val hostContext: ServerContext,
- private val hypervisorMonitor: HypervisorMonitor
-) : VmScheduler {
- /**
- * The available physical CPUs to schedule.
- */
- private val cpus = hostContext.cpus.map { HostProcessorContext(it, hostContext, hypervisorMonitor) }
-
- override fun createVirtualCpus(flavor: Flavor): List<ProcessorContext> {
- // TODO At the moment, the first N cores get filled the first. Distribute over all cores instead
- require(flavor.cpuCount <= cpus.size) { "Flavor cannot fit on machine" }
-
- return cpus
- .asSequence()
- .take(flavor.cpuCount)
- .sortedBy { it.vcpus.size }
- .map { VirtualProcessorContext(it) }
- .toList()
- }
-
- /**
- * A wrapper around a host [ProcessorContext] that carries additional information about the vCPUs scheduled on the
- * processor.
- */
- internal class HostProcessorContext(
- delegate: ProcessorContext,
- private val hostContext: ServerContext,
- private val hypervisorMonitor: HypervisorMonitor
- ) : ProcessorContext by delegate {
- /**
- * The set of vCPUs scheduled on this processor.
- */
- var vcpus: MutableSet<VirtualProcessorContext> = mutableSetOf()
-
- /**
- * The deferred run call.
- */
- var call: Job? = null
-
- /**
- * Schedule the vCPUs on the physical CPU.
- */
- suspend fun reschedule() {
- flush()
-
- val vcpus = HashSet(vcpus) // Create snapshot of the vCPUs that were scheduled at this moment
- val call = simulationContext.domain.launch {
- var duration: Long = Long.MAX_VALUE
- var deadline: Long = Long.MAX_VALUE
-
- for (vcpu in vcpus) {
- // Limit each vCPU to at most an equal share of the host CPU
- vcpu.actualUsage = min(vcpu.requestedUsage, info.frequency / vcpus.size)
-
- // The duration that we want to run is that of the shortest request from a vCPU
- duration = min(duration, ceil(vcpu.requestedBurst / (vcpu.actualUsage * 1_000_000L)).toLong())
- deadline = min(deadline, vcpu.requestedDeadline)
- }
-
- var burst: Long = 0
- var usage: Double = 0.0
-
- for (vcpu in vcpus) {
- vcpu.actualBurst = (duration * vcpu.actualUsage * 1_000_000L).toLong()
- burst += vcpu.actualBurst
- usage += vcpu.actualUsage
- }
-
- // Ignore time slice if no work to request
- if (burst <= 0L) {
- return@launch
- }
-
- // We run the total burst on the host processor. Note that this call may be cancelled at any moment in
- // time, so not all of the burst may be executed.
- val remainder = run(burst, usage, deadline)
- val time = simulationContext.clock.millis()
- val totalGrantedBurst: Long = burst - remainder
-
- // Compute for each vCPU the
- for (vcpu in vcpus) {
- // Compute the fraction of compute time allocated to the VM
- val fraction = vcpu.actualUsage / usage
- // Compute the burst time that the VM was actually granted
- val grantedBurst = max(0, vcpu.actualBurst - ceil(remainder * fraction).toLong())
- // Compute remaining burst time to be executed for the request
- vcpu.requestedBurst = max(0, vcpu.requestedBurst - grantedBurst)
-
- if (vcpu.requestedBurst == 0L || vcpu.requestedDeadline <= time) {
- // Return vCPU `run` call: the requested burst was completed or deadline was exceeded
- vcpu.chan.send(Unit)
- }
- }
-
- hypervisorMonitor.onSliceFinish(
- time,
- burst,
- totalGrantedBurst,
- vcpus.size,
- hostContext.server
- )
- }
-
- this.call = call
- call.invokeOnCompletion { this.call = null }
- }
-
- /**
- * Flush the progress of the current active VMs.
- */
- fun flush() {
- val call = call ?: return // If there is no active call, there is nothing to flush
- // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its
- // completion.
- call.cancel()
- }
- }
-
- /**
- * An implementation of [ProcessorContext] that delegates the work to a physical CPU.
- */
- internal class VirtualProcessorContext(val host: HostProcessorContext) : ProcessorContext {
- var actualBurst: Long = 0
- var actualUsage: Double = 0.0
- var requestedBurst: Long = 0
- var requestedUsage: Double = 0.0
- var requestedDeadline: Long = 0
- var chan = Channel<Unit>(Channel.RENDEZVOUS)
-
- override val info: ProcessingUnit
- get() = host.info
-
- override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long {
- requestedBurst = burst
- requestedUsage = maxUsage
- requestedDeadline = deadline
-
- // Wait until the burst has been run or the coroutine is cancelled
- try {
- host.vcpus.add(this)
- host.reschedule()
- chan.receive()
- } catch (_: CancellationException) {
- // On cancellation, we compute and return the remaining burst
- }
-
- host.vcpus.remove(this)
- host.reschedule()
- return requestedBurst
- }
- }
-}
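
The rendezvous handshake that VmSchedulerImpl used above carries over unchanged into VmServerContext.run: the requester registers its request, suspends on a rendezvous channel, and returns whatever burst remains when it is woken or cancelled. Reduced to its essentials, the pattern looks roughly like this (sketch, kotlinx.coroutines only; names are illustrative).

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class Request(var remaining: Long) {
    val done = Channel<Unit>(Channel.RENDEZVOUS)

    // The requester parks here; the scheduler wakes it by sending on the channel.
    suspend fun await(): Long {
        try {
            done.receive()
        } catch (_: CancellationException) {
            // On cancellation the partially updated `remaining` is simply returned,
            // mirroring how the driver reports unfinished burst.
        }
        return remaining
    }
}

fun main() = runBlocking {
    val req = Request(remaining = 1_000)
    launch {
        delay(10)           // the "scheduler" runs a slice...
        req.remaining = 0   // ...accounts the granted burst...
        req.done.send(Unit) // ...and releases the waiting requester
    }
    println(req.await()) // 0
}
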
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index 6468a408..84b16b68 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -25,6 +25,7 @@
package com.atlarge.opendc.compute.metal.driver
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
@@ -53,15 +54,16 @@ internal class SimpleBareMetalDriverTest {
root.launch {
val dom = root.newDomain(name = "driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
- val cpus = List(5) { ProcessingUnit(cpuNode, it, 2400.0) }
+ val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), dom)
val monitor = object : ServerMonitor {
override suspend fun onUpdate(server: Server, previousState: ServerState) {
+ println("[${simulationContext.clock.millis()}] $server")
finalState = server.state
}
}
- val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
+ val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000_000_000, 2)
// Batch driver commands
withContext(dom.coroutineContext) {
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
index 7e5a4bbe..6cfb2317 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
@@ -69,8 +69,8 @@ internal class HypervisorTest {
println("Hello World!")
}
})
- val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000_000, 1)
- val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000_000, 1)
+ val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000_000_000, 1)
+ val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000_000_000, 1)
val monitor = object : ServerMonitor {
override suspend fun onUpdate(server: Server, previousState: ServerState) {
println("[${simulationContext.clock.millis()}]: $server")
@@ -80,12 +80,13 @@ internal class HypervisorTest {
val driverDom = root.newDomain("driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
- val cpus = List(5) { ProcessingUnit(cpuNode, it, 2000.0) }
+ val cpus = List(4) { ProcessingUnit(cpuNode, it, 2000.0) }
val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), driverDom)
metalDriver.init(monitor)
metalDriver.setImage(vmm)
metalDriver.setPower(PowerState.POWER_ON)
+
delay(5)
val flavor = Flavor(1, 0)