From f13cda61c142ff3d1a2e75de2b05667bdb3ab3ae Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 28 Feb 2020 15:59:14 +0100 Subject: refactor: Create distinction between CPU node and core This change updates the terminology in the `opendc-compute` module to make a distinction between CPU node and CPU core, where we primarily work with CPU cores. However, if needed, we also provide information about the individual CPU nodes. --- .../atlarge/opendc/compute/core/ProcessingNode.kt | 40 ++++++++++++++++++++++ .../atlarge/opendc/compute/core/ProcessingUnit.kt | 18 ++++------ .../compute/core/image/FlopsApplicationImage.kt | 2 +- .../compute/metal/driver/SimpleBareMetalDriver.kt | 15 ++++---- .../virt/driver/hypervisor/VmSchedulerImpl.kt | 2 +- .../metal/driver/SimpleBareMetalDriverTest.kt | 7 ++-- .../metal/service/SimpleProvisioningServiceTest.kt | 6 +++- .../virt/driver/hypervisor/HypervisorTest.kt | 6 +++- 8 files changed, 69 insertions(+), 27 deletions(-) create mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingNode.kt (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingNode.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingNode.kt new file mode 100644 index 00000000..91f5dde9 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingNode.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.core + +/** + * A processing node/package/socket containing possibly several CPU cores. + * + * @property vendor The vendor string of the processor node. + * @property modelName The name of the processor node. + * @property arch The micro-architecture of the processor node. + * @property coreCount The number of logical CPUs in the processor node.
+ */ +data class ProcessingNode( + val vendor: String, + val arch: String, + val modelName: String, + val coreCount: Int +) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingUnit.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingUnit.kt index 945b7061..dbf6d824 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingUnit.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingUnit.kt @@ -25,18 +25,14 @@ package com.atlarge.opendc.compute.core /** - * A processing unit of a compute resource, either virtual or physical. + * A single logical compute unit of processor node, either virtual or physical. * - * @property vendor The vendor string of the cpu. - * @property modelName The name of the cpu model. - * @property arch The architecture of the CPU. - * @property clockRate The clock speed of the cpu in MHz. - * @property cores The number of logical cores in the cpu. + * @property node The processing node containing the CPU core. + * @property id The identifier of the CPU core within the processing node. + * @property frequency The clock rate of the CPU. */ public data class ProcessingUnit( - public val vendor: String, - public val modelName: String, - public val arch: String, - public val clockRate: Double, - public val cores: Int + public val node: ProcessingNode, + public val id: Int, + public val frequency: Double ) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt index 3576b488..de429d41 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt @@ -65,7 +65,7 @@ class FlopsApplicationImage( coroutineScope { for (cpu in ctx.cpus.take(cores)) { - launch { cpu.run(req, cpu.info.clockRate * utilization, Long.MAX_VALUE) } + launch { cpu.run(req, cpu.info.frequency * utilization, Long.MAX_VALUE) } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 1adc8652..e1b7b178 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -53,14 +53,14 @@ import kotlinx.coroutines.withContext * * @param uid The unique identifier of the machine. * @param name An optional name of the machine. - * @param cpuNodes The CPU nodes/packages available to the bare metal machine. + * @param cpus The CPUs available to the bare metal machine. * @param memoryUnits The memory units in this machine. * @param domain The simulation domain the driver runs in. */ public class SimpleBareMetalDriver( uid: UUID, name: String, - val cpuNodes: List, + val cpus: List, val memoryUnits: List, private val domain: Domain ) : BareMetalDriver { @@ -77,7 +77,7 @@ public class SimpleBareMetalDriver( /** * The flavor that corresponds to this machine. 
*/ - private val flavor = Flavor(cpuNodes.sumBy { it.cores }, memoryUnits.map { it.size }.sum()) + private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum()) /** * The job that is running the image. @@ -141,7 +141,7 @@ public class SimpleBareMetalDriver( private data class ProcessorContextImpl(override val info: ProcessingUnit) : ProcessorContext { override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { val start = simulationContext.clock.millis() - val usage = min(maxUsage, info.clockRate) * 1_000_000 // Usage from MHz to Hz + val usage = min(maxUsage, info.frequency) * 1_000_000 // Usage from MHz to Hz try { val duration = min( @@ -161,11 +161,8 @@ public class SimpleBareMetalDriver( private val serverCtx = object : ServerManagementContext { private var initialized: Boolean = false - override val cpus: List = cpuNodes - .asSequence() - .flatMap { cpu -> - generateSequence { ProcessorContextImpl(cpu) }.take(cpu.cores) - } + override val cpus: List = this@SimpleBareMetalDriver.cpus + .map { ProcessorContextImpl(it) } .toList() override var server: Server diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt index 4cc5ac9e..f232d695 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt @@ -97,7 +97,7 @@ public class VmSchedulerImpl( for (vcpu in vcpus) { // Limit each vCPU to at most an equal share of the host CPU - vcpu.actualUsage = min(vcpu.requestedUsage, info.clockRate / vcpus.size) + vcpu.actualUsage = min(vcpu.requestedUsage, info.frequency / vcpus.size) // The duration that we want to run is that of the shortest request from a vCPU duration = min(duration, ceil(vcpu.requestedBurst / (vcpu.actualUsage * 1_000_000L)).toLong()) diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index 6b234b73..6468a408 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -25,7 +25,7 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState @@ -52,8 +52,9 @@ internal class SimpleBareMetalDriverTest { val root = system.newDomain(name = "root") root.launch { val dom = root.newDomain(name = "driver") - val flavor = Flavor(4, 0) - val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList(), dom) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) + val cpus = List(5) { ProcessingUnit(cpuNode, it, 2400.0) } + val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), dom) val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: 
ServerState) { diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index 3b32b3b8..d5366552 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState @@ -59,7 +60,10 @@ internal class SimpleProvisioningServiceTest { } val dom = root.newDomain("provisioner") - val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList(), dom) + + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) + val cpus = List(5) { ProcessingUnit(cpuNode, it, 2400.0) } + val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), dom) val provisioner = SimpleProvisioningService(dom) provisioner.create(driver) diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt index 002fa175..7e5a4bbe 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt @@ -29,6 +29,7 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.core.monitor.ServerMonitor @@ -77,7 +78,10 @@ internal class HypervisorTest { } val driverDom = root.newDomain("driver") - val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2000.0, 1)), emptyList(), driverDom) + + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) + val cpus = List(5) { ProcessingUnit(cpuNode, it, 2000.0) } + val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), driverDom) metalDriver.init(monitor) metalDriver.setImage(vmm) -- cgit v1.2.3 From 6ba8fec85214feca97d6c809aa816e2807ae547b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 29 Feb 2020 15:27:06 +0100 Subject: refactor: Report CPU usage per server This change refactors the codebase so that the CPU usage of the server is reported per server instead of per CPU, reducing the total number of messages needed and simplifying the synchronization of various computations.
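As an illustration of the accounting this change introduces, the following standalone Kotlin sketch models a single per-server run slice: every core is advanced in one call, the slice ends when the first core with a non-zero burst finishes, and the leftover burst per core is derived from the elapsed time. This is a simplified model written for this description, not the driver code itself; the runSlice helper and its parameters are hypothetical.

import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min

// Frequencies and limits are in MHz, bursts in cycles, durations in milliseconds.
fun runSlice(burst: LongArray, maxUsage: DoubleArray, frequency: DoubleArray, deadlineMs: Long): LongArray {
    require(burst.size == maxUsage.size && burst.size == frequency.size) { "Array dimensions do not match" }
    var duration = max(0L, deadlineMs)
    // The slice ends when the first core with a non-zero burst finishes.
    for (i in burst.indices) {
        val usageHz = min(maxUsage[i], frequency[i]) * 1_000_000
        val coreDuration = ceil(burst[i] / usageHz * 1000).toLong()
        if (coreDuration != 0L) duration = min(duration, coreDuration)
    }
    // Work granted to each core during the slice; the remainder is carried over to the next call.
    return LongArray(burst.size) { i ->
        val usageHz = min(maxUsage[i], frequency[i]) * 1_000_000
        val granted = ceil(duration / 1000.0 * usageHz).toLong()
        max(0L, burst[i] - granted)
    }
}

fun main() {
    // Two cores at 2400 MHz; the second core is asked to do twice the work of the first.
    val remaining = runSlice(
        burst = longArrayOf(2_400_000_000, 4_800_000_000),
        maxUsage = doubleArrayOf(2400.0, 2400.0),
        frequency = doubleArrayOf(2400.0, 2400.0),
        deadlineMs = Long.MAX_VALUE
    )
    println(remaining.toList()) // [0, 2400000000]: the second core still has half its burst left
}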
--- .../compute/core/execution/ProcessorContext.kt | 48 ----- .../opendc/compute/core/execution/ServerContext.kt | 16 +- .../compute/core/image/FlopsApplicationImage.kt | 15 +- .../atlarge/opendc/compute/core/image/VmImage.kt | 19 +- .../compute/metal/driver/SimpleBareMetalDriver.kt | 63 ++++--- .../virt/driver/hypervisor/HypervisorImage.kt | 2 +- .../virt/driver/hypervisor/HypervisorVirtDriver.kt | 135 +++++++++++++- .../compute/virt/driver/hypervisor/VmScheduler.kt | 38 ---- .../virt/driver/hypervisor/VmSchedulerImpl.kt | 199 --------------------- .../metal/driver/SimpleBareMetalDriverTest.kt | 6 +- .../virt/driver/hypervisor/HypervisorTest.kt | 7 +- 11 files changed, 211 insertions(+), 337 deletions(-) delete mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt delete mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt delete mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt deleted file mode 100644 index c081acd5..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.core.execution - -import com.atlarge.opendc.compute.core.ProcessingUnit - -/** - * An interface for managing a single processing core (CPU) of a (virtual) machine. - */ -public interface ProcessorContext { - /** - * The information about the processing unit. - */ - public val info: ProcessingUnit - - /** - * Request the specified burst time from the processor and suspend execution until the processor finishes - * processing of the requested burst. - * - * @param burst The burst time to request from the processor. - * @param maxUsage The maximum usage in terms of MHz that the processing core may use while running the burst. - * @param deadline The instant at which this request needs to be fulfilled. - * @return The remaining burst time in case the method was cancelled or zero if the processor finished running. 
- */ - public suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index 53b00aa6..3a804f51 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.compute.core.execution +import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.services.AbstractServiceKey @@ -38,9 +39,9 @@ public interface ServerContext { public val server: Server /** - * A list of available processor context instances to use. + * A list of processing units available to use. */ - public val cpus: List + public val cpus: List /** * Publishes the given [service] with key [serviceKey] in the server's registry. @@ -48,4 +49,15 @@ public interface ServerContext { public suspend fun publishService(serviceKey: AbstractServiceKey, service: T) { server.serviceRegistry[serviceKey] = service } + + /** + * Request the specified burst time from the processor cores and suspend execution until a processor core finishes + * processing a **non-zero** burst or until the deadline is reached. + * + * @param burst The burst time to request from each of the processor cores. + * @param maxUsage The maximum usage in terms of MHz that the processing core may use while running the burst. + * @param deadline The instant at which this request needs to be fulfilled. + * @return The remaining burst time of the processor cores. 
+ */ + public suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt index de429d41..d2d35db9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt @@ -26,9 +26,9 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch +import kotlinx.coroutines.isActive import java.util.UUID +import kotlin.coroutines.coroutineContext import kotlin.math.min /** @@ -61,12 +61,15 @@ class FlopsApplicationImage( */ override suspend fun invoke(ctx: ServerContext) { val cores = min(this.cores, ctx.server.flavor.cpuCount) - val req = flops / cores + var burst = LongArray(cores) { flops / cores } + val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization } - coroutineScope { - for (cpu in ctx.cpus.take(cores)) { - launch { cpu.run(req, cpu.info.frequency * utilization, Long.MAX_VALUE) } + while (coroutineContext.isActive) { + if (burst.all { it == 0L }) { + break } + + burst = ctx.run(burst, maxUsage, Long.MAX_VALUE) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index 436f4653..7c4fe839 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -3,10 +3,10 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay -import kotlinx.coroutines.launch +import kotlinx.coroutines.ensureActive import java.util.UUID +import kotlin.coroutines.coroutineContext import kotlin.math.min class VmImage( @@ -19,18 +19,17 @@ class VmImage( ) : Image { override suspend fun invoke(ctx: ServerContext) { - flopsHistory.forEach { fragment -> + for (fragment in flopsHistory) { + coroutineContext.ensureActive() + if (fragment.flops == 0L) { delay(fragment.duration) } else { val cores = min(this.cores, ctx.server.flavor.cpuCount) - val req = fragment.flops / cores - coroutineScope { - for (cpu in ctx.cpus.take(cores)) { - val usage = req / (fragment.usage * 1_000_000L) - launch { cpu.run(req, usage, simulationContext.clock.millis() + fragment.duration) } - } - } + val burst = LongArray(cores) { fragment.flops / cores } + val maxUsage = DoubleArray(cores) { i -> burst[i] / (fragment.usage * 1_000_000L) } + + ctx.run(burst, maxUsage, simulationContext.clock.millis() + fragment.duration) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index e1b7b178..d86e04ff 100644 --- 
a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -31,7 +31,6 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.execution.ProcessorContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.image.EmptyImage import com.atlarge.opendc.compute.core.image.Image @@ -138,32 +137,10 @@ public class SimpleBareMetalDriver( } } - private data class ProcessorContextImpl(override val info: ProcessingUnit) : ProcessorContext { - override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { - val start = simulationContext.clock.millis() - val usage = min(maxUsage, info.frequency) * 1_000_000 // Usage from MHz to Hz - - try { - val duration = min( - max(0, deadline - start), // Determine duration between now and deadline - ceil(burst / usage * 1000).toLong() // Convert from seconds to milliseconds - ) - delay(duration) - } catch (_: CancellationException) { - // On cancellation, we compute and return the remaining burst - } - val end = simulationContext.clock.millis() - val granted = ceil((end - start) / 1000.0 * usage).toLong() - return max(0, burst - granted) - } - } - private val serverCtx = object : ServerManagementContext { private var initialized: Boolean = false - override val cpus: List = this@SimpleBareMetalDriver.cpus - .map { ProcessorContextImpl(it) } - .toList() + override val cpus: List = this@SimpleBareMetalDriver.cpus override var server: Server get() = node.server!! 
@@ -186,8 +163,44 @@ public class SimpleBareMetalDriver( val previousState = server.state val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR server = server.copy(state = state) - monitor.onUpdate(server, previousState) initialized = false + domain.launch { monitor.onUpdate(server, previousState) } + } + + override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray { + val start = simulationContext.clock.millis() + var duration = max(0, deadline - start) + + for (i in 0..cpus.size) { + if (i >= burst.size || i >= maxUsage.size) { + continue + } + + val cpu = cpus[i] + val usage = min(maxUsage[i], cpu.frequency) * 1_000_000 // Usage from MHz to Hz + val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds + + if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst + duration = min(duration, cpuDuration) + } + } + + try { + delay(duration) + } catch (_: CancellationException) { + // On cancellation, we compute and return the remaining burst + } + + val end = simulationContext.clock.millis() + return LongArray(cpus.size) { i -> + if (i < burst.size && i < maxUsage.size) { + val usage = min(maxUsage[i], cpus[i].frequency) * 1_000_000 + val granted = ceil((end - start) / 1000.0 * usage).toLong() + max(0, burst[i] - granted) + } else { + 0 + } + } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt index b7848cf3..8d055953 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt @@ -43,7 +43,7 @@ class HypervisorImage( override val tags: TagContainer = emptyMap() override suspend fun invoke(ctx: ServerContext) { - val driver = HypervisorVirtDriver(ctx, VmSchedulerImpl(ctx, hypervisorMonitor)) + val driver = HypervisorVirtDriver(ctx, hypervisorMonitor) ctx.publishService(VirtDriver.Key, driver) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index e0547dcf..87c4f073 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -28,24 +28,30 @@ import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.execution.ProcessorContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor +import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import 
kotlinx.coroutines.CancellationException import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import java.util.UUID +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min /** * A [VirtDriver] that is backed by a simple hypervisor implementation. */ class HypervisorVirtDriver( private val hostContext: ServerContext, - private val scheduler: VmScheduler + private val monitor: HypervisorMonitor ) : VirtDriver { /** * A set for tracking the VM context objects. @@ -84,11 +90,114 @@ class HypervisorVirtDriver( monitors.remove(monitor) } + /** + * The set of [VmServerContext] instances that is being scheduled at the moment. + */ + private val activeVms = mutableSetOf() + + /** + * The deferred run call. + */ + private var call: Job? = null + + /** + * Schedule the vCPUs on the physical CPUs. + */ + private suspend fun reschedule() { + flush() + val call = simulationContext.domain.launch { + val start = simulationContext.clock.millis() + val vms = activeVms.toSet() + + var duration: Long = Long.MAX_VALUE + var deadline: Long = Long.MAX_VALUE + val usage = DoubleArray(hostContext.cpus.size) + + for (vm in vms) { + for (i in vm.cpus.indices) { + val cpu = vm.cpus[i] + + // Limit each vCPU to at most an equal share of the host CPU + val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size) + + // The duration that we want to run is that of the shortest request from a vCPU + duration = min(duration, ceil(vm.requestedBurst[i] / (actualUsage * 1_000_000L)).toLong()) + deadline = min(deadline, vm.requestedDeadline) + usage[i] += actualUsage + } + } + + val burst = LongArray(hostContext.cpus.size) + + for (vm in vms) { + for (i in vm.cpus.indices) { + val cpu = vm.cpus[i] + + // Limit each vCPU to at most an equal share of the host CPU + val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size) + val actualBurst = (duration * actualUsage * 1_000_000L).toLong() + + burst[i] += actualBurst + } + } + + // We run the total burst on the host processor. Note that this call may be cancelled at any moment in + // time, so not all of the burst may be executed. + val remainder = hostContext.run(burst, usage, deadline) + val end = simulationContext.clock.millis() + + // No work was performed + if ((end - start) <= 0) { + return@launch + } + + for (vm in vms) { + for (i in vm.cpus.indices) { + val cpu = vm.cpus[i] + + // Limit each vCPU to at most an equal share of the host CPU + val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size) + + // Compute the fraction of compute time allocated to the VM + val fraction = actualUsage / usage[i] + + // Compute the burst time that the VM was actually granted + val grantedBurst = max(0, burst[i] - ceil(remainder[i] * fraction).toLong()) + + // Compute remaining burst time to be executed for the request + vm.requestedBurst[i] = max(0, vm.requestedBurst[i] - grantedBurst) + } + + if (vm.requestedBurst.any { it == 0L } || vm.requestedDeadline <= end) { + // Return vCPU `run` call: the requested burst was completed or deadline was exceeded + vm.chan.send(Unit) + } + } + } + + this.call = call + call.invokeOnCompletion { this.call = null } + } + + /** + * Flush the progress of the current active VMs. + */ + private fun flush() { + val call = call ?: return // If there is no active call, there is nothing to flush + // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its + // completion. 
+ call.cancel() + } + internal inner class VmServerContext( override var server: Server, val monitor: ServerMonitor, ctx: SimulationContext ) : ServerManagementContext { + val requestedBurst: LongArray = LongArray(server.flavor.cpuCount) + val requestedUsage: DoubleArray = DoubleArray(server.flavor.cpuCount) + var requestedDeadline: Long = 0L + var chan = Channel(Channel.RENDEZVOUS) private var initialized: Boolean = false internal val job: Job = ctx.domain.launch { @@ -101,7 +210,7 @@ class HypervisorVirtDriver( } } - override val cpus: List = scheduler.createVirtualCpus(server.flavor) + override val cpus: List = hostContext.cpus.take(server.flavor.cpuCount) override suspend fun init() { if (initialized) { @@ -124,5 +233,25 @@ class HypervisorVirtDriver( vms.remove(this) monitors.forEach { it.onUpdate(vms.size, availableMemory) } } + + override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray { + for (i in cpus.indices) { + requestedBurst[i] = if (i < burst.size) burst[i] else 0 + requestedUsage[i] = if (i < maxUsage.size) maxUsage[i] else 0.0 + } + requestedDeadline = deadline + + // Wait until the burst has been run or the coroutine is cancelled + try { + activeVms += this + reschedule() + chan.receive() + } catch (_: CancellationException) { + // On cancellation, we compute and return the remaining burst + } + activeVms -= this + reschedule() + return requestedBurst + } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt deleted file mode 100644 index 7b00d99c..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.virt.driver.hypervisor - -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.execution.ProcessorContext - -/** - * A scheduler that assigns virtual CPUs to virtual machines and maps them to physical CPUs. - */ -public interface VmScheduler { - /** - * Create the virtual CPUs for the specified [flavor]. 
- */ - fun createVirtualCpus(flavor: Flavor): List -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt deleted file mode 100644 index f232d695..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt +++ /dev/null @@ -1,199 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.virt.driver.hypervisor - -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.execution.ProcessorContext -import com.atlarge.opendc.compute.core.execution.ServerContext -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.Job -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min - -/** - * A basic implementation of the [VmScheduler] interface. - * - * @property hostContext The [ServerContext] of the host. - * @property hypervisorMonitor The [HypervisorMonitor] to inform with hypervisor scheduling events. - */ -public class VmSchedulerImpl( - private val hostContext: ServerContext, - private val hypervisorMonitor: HypervisorMonitor -) : VmScheduler { - /** - * The available physical CPUs to schedule. - */ - private val cpus = hostContext.cpus.map { HostProcessorContext(it, hostContext, hypervisorMonitor) } - - override fun createVirtualCpus(flavor: Flavor): List { - // TODO At the moment, the first N cores get filled the first. Distribute over all cores instead - require(flavor.cpuCount <= cpus.size) { "Flavor cannot fit on machine" } - - return cpus - .asSequence() - .take(flavor.cpuCount) - .sortedBy { it.vcpus.size } - .map { VirtualProcessorContext(it) } - .toList() - } - - /** - * A wrapper around a host [ProcessorContext] that carries additional information about the vCPUs scheduled on the - * processor. 
- */ - internal class HostProcessorContext( - delegate: ProcessorContext, - private val hostContext: ServerContext, - private val hypervisorMonitor: HypervisorMonitor - ) : ProcessorContext by delegate { - /** - * The set of vCPUs scheduled on this processor. - */ - var vcpus: MutableSet = mutableSetOf() - - /** - * The deferred run call. - */ - var call: Job? = null - - /** - * Schedule the vCPUs on the physical CPU. - */ - suspend fun reschedule() { - flush() - - val vcpus = HashSet(vcpus) // Create snapshot of the vCPUs that were scheduled at this moment - val call = simulationContext.domain.launch { - var duration: Long = Long.MAX_VALUE - var deadline: Long = Long.MAX_VALUE - - for (vcpu in vcpus) { - // Limit each vCPU to at most an equal share of the host CPU - vcpu.actualUsage = min(vcpu.requestedUsage, info.frequency / vcpus.size) - - // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, ceil(vcpu.requestedBurst / (vcpu.actualUsage * 1_000_000L)).toLong()) - deadline = min(deadline, vcpu.requestedDeadline) - } - - var burst: Long = 0 - var usage: Double = 0.0 - - for (vcpu in vcpus) { - vcpu.actualBurst = (duration * vcpu.actualUsage * 1_000_000L).toLong() - burst += vcpu.actualBurst - usage += vcpu.actualUsage - } - - // Ignore time slice if no work to request - if (burst <= 0L) { - return@launch - } - - // We run the total burst on the host processor. Note that this call may be cancelled at any moment in - // time, so not all of the burst may be executed. - val remainder = run(burst, usage, deadline) - val time = simulationContext.clock.millis() - val totalGrantedBurst: Long = burst - remainder - - // Compute for each vCPU the - for (vcpu in vcpus) { - // Compute the fraction of compute time allocated to the VM - val fraction = vcpu.actualUsage / usage - // Compute the burst time that the VM was actually granted - val grantedBurst = max(0, vcpu.actualBurst - ceil(remainder * fraction).toLong()) - // Compute remaining burst time to be executed for the request - vcpu.requestedBurst = max(0, vcpu.requestedBurst - grantedBurst) - - if (vcpu.requestedBurst == 0L || vcpu.requestedDeadline <= time) { - // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vcpu.chan.send(Unit) - } - } - - hypervisorMonitor.onSliceFinish( - time, - burst, - totalGrantedBurst, - vcpus.size, - hostContext.server - ) - } - - this.call = call - call.invokeOnCompletion { this.call = null } - } - - /** - * Flush the progress of the current active VMs. - */ - fun flush() { - val call = call ?: return // If there is no active call, there is nothing to flush - // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its - // completion. - call.cancel() - } - } - - /** - * An implementation of [ProcessorContext] that delegates the work to a physical CPU. 
- */ - internal class VirtualProcessorContext(val host: HostProcessorContext) : ProcessorContext { - var actualBurst: Long = 0 - var actualUsage: Double = 0.0 - var requestedBurst: Long = 0 - var requestedUsage: Double = 0.0 - var requestedDeadline: Long = 0 - var chan = Channel(Channel.RENDEZVOUS) - - override val info: ProcessingUnit - get() = host.info - - override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { - requestedBurst = burst - requestedUsage = maxUsage - requestedDeadline = deadline - - // Wait until the burst has been run or the coroutine is cancelled - try { - host.vcpus.add(this) - host.reschedule() - chan.receive() - } catch (_: CancellationException) { - // On cancellation, we compute and return the remaining burst - } - - host.vcpus.remove(this) - host.reschedule() - return requestedBurst - } - } -} diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index 6468a408..84b16b68 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server @@ -53,15 +54,16 @@ internal class SimpleBareMetalDriverTest { root.launch { val dom = root.newDomain(name = "driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) - val cpus = List(5) { ProcessingUnit(cpuNode, it, 2400.0) } + val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), dom) val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { + println("[${simulationContext.clock.millis()}] $server") finalState = server.state } } - val image = FlopsApplicationImage(UUID.randomUUID(), "", emptyMap(), 1000, 2) + val image = FlopsApplicationImage(UUID.randomUUID(), "", emptyMap(), 1_000_000_000, 2) // Batch driver commands withContext(dom.coroutineContext) { diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt index 7e5a4bbe..6cfb2317 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt @@ -69,8 +69,8 @@ internal class HypervisorTest { println("Hello World!") } }) - val workloadA = FlopsApplicationImage(UUID.randomUUID(), "", emptyMap(), 1_000_000, 1) - val workloadB = FlopsApplicationImage(UUID.randomUUID(), "", emptyMap(), 2_000_000, 1) + val workloadA = FlopsApplicationImage(UUID.randomUUID(), "", emptyMap(), 1_000_000_000, 1) + val workloadB = FlopsApplicationImage(UUID.randomUUID(), "", emptyMap(), 2_000_000_000, 1) val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { 
println("[${simulationContext.clock.millis()}]: $server") @@ -80,12 +80,13 @@ internal class HypervisorTest { val driverDom = root.newDomain("driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) - val cpus = List(5) { ProcessingUnit(cpuNode, it, 2000.0) } + val cpus = List(4) { ProcessingUnit(cpuNode, it, 2000.0) } val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), driverDom) metalDriver.init(monitor) metalDriver.setImage(vmm) metalDriver.setPower(PowerState.POWER_ON) + delay(5) val flavor = Flavor(1, 0) -- cgit v1.2.3 From c5f6d5b5b1f5f42317236afa963426b3d0767db0 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 29 Feb 2020 17:44:50 +0100 Subject: refactor: Store remainder in burst array This change modifies the API of ServerContext to store the remainder burst into the input burst array instead of returning a new array. --- .../opendc/compute/core/execution/ServerContext.kt | 9 ++++-- .../compute/core/image/FlopsApplicationImage.kt | 6 ++-- .../atlarge/opendc/compute/core/image/VmImage.kt | 2 ++ .../compute/metal/driver/SimpleBareMetalDriver.kt | 25 ++++++++-------- .../virt/driver/hypervisor/HypervisorVirtDriver.kt | 33 +++++++++++++--------- 5 files changed, 42 insertions(+), 33 deletions(-) (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index 3a804f51..485fdcdf 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -54,10 +54,15 @@ public interface ServerContext { * Request the specified burst time from the processor cores and suspend execution until a processor core finishes * processing a **non-zero** burst or until the deadline is reached. * + * After the method returns, [burst] will contain the remaining burst length for each of the cores (which may be + * zero). + * + * Both [burst] and [maxUsage] must be of the same size and in any other case the method will throw an + * [IllegalArgumentException]. + * * @param burst The burst time to request from each of the processor cores. * @param maxUsage The maximum usage in terms of MHz that the processing core may use while running the burst. * @param deadline The instant at which this request needs to be fulfilled. - * @return The remaining burst time of the processor cores. */ - public suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray + public suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long) } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt index d2d35db9..27d8091a 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt @@ -42,7 +42,7 @@ import kotlin.math.min * @property cores The number of cores that the image is able to utilize. * @property utilization A model of the CPU utilization of the application. 
*/ -class FlopsApplicationImage( +data class FlopsApplicationImage( public override val uid: UUID, public override val name: String, public override val tags: TagContainer, @@ -61,7 +61,7 @@ class FlopsApplicationImage( */ override suspend fun invoke(ctx: ServerContext) { val cores = min(this.cores, ctx.server.flavor.cpuCount) - var burst = LongArray(cores) { flops / cores } + val burst = LongArray(cores) { flops / cores } val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization } while (coroutineContext.isActive) { @@ -69,7 +69,7 @@ class FlopsApplicationImage( break } - burst = ctx.run(burst, maxUsage, Long.MAX_VALUE) + ctx.run(burst, maxUsage, Long.MAX_VALUE) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index 7c4fe839..52f9068d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -33,4 +33,6 @@ class VmImage( } } } + + override fun toString(): String = "VmImage(uid=$uid, name=$name, cores=$cores, requiredMemory=$requiredMemory)" } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index d86e04ff..92338ca1 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -167,15 +167,14 @@ public class SimpleBareMetalDriver( domain.launch { monitor.onUpdate(server, previousState) } } - override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray { + override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long) { + require(burst.size == maxUsage.size) { "Array dimensions do not match" } + val start = simulationContext.clock.millis() var duration = max(0, deadline - start) - for (i in 0..cpus.size) { - if (i >= burst.size || i >= maxUsage.size) { - continue - } - + // Determine the duration of the first CPU to finish + for (i in 0 until min(cpus.size, burst.size)) { val cpu = cpus[i] val usage = min(maxUsage[i], cpu.frequency) * 1_000_000 // Usage from MHz to Hz val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds @@ -192,14 +191,12 @@ public class SimpleBareMetalDriver( } val end = simulationContext.clock.millis() - return LongArray(cpus.size) { i -> - if (i < burst.size && i < maxUsage.size) { - val usage = min(maxUsage[i], cpus[i].frequency) * 1_000_000 - val granted = ceil((end - start) / 1000.0 * usage).toLong() - max(0, burst[i] - granted) - } else { - 0 - } + + // Write back the remaining burst time + for (i in 0 until min(cpus.size, burst.size)) { + val usage = min(maxUsage[i], cpus[i].frequency) * 1_000_000 + val granted = ceil((end - start) / 1000.0 * usage).toLong() + burst[i] = max(0, burst[i] - granted) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index 87c4f073..2f1328ab 100644 --- 
a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -105,6 +105,12 @@ class HypervisorVirtDriver( */ private suspend fun reschedule() { flush() + + // Do not schedule a call if there is no work to schedule + if (activeVms.isEmpty()) { + return + } + val call = simulationContext.domain.launch { val start = simulationContext.clock.millis() val vms = activeVms.toSet() @@ -114,7 +120,7 @@ class HypervisorVirtDriver( val usage = DoubleArray(hostContext.cpus.size) for (vm in vms) { - for (i in vm.cpus.indices) { + for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) { val cpu = vm.cpus[i] // Limit each vCPU to at most an equal share of the host CPU @@ -130,7 +136,7 @@ class HypervisorVirtDriver( val burst = LongArray(hostContext.cpus.size) for (vm in vms) { - for (i in vm.cpus.indices) { + for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) { val cpu = vm.cpus[i] // Limit each vCPU to at most an equal share of the host CPU @@ -143,7 +149,7 @@ class HypervisorVirtDriver( // We run the total burst on the host processor. Note that this call may be cancelled at any moment in // time, so not all of the burst may be executed. - val remainder = hostContext.run(burst, usage, deadline) + hostContext.run(burst, usage, deadline) val end = simulationContext.clock.millis() // No work was performed @@ -152,17 +158,18 @@ class HypervisorVirtDriver( } for (vm in vms) { - for (i in vm.cpus.indices) { + for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) { val cpu = vm.cpus[i] // Limit each vCPU to at most an equal share of the host CPU val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size) + val actualBurst = (duration * actualUsage * 1_000_000L).toLong() // Compute the fraction of compute time allocated to the VM val fraction = actualUsage / usage[i] // Compute the burst time that the VM was actually granted - val grantedBurst = max(0, burst[i] - ceil(remainder[i] * fraction).toLong()) + val grantedBurst = max(0, actualBurst - ceil(burst[i] * fraction).toLong()) // Compute remaining burst time to be executed for the request vm.requestedBurst[i] = max(0, vm.requestedBurst[i] - grantedBurst) @@ -174,7 +181,6 @@ class HypervisorVirtDriver( } } } - this.call = call call.invokeOnCompletion { this.call = null } } @@ -194,8 +200,8 @@ class HypervisorVirtDriver( val monitor: ServerMonitor, ctx: SimulationContext ) : ServerManagementContext { - val requestedBurst: LongArray = LongArray(server.flavor.cpuCount) - val requestedUsage: DoubleArray = DoubleArray(server.flavor.cpuCount) + lateinit var requestedBurst: LongArray + lateinit var requestedUsage: DoubleArray var requestedDeadline: Long = 0L var chan = Channel(Channel.RENDEZVOUS) private var initialized: Boolean = false @@ -234,11 +240,11 @@ class HypervisorVirtDriver( monitors.forEach { it.onUpdate(vms.size, availableMemory) } } - override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray { - for (i in cpus.indices) { - requestedBurst[i] = if (i < burst.size) burst[i] else 0 - requestedUsage[i] = if (i < maxUsage.size) maxUsage[i] else 0.0 - } + override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long) { + require(burst.size == maxUsage.size) { "Array dimensions do not match" } + + requestedBurst = burst + requestedUsage = maxUsage requestedDeadline = deadline // Wait until the burst has 
been run or the coroutine is cancelled @@ -251,7 +257,6 @@ class HypervisorVirtDriver( } activeVms -= this reschedule() - return requestedBurst } } } -- cgit v1.2.3 From b201a8e1a8a3a9199c0f38265bd1326f18d22722 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 3 Mar 2020 22:50:23 +0100 Subject: feat: Re-add support for hypervisor monitor --- .../compute/virt/driver/hypervisor/HypervisorVirtDriver.kt | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index 2f1328ab..783a9138 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -147,9 +147,10 @@ class HypervisorVirtDriver( } } + val granted = burst.clone() // We run the total burst on the host processor. Note that this call may be cancelled at any moment in // time, so not all of the burst may be executed. - hostContext.run(burst, usage, deadline) + hostContext.run(granted, usage, deadline) val end = simulationContext.clock.millis() // No work was performed @@ -180,6 +181,16 @@ class HypervisorVirtDriver( vm.chan.send(Unit) } } + + for (i in burst.indices) { + monitor.onSliceFinish( + end, + burst[i], + granted[i], + vms.size, + hostContext.server + ) + } } this.call = call call.invokeOnCompletion { this.call = null } -- cgit v1.2.3 From 5f5d54b6f1a96bc595f99f367bea54f1d852ec63 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 3 Mar 2020 22:53:57 +0100 Subject: refactor: Rename maxUsage to limit This change renames the `maxUsage` parameter to `limit` in order to align terminology with other products/projects such as VMWare vSphere. --- .../opendc/compute/core/execution/ServerContext.kt | 6 +++--- .../compute/metal/driver/SimpleBareMetalDriver.kt | 8 ++++---- .../virt/driver/hypervisor/HypervisorVirtDriver.kt | 22 +++++++++++----------- 3 files changed, 18 insertions(+), 18 deletions(-) (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index 485fdcdf..b09a5a7d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -57,12 +57,12 @@ public interface ServerContext { * After the method returns, [burst] will contain the remaining burst length for each of the cores (which may be * zero). * - * Both [burst] and [maxUsage] must be of the same size and in any other case the method will throw an + * Both [burst] and [limit] must be of the same size and in any other case the method will throw an * [IllegalArgumentException]. * * @param burst The burst time to request from each of the processor cores. - * @param maxUsage The maximum usage in terms of MHz that the processing core may use while running the burst. + * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst. * @param deadline The instant at which this request needs to be fulfilled. 
*/ - public suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long) + public suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 92338ca1..fcdc9363 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -167,8 +167,8 @@ public class SimpleBareMetalDriver( domain.launch { monitor.onUpdate(server, previousState) } } - override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long) { - require(burst.size == maxUsage.size) { "Array dimensions do not match" } + override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { + require(burst.size == limit.size) { "Array dimensions do not match" } val start = simulationContext.clock.millis() var duration = max(0, deadline - start) @@ -176,7 +176,7 @@ public class SimpleBareMetalDriver( // Determine the duration of the first CPU to finish for (i in 0 until min(cpus.size, burst.size)) { val cpu = cpus[i] - val usage = min(maxUsage[i], cpu.frequency) * 1_000_000 // Usage from MHz to Hz + val usage = min(limit[i], cpu.frequency) * 1_000_000 // Usage from MHz to Hz val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst @@ -194,7 +194,7 @@ public class SimpleBareMetalDriver( // Write back the remaining burst time for (i in 0 until min(cpus.size, burst.size)) { - val usage = min(maxUsage[i], cpus[i].frequency) * 1_000_000 + val usage = min(limit[i], cpus[i].frequency) * 1_000_000 val granted = ceil((end - start) / 1000.0 * usage).toLong() burst[i] = max(0, burst[i] - granted) } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index 783a9138..3f358516 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -124,11 +124,11 @@ class HypervisorVirtDriver( val cpu = vm.cpus[i] // Limit each vCPU to at most an equal share of the host CPU - val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size) + val actualUsage = min(vm.limit[i], cpu.frequency / vms.size) // The duration that we want to run is that of the shortest request from a vCPU duration = min(duration, ceil(vm.requestedBurst[i] / (actualUsage * 1_000_000L)).toLong()) - deadline = min(deadline, vm.requestedDeadline) + deadline = min(deadline, vm.deadline) usage[i] += actualUsage } } @@ -140,7 +140,7 @@ class HypervisorVirtDriver( val cpu = vm.cpus[i] // Limit each vCPU to at most an equal share of the host CPU - val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size) + val actualUsage = min(vm.limit[i], cpu.frequency / vms.size) val actualBurst = (duration * actualUsage * 1_000_000L).toLong() burst[i] += actualBurst @@ -163,7 +163,7 @@ class HypervisorVirtDriver( val cpu = vm.cpus[i] // Limit each vCPU to at most an 
equal share of the host CPU - val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size) + val actualUsage = min(vm.limit[i], cpu.frequency / vms.size) val actualBurst = (duration * actualUsage * 1_000_000L).toLong() // Compute the fraction of compute time allocated to the VM @@ -176,7 +176,7 @@ class HypervisorVirtDriver( vm.requestedBurst[i] = max(0, vm.requestedBurst[i] - grantedBurst) } - if (vm.requestedBurst.any { it == 0L } || vm.requestedDeadline <= end) { + if (vm.requestedBurst.any { it == 0L } || vm.deadline <= end) { // Return vCPU `run` call: the requested burst was completed or deadline was exceeded vm.chan.send(Unit) } @@ -212,8 +212,8 @@ class HypervisorVirtDriver( ctx: SimulationContext ) : ServerManagementContext { lateinit var requestedBurst: LongArray - lateinit var requestedUsage: DoubleArray - var requestedDeadline: Long = 0L + lateinit var limit: DoubleArray + var deadline: Long = 0L var chan = Channel(Channel.RENDEZVOUS) private var initialized: Boolean = false @@ -251,12 +251,12 @@ class HypervisorVirtDriver( monitors.forEach { it.onUpdate(vms.size, availableMemory) } } - override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long) { - require(burst.size == maxUsage.size) { "Array dimensions do not match" } + override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { + require(burst.size == limit.size) { "Array dimensions do not match" } requestedBurst = burst - requestedUsage = maxUsage - requestedDeadline = deadline + this.limit = limit + this.deadline = deadline // Wait until the burst has been run or the coroutine is cancelled try { -- cgit v1.2.3
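Taken together, the commits above leave ServerContext.run with the signature run(burst: LongArray, limit: DoubleArray, deadline: Long), where the remaining burst is written back into the input array. The minimal Kotlin sketch below shows how a workload can loop on that contract until every core reports a zero burst; it assumes kotlinx.coroutines on the classpath, and ToyContext is a made-up stand-in for the real driver, not part of the patches.

import kotlinx.coroutines.runBlocking
import kotlin.math.max
import kotlin.math.min

// A toy context that grants each core up to sliceCycles of work per call,
// scaled by the per-core limit, and writes the remainder back in place.
class ToyContext(private val frequencyMhz: Double, private val sliceCycles: Long) {
    suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
        require(burst.size == limit.size) { "Array dimensions do not match" }
        // The deadline is ignored in this toy model.
        for (i in burst.indices) {
            val share = min(limit[i], frequencyMhz) / frequencyMhz
            val granted = (sliceCycles * share).toLong()
            burst[i] = max(0L, burst[i] - granted)
        }
    }
}

fun main() = runBlocking {
    val ctx = ToyContext(frequencyMhz = 2400.0, sliceCycles = 1_000_000L)
    val burst = LongArray(2) { 2_500_000L }  // requested work per core, in cycles
    val limit = DoubleArray(2) { 1200.0 }    // run each core at half speed
    var slices = 0
    while (burst.any { it > 0L }) {
        ctx.run(burst, limit, Long.MAX_VALUE)
        slices++
    }
    println("completed in $slices slices")   // 5 slices of 500,000 cycles each
}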