From e85a11645a2262e2e6fd1e3570ad001eb805c85f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 2 Mar 2021 21:15:38 +0100 Subject: compute: Separate cloud compute layer from bare-metal layer This change separates the cloud compute layer in OpenDC (e.g., Server) from the bare-metal layer (e.g., Node), such that Node and BareMetalDriver are unaware of the existence of Server and co. --- .../kotlin/org/opendc/compute/core/metal/Node.kt | 10 +-- .../opendc/compute/core/virt/HypervisorEvent.kt | 4 +- .../simulator/ComputeSimExecutionContext.kt | 36 --------- .../org/opendc/compute/simulator/HypervisorView.kt | 4 +- .../opendc/compute/simulator/SimBareMetalDriver.kt | 94 +++------------------- .../org/opendc/compute/simulator/SimVirtDriver.kt | 51 ++++-------- .../simulator/SimVirtProvisioningService.kt | 94 ++++++++-------------- .../AvailableCoreMemoryAllocationPolicy.kt | 2 +- .../allocation/ComparableAllocationPolicyLogic.kt | 4 +- .../allocation/ProvisionedCoresAllocationPolicy.kt | 2 +- .../simulator/allocation/RandomAllocationPolicy.kt | 2 +- .../simulator/allocation/ReplayAllocationPolicy.kt | 2 +- .../compute/simulator/SimBareMetalDriverTest.kt | 21 ++--- .../simulator/SimProvisioningServiceTest.kt | 2 +- .../experiments/capelin/ExperimentHelpers.kt | 14 ++-- .../capelin/monitor/ExperimentMonitor.kt | 7 +- .../capelin/monitor/ParquetExperimentMonitor.kt | 43 +++++----- .../experiments/capelin/telemetry/HostEvent.kt | 4 +- .../telemetry/parquet/ParquetHostEventWriter.kt | 4 +- .../experiments/capelin/CapelinIntegrationTest.kt | 10 +-- .../org/opendc/runner/web/WebExperimentMonitor.kt | 57 ++++++------- .../simulator/compute/SimBareMetalMachine.kt | 38 ++++----- .../simulator/compute/SimExecutionContext.kt | 5 ++ .../simulator/compute/SimFairShareHypervisor.kt | 49 ++++++----- .../org/opendc/simulator/compute/SimMachine.kt | 2 +- .../simulator/compute/SimSpaceSharedHypervisor.kt | 40 ++++----- .../main/kotlin/org/opendc/utils/flow/EventFlow.kt | 28 +++++-- 27 files changed, 251 insertions(+), 378 deletions(-) delete mode 100644 simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt (limited to 'simulator') diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt index 6d9506f1..480bc224 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt @@ -23,7 +23,7 @@ package org.opendc.compute.core.metal import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.Server +import org.opendc.compute.core.Flavor import org.opendc.compute.core.image.Image import org.opendc.core.Identity import java.util.UUID @@ -53,14 +53,14 @@ public data class Node( public val state: NodeState, /** - * The boot image of the node. + * The flavor of the node. */ - public val image: Image, + public val flavor: Flavor, /** - * The server instance that is running on the node or `null` if no server is running. + * The boot image of the node. */ - public val server: Server?, + public val image: Image, /** * The events that are emitted by the node. diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt index 9fb437de..d1c8d790 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt @@ -22,7 +22,7 @@ package org.opendc.compute.core.virt -import org.opendc.compute.core.Server +import org.opendc.compute.core.metal.Node import org.opendc.compute.core.virt.driver.VirtDriver /** @@ -71,6 +71,6 @@ public sealed class HypervisorEvent { public val cpuUsage: Double, public val cpuDemand: Double, public val numberOfDeployedImages: Int, - public val hostServer: Server + public val host: Node ) : HypervisorEvent() } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt deleted file mode 100644 index 153a86b3..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import org.opendc.compute.core.Server -import org.opendc.simulator.compute.SimExecutionContext - -/** - * Extended [SimExecutionContext] in which workloads within the OpenDC Compute module run. - */ -public interface ComputeSimExecutionContext : SimExecutionContext { - /** - * The server on which the image runs. - */ - public val server: Server -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt index 1a79523e..cf2747cd 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt @@ -22,13 +22,13 @@ package org.opendc.compute.simulator -import org.opendc.compute.core.Server +import org.opendc.compute.core.metal.Node import org.opendc.compute.core.virt.driver.VirtDriver import java.util.UUID public class HypervisorView( public val uid: UUID, - public var server: Server, + public var node: Node, public var numberOfActiveServers: Int, public var availableMemory: Long, public var provisionedCores: Int diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt index 8af45616..a27c331d 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt @@ -25,9 +25,6 @@ package org.opendc.compute.simulator import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState import org.opendc.compute.core.image.Image import org.opendc.compute.core.metal.Node import org.opendc.compute.core.metal.NodeEvent @@ -36,18 +33,14 @@ import org.opendc.compute.core.metal.driver.BareMetalDriver import org.opendc.compute.simulator.power.api.CpuPowerModel import org.opendc.compute.simulator.power.api.Powerable import org.opendc.compute.simulator.power.models.ConstantPowerModel -import org.opendc.core.services.ServiceRegistry import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.SimExecutionContext import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.workload.SimResourceCommand import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.FailureDomain import org.opendc.utils.flow.EventFlow import org.opendc.utils.flow.StateFlow import java.time.Clock import java.util.UUID -import kotlin.random.Random /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. @@ -87,7 +80,7 @@ public class SimBareMetalDriver( * The machine state. */ private val nodeState = - StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, Image.EMPTY, null, events)) + StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, flavor, Image.EMPTY, events)) /** * The [SimBareMetalMachine] we use to run the workload. @@ -101,21 +94,11 @@ public class SimBareMetalDriver( override val powerDraw: Flow = cpuPowerModel.getPowerDraw(this) - /** - * The internal random instance. - */ - private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits) - /** * The [Job] that runs the simulated workload. */ private var job: Job? = null - /** - * The event stream to publish to for the server. - */ - private var serverEvents: EventFlow? = null - override suspend fun init(): Node { return nodeState.value } @@ -126,51 +109,13 @@ public class SimBareMetalDriver( return node } - val events = EventFlow() - serverEvents = events - val server = Server( - UUID(random.nextLong(), random.nextLong()), - node.name, - emptyMap(), - flavor, - node.image, - ServerState.BUILD, - ServiceRegistry().put(BareMetalDriver, this@SimBareMetalDriver), - events - ) - - val delegate = node.image.tags["workload"] as SimWorkload - // Wrap the workload to pass in a ComputeSimExecutionContext - val workload = object : SimWorkload { - lateinit var wrappedCtx: ComputeSimExecutionContext - - override fun onStart(ctx: SimExecutionContext) { - wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx { - override val server: Server - get() = nodeState.value.server!! - - override fun toString(): String = "WrappedSimExecutionContext" - } - - delegate.onStart(wrappedCtx) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return delegate.onStart(wrappedCtx, cpu) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return delegate.onNext(wrappedCtx, cpu, remainingWork) - } - - override fun toString(): String = "SimWorkloadWrapper(delegate=$delegate)" - } + val workload = node.image.tags["workload"] as SimWorkload job = coroutineScope.launch { delay(1) // TODO Introduce boot time initMachine() try { - machine.run(workload) + machine.run(workload, mapOf("driver" to this@SimBareMetalDriver, "node" to node)) exitMachine(null) } catch (_: CancellationException) { // Ignored @@ -179,31 +124,21 @@ public class SimBareMetalDriver( } } - setNode(node.copy(state = NodeState.BOOT, server = server)) + setNode(node.copy(state = NodeState.BOOT)) return nodeState.value } private fun initMachine() { - val server = nodeState.value.server?.copy(state = ServerState.ACTIVE) - setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server)) + setNode(nodeState.value.copy(state = NodeState.ACTIVE)) } private fun exitMachine(cause: Throwable?) { - val newServerState = - if (cause == null) - ServerState.SHUTOFF - else - ServerState.ERROR val newNodeState = if (cause == null) - nodeState.value.state + NodeState.SHUTOFF else NodeState.ERROR - val server = nodeState.value.server?.copy(state = newServerState) - setNode(nodeState.value.copy(state = newNodeState, server = server)) - - serverEvents?.close() - serverEvents = null + setNode(nodeState.value.copy(state = newNodeState)) } override suspend fun stop(): Node { @@ -213,7 +148,7 @@ public class SimBareMetalDriver( } job?.cancelAndJoin() - setNode(node.copy(state = NodeState.SHUTOFF, server = null)) + setNode(node.copy(state = NodeState.SHUTOFF)) return node } @@ -235,13 +170,6 @@ public class SimBareMetalDriver( events.emit(NodeEvent.StateChanged(value, field.state)) } - val oldServer = field.server - val newServer = value.server - - if (oldServer != null && newServer != null && oldServer.state != newServer.state) { - serverEvents?.emit(ServerEvent.StateChanged(newServer, oldServer.state)) - } - nodeState.value = value } @@ -249,13 +177,11 @@ public class SimBareMetalDriver( get() = coroutineScope override suspend fun fail() { - val server = nodeState.value.server?.copy(state = ServerState.ERROR) - setNode(nodeState.value.copy(state = NodeState.ERROR, server = server)) + setNode(nodeState.value.copy(state = NodeState.ERROR)) } override suspend fun recover() { - val server = nodeState.value.server?.copy(state = ServerState.ACTIVE) - setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server)) + setNode(nodeState.value.copy(state = NodeState.ACTIVE)) } override fun toString(): String = "SimBareMetalDriver(node = ${nodeState.value.uid})" diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt index 86a671fc..d28b2f0d 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt @@ -29,6 +29,7 @@ import kotlinx.coroutines.launch import org.opendc.compute.core.* import org.opendc.compute.core.Flavor import org.opendc.compute.core.image.Image +import org.opendc.compute.core.metal.Node import org.opendc.compute.core.virt.HypervisorEvent import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException import org.opendc.compute.core.virt.driver.VirtDriver @@ -45,17 +46,20 @@ import java.util.* /** * A [VirtDriver] that is simulates virtual machines on a physical machine using [SimHypervisor]. */ -public class SimVirtDriver(private val coroutineScope: CoroutineScope, hypervisor: SimHypervisorProvider) : VirtDriver, SimWorkload { +public class SimVirtDriver( + private val coroutineScope: CoroutineScope, + hypervisor: SimHypervisorProvider +) : VirtDriver, SimWorkload { /** * The execution context in which the [VirtDriver] runs. */ - private lateinit var ctx: ComputeSimExecutionContext + private lateinit var ctx: SimExecutionContext /** - * The server hosting this hypervisor. + * The node on which the hypervisor runs. */ - public val server: Server - get() = ctx.server + public val node: Node + get() = ctx.meta["node"] as Node /** * The [EventFlow] to emit the events. @@ -93,7 +97,7 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope, hyperviso cpuUsage, cpuDemand, vms.size, - ctx.server + node ) ) } @@ -153,13 +157,13 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope, hyperviso } private fun vmStarted(vm: VirtualMachine) { - vms.forEach { it -> + vms.forEach { vm.performanceInterferenceModel?.onStart(it.server.image.name) } } private fun vmStopped(vm: VirtualMachine) { - vms.forEach { it -> + vms.forEach { vm.performanceInterferenceModel?.onStop(it.server.image.name) } } @@ -171,37 +175,12 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope, hyperviso val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? val job = coroutineScope.launch { - val delegate = server.image.tags["workload"] as SimWorkload - // Wrap the workload to pass in a ComputeSimExecutionContext - val workload = object : SimWorkload { - lateinit var wrappedCtx: ComputeSimExecutionContext - - override fun onStart(ctx: SimExecutionContext) { - wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx { - override val server: Server - get() = server - - override fun toString(): String = "WrappedSimExecutionContext" - } - - delegate.onStart(wrappedCtx) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return delegate.onStart(wrappedCtx, cpu) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return delegate.onNext(wrappedCtx, cpu, remainingWork) - } - - override fun toString(): String = "SimWorkloadWrapper(delegate=$delegate)" - } + val workload = server.image.tags["workload"] as SimWorkload delay(1) // TODO Introduce boot time init() try { - machine.run(workload) + machine.run(workload, mapOf("driver" to this@SimVirtDriver, "server" to server)) exit(null) } catch (cause: Throwable) { exit(cause) @@ -239,7 +218,7 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope, hyperviso } override fun onStart(ctx: SimExecutionContext) { - this.ctx = ctx as ComputeSimExecutionContext + this.ctx = ctx this.availableMemory = ctx.machine.memory.map { it.size }.sum() this.hypervisor.onStart(ctx) } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt index 50ab7788..18afd0c2 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt @@ -32,6 +32,9 @@ import org.opendc.compute.core.Server import org.opendc.compute.core.ServerEvent import org.opendc.compute.core.ServerState import org.opendc.compute.core.image.Image +import org.opendc.compute.core.metal.Node +import org.opendc.compute.core.metal.NodeEvent +import org.opendc.compute.core.metal.NodeState import org.opendc.compute.core.metal.service.ProvisioningService import org.opendc.compute.core.virt.HypervisorEvent import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException @@ -68,7 +71,7 @@ public class SimVirtProvisioningService( /** * The hypervisors that have been launched by the service. */ - private val hypervisors: MutableMap = mutableMapOf() + private val hypervisors: MutableMap = mutableMapOf() /** * The available hypervisors. @@ -118,22 +121,12 @@ public class SimVirtProvisioningService( val workload = SimVirtDriver(coroutineScope, hypervisor) val hypervisorImage = Image(UUID.randomUUID(), "vmm", mapOf("workload" to workload)) launch { - var init = false val deployedNode = provisioningService.deploy(node, hypervisorImage) - val server = deployedNode.server!! - server.events.onEach { event -> + deployedNode.events.onEach { event -> when (event) { - is ServerEvent.StateChanged -> { - if (!init) { - init = true - } - stateChanged(event.server) - } + is NodeEvent.StateChanged -> stateChanged(event.node, workload) } }.launchIn(this) - - delay(1) - onHypervisorAvailable(server, workload) } } } @@ -229,7 +222,7 @@ public class SimVirtProvisioningService( } try { - logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } + logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.node.uid} ${selectedHv.node.name} ${selectedHv.node.flavor}" } incomingImages.poll() // Speculatively update the hypervisor view information to prevent other images in the queue from @@ -308,28 +301,38 @@ public class SimVirtProvisioningService( } } - private fun stateChanged(server: Server) { - when (server.state) { - ServerState.ACTIVE -> { - logger.debug { "[${clock.millis()}] Server ${server.uid} available: ${server.state}" } + private fun stateChanged(node: Node, hypervisor: SimVirtDriver) { + when (node.state) { + NodeState.ACTIVE -> { + logger.debug { "[${clock.millis()}] Server ${node.uid} available: ${node.state}" } - if (server in hypervisors) { + if (node in hypervisors) { // Corner case for when the hypervisor already exists - availableHypervisors += hypervisors.getValue(server) + availableHypervisors += hypervisors.getValue(node) } else { val hv = HypervisorView( - server.uid, - server, + node.uid, + node, 0, - server.flavor.memorySize, + node.flavor.memorySize, 0 ) - maxCores = max(maxCores, server.flavor.cpuCount) - maxMemory = max(maxMemory, server.flavor.memorySize) - hypervisors[server] = hv + hv.driver = hypervisor + hv.driver.events + .onEach { event -> + if (event is HypervisorEvent.VmsUpdated) { + hv.numberOfActiveServers = event.numberOfActiveServers + hv.availableMemory = event.availableMemory + } + }.launchIn(coroutineScope) + + maxCores = max(maxCores, node.flavor.cpuCount) + maxMemory = max(maxMemory, node.flavor.memorySize) + hypervisors[node] = hv + availableHypervisors += hv } - tracer.commit(HypervisorAvailableEvent(server.uid)) + tracer.commit(HypervisorAvailableEvent(node.uid)) eventFlow.emit( VirtProvisioningEvent.MetricsAvailable( @@ -349,9 +352,9 @@ public class SimVirtProvisioningService( requestCycle() } } - ServerState.SHUTOFF, ServerState.ERROR -> { - logger.debug { "[${clock.millis()}] Server ${server.uid} unavailable: ${server.state}" } - val hv = hypervisors[server] ?: return + NodeState.SHUTOFF, NodeState.ERROR -> { + logger.debug { "[${clock.millis()}] Server ${node.uid} unavailable: ${node.state}" } + val hv = hypervisors[node] ?: return availableHypervisors -= hv tracer.commit(HypervisorUnavailableEvent(hv.uid)) @@ -377,37 +380,6 @@ public class SimVirtProvisioningService( } } - private fun onHypervisorAvailable(server: Server, hypervisor: SimVirtDriver) { - val hv = hypervisors[server] ?: return - hv.driver = hypervisor - availableHypervisors += hv - - tracer.commit(HypervisorAvailableEvent(hv.uid)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - ) - - hv.driver.events - .onEach { event -> - if (event is HypervisorEvent.VmsUpdated) { - hv.numberOfActiveServers = event.numberOfActiveServers - hv.availableMemory = event.availableMemory - } - }.launchIn(coroutineScope) - - requestCycle() - } - public data class ImageView( public val name: String, public val image: Image, diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt index 38a07b2b..5e044282 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt @@ -32,7 +32,7 @@ import org.opendc.compute.simulator.HypervisorView public class AvailableCoreMemoryAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { override val comparator: Comparator = - compareBy { -it.availableMemory / it.server.flavor.cpuCount } + compareBy { -it.availableMemory / it.node.flavor.cpuCount } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt index 4470eab9..04a181a6 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt @@ -41,9 +41,9 @@ public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic { return hypervisors.asSequence() .filter { hv -> val fitsMemory = hv.availableMemory >= (image.flavor.memorySize) - val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount + val fitsCpu = hv.node.flavor.cpuCount >= image.flavor.cpuCount fitsMemory && fitsCpu } - .minWithOrNull(comparator.thenBy { it.server.uid }) + .minWithOrNull(comparator.thenBy { it.node.uid }) } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt index 4344d979..91441ecd 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt @@ -34,7 +34,7 @@ import org.opendc.compute.simulator.HypervisorView public class ProvisionedCoresAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { override val comparator: Comparator = - compareBy { it.provisionedCores / it.server.flavor.cpuCount } + compareBy { it.provisionedCores / it.node.flavor.cpuCount } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt index ac34f410..9a89fccd 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt @@ -39,7 +39,7 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al return hypervisors.asIterable() .filter { hv -> val fitsMemory = hv.availableMemory >= (image.image.tags["required-memory"] as Long) - val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount + val fitsCpu = hv.node.flavor.cpuCount >= image.flavor.cpuCount fitsMemory && fitsCpu } .randomOrNull(random) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt index 5312f4da..582fe817 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt @@ -42,7 +42,7 @@ public class ReplayAllocationPolicy(private val vmPlacements: Map + val node = driver.start() + node.events.collect { event -> when (event) { - is ServerEvent.StateChanged -> { - finalState = event.server.state + is NodeEvent.StateChanged -> { + finalState = event.node.state finalTime = clock.millis() } } @@ -83,7 +84,9 @@ internal class SimBareMetalDriverTest { } testScope.advanceUntilIdle() - assertEquals(ServerState.SHUTOFF, finalState) - assertEquals(501, finalTime) + assertAll( + { assertEquals(NodeState.SHUTOFF, finalState) }, + { assertEquals(501, finalTime) } + ) } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt index dad31298..eb46c335 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt @@ -73,7 +73,7 @@ internal class SimProvisioningServiceTest { delay(5) val nodes = provisioner.nodes() val node = provisioner.deploy(nodes.first(), image) - node.server!!.events.collect { println(it) } + node.events.collect { println(it) } } testScope.advanceUntilIdle() diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 8f3e686a..b941d135 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -35,7 +35,7 @@ import mu.KotlinLogging import org.opendc.compute.core.Flavor import org.opendc.compute.core.ServerEvent import org.opendc.compute.core.metal.NODE_CLUSTER -import org.opendc.compute.core.metal.driver.BareMetalDriver +import org.opendc.compute.core.metal.NodeEvent import org.opendc.compute.core.metal.service.ProvisioningService import org.opendc.compute.core.virt.HypervisorEvent import org.opendc.compute.core.virt.service.VirtProvisioningEvent @@ -175,14 +175,14 @@ public suspend fun attachMonitor( // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. - val server = (hypervisor as SimVirtDriver).server + val server = (hypervisor as SimVirtDriver).node monitor.reportHostStateChange(clock.millis(), hypervisor, server) server.events .onEach { event -> val time = clock.millis() when (event) { - is ServerEvent.StateChanged -> { - monitor.reportHostStateChange(time, hypervisor, event.server) + is NodeEvent.StateChanged -> { + monitor.reportHostStateChange(time, hypervisor, event.node) } } } @@ -199,15 +199,15 @@ public suspend fun attachMonitor( event.cpuUsage, event.cpuDemand, event.numberOfDeployedImages, - event.hostServer + event.host ) } } .launchIn(coroutineScope) - val driver = hypervisor.server.services[BareMetalDriver.Key] as SimBareMetalDriver + val driver = server.metadata["driver"] as SimBareMetalDriver driver.powerDraw - .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } + .onEach { monitor.reportPowerConsumption(server, it) } .launchIn(coroutineScope) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 3c6637bf..04ffd148 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -23,6 +23,7 @@ package org.opendc.experiments.capelin.monitor import org.opendc.compute.core.Server +import org.opendc.compute.core.metal.Node import org.opendc.compute.core.virt.driver.VirtDriver import org.opendc.compute.core.virt.service.VirtProvisioningEvent import java.io.Closeable @@ -42,14 +43,14 @@ public interface ExperimentMonitor : Closeable { public fun reportHostStateChange( time: Long, driver: VirtDriver, - server: Server + host: Node ) { } /** * Report the power consumption of a host. */ - public fun reportPowerConsumption(host: Server, draw: Double) {} + public fun reportPowerConsumption(host: Node, draw: Double) {} /** * This method is invoked for a host for each slice that is finishes. @@ -63,7 +64,7 @@ public interface ExperimentMonitor : Closeable { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long = 5 * 60 * 1000L ) { } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index a0d57656..e8aa5915 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -24,6 +24,7 @@ package org.opendc.experiments.capelin.monitor import mu.KotlinLogging import org.opendc.compute.core.Server +import org.opendc.compute.core.metal.Node import org.opendc.compute.core.virt.driver.VirtDriver import org.opendc.compute.core.virt.service.VirtProvisioningEvent import org.opendc.experiments.capelin.telemetry.HostEvent @@ -49,7 +50,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: File(base, "provisioner-metrics/$partition/data.parquet"), bufferSize ) - private val currentHostEvent = mutableMapOf() + private val currentHostEvent = mutableMapOf() private var startTime = -1L override fun reportVmStateChange(time: Long, server: Server) { @@ -64,11 +65,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: override fun reportHostStateChange( time: Long, driver: VirtDriver, - server: Server + host: Node ) { - logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } + logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } - val previousEvent = currentHostEvent[server] + val previousEvent = currentHostEvent[host] val roundedTime = previousEvent?.let { val duration = time - it.timestamp @@ -91,13 +92,13 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: 0.0, 0.0, 0, - server + host ) } - private val lastPowerConsumption = mutableMapOf() + private val lastPowerConsumption = mutableMapOf() - override fun reportPowerConsumption(host: Server, draw: Double) { + override fun reportPowerConsumption(host: Node, draw: Double) { lastPowerConsumption[host] = draw } @@ -110,16 +111,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long ) { - val previousEvent = currentHostEvent[hostServer] + val previousEvent = currentHostEvent[host] when { previousEvent == null -> { val event = HostEvent( time, 5 * 60 * 1000L, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -127,17 +128,17 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } previousEvent.timestamp == time -> { val event = HostEvent( time, previousEvent.duration, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -145,11 +146,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } else -> { hostWriter.write(previousEvent) @@ -157,7 +158,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: val event = HostEvent( time, time - previousEvent.timestamp, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -165,11 +166,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt index e5e9d520..0d5fce09 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.capelin.telemetry -import org.opendc.compute.core.Server +import org.opendc.compute.core.metal.Node /** * A periodic report of the host machine metrics. @@ -30,7 +30,7 @@ import org.opendc.compute.core.Server public data class HostEvent( override val timestamp: Long, public val duration: Long, - public val host: Server, + public val node: Node, public val vmCount: Int, public val requestedBurst: Long, public val grantedBurst: Long, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt index 4a3e7963..b4fdd66a 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt @@ -41,8 +41,8 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) : // record.put("portfolio_id", event.run.parent.parent.id) // record.put("scenario_id", event.run.parent.id) // record.put("run_id", event.run.id) - record.put("host_id", event.host.name) - record.put("state", event.host.state.name) + record.put("host_id", event.node.name) + record.put("state", event.node.state.name) record.put("timestamp", event.timestamp) record.put("duration", event.duration) record.put("vm_count", event.vmCount) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 6a0796f6..0d6c057f 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -32,7 +32,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.core.Server +import org.opendc.compute.core.metal.Node import org.opendc.compute.core.workload.VmWorkload import org.opendc.compute.simulator.SimVirtProvisioningService import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy @@ -148,9 +148,9 @@ class CapelinIntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1684849230562, monitor.totalRequestedBurst) }, - { assertEquals(447612683996, monitor.totalGrantedBurst) }, - { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) }, + { assertEquals(1678587333640, monitor.totalRequestedBurst) }, + { assertEquals(438118200924, monitor.totalGrantedBurst) }, + { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) } @@ -242,7 +242,7 @@ class CapelinIntegrationTest { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long ) { totalRequestedBurst += requestedBurst diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt index f16f9b90..b7a26d34 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -24,7 +24,8 @@ package org.opendc.runner.web import mu.KotlinLogging import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerState +import org.opendc.compute.core.metal.Node +import org.opendc.compute.core.metal.NodeState import org.opendc.compute.core.virt.driver.VirtDriver import org.opendc.compute.core.virt.service.VirtProvisioningEvent import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -36,7 +37,7 @@ import kotlin.math.max */ public class WebExperimentMonitor : ExperimentMonitor { private val logger = KotlinLogging.logger {} - private val currentHostEvent = mutableMapOf() + private val currentHostEvent = mutableMapOf() private var startTime = -1L override fun reportVmStateChange(time: Long, server: Server) { @@ -51,11 +52,11 @@ public class WebExperimentMonitor : ExperimentMonitor { override fun reportHostStateChange( time: Long, driver: VirtDriver, - server: Server + host: Node ) { - logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } + logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } - val previousEvent = currentHostEvent[server] + val previousEvent = currentHostEvent[host] val roundedTime = previousEvent?.let { val duration = time - it.timestamp @@ -78,13 +79,13 @@ public class WebExperimentMonitor : ExperimentMonitor { 0.0, 0.0, 0, - server + host ) } - private val lastPowerConsumption = mutableMapOf() + private val lastPowerConsumption = mutableMapOf() - override fun reportPowerConsumption(host: Server, draw: Double) { + override fun reportPowerConsumption(host: Node, draw: Double) { lastPowerConsumption[host] = draw } @@ -97,16 +98,16 @@ public class WebExperimentMonitor : ExperimentMonitor { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long ) { - val previousEvent = currentHostEvent[hostServer] + val previousEvent = currentHostEvent[host] when { previousEvent == null -> { val event = HostEvent( time, 5 * 60 * 1000L, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -114,17 +115,17 @@ public class WebExperimentMonitor : ExperimentMonitor { interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } previousEvent.timestamp == time -> { val event = HostEvent( time, previousEvent.duration, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -132,11 +133,11 @@ public class WebExperimentMonitor : ExperimentMonitor { interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } else -> { processHostEvent(previousEvent) @@ -144,7 +145,7 @@ public class WebExperimentMonitor : ExperimentMonitor { val event = HostEvent( time, time - previousEvent.timestamp, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -152,17 +153,17 @@ public class WebExperimentMonitor : ExperimentMonitor { interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } } } private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap = mutableMapOf() + private val hostMetrics: MutableMap = mutableMapOf() private fun processHostEvent(event: HostEvent) { val slices = event.duration / SLICE_LENGTH @@ -173,14 +174,14 @@ public class WebExperimentMonitor : ExperimentMonitor { hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst, hostAggregateMetrics.totalInterferedBurst + event.interferedBurst, hostAggregateMetrics.totalPowerDraw + (slices * (event.powerDraw / 12)), - hostAggregateMetrics.totalFailureSlices + if (event.host.state != ServerState.ACTIVE) slices.toLong() else 0, - hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != ServerState.ACTIVE) event.vmCount * slices.toLong() else 0 + hostAggregateMetrics.totalFailureSlices + if (event.node.state != NodeState.ACTIVE) slices.toLong() else 0, + hostAggregateMetrics.totalFailureVmSlices + if (event.node.state != NodeState.ACTIVE) event.vmCount * slices.toLong() else 0 ) - hostMetrics.compute(event.host) { key, prev -> + hostMetrics.compute(event.node) { _, prev -> HostMetrics( - (event.cpuUsage.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0), - (event.cpuDemand.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0), + (event.cpuUsage.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0), + (event.cpuDemand.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0), event.vmCount + (prev?.vmCount ?: 0), 1 + (prev?.count ?: 0) ) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 812b5f20..f74c5697 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -83,34 +83,34 @@ public class SimBareMetalMachine( */ private val scheduler = TimerScheduler(coroutineScope, clock) - /** - * The execution context in which the workload runs. - */ - private val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = this@SimBareMetalMachine.model - - override val clock: Clock - get() = this@SimBareMetalMachine.clock - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload) { + override suspend fun run(workload: SimWorkload, meta: Map) { require(!isTerminated) { "Machine is terminated" } require(cont == null) { "Run should not be called concurrently" } + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = this@SimBareMetalMachine.model + + override val clock: Clock + get() = this@SimBareMetalMachine.clock + + override val meta: Map + get() = meta + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.map { Cpu(it, workload) } + this.cpus = model.cpus.map { Cpu(ctx, it, workload) } for (cpu in cpus) { cpu.start() @@ -161,7 +161,7 @@ public class SimBareMetalMachine( /** * A physical CPU of the machine. */ - private inner class Cpu(val model: ProcessingUnit, val workload: SimWorkload) { + private inner class Cpu(val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload) { /** * The current command. */ diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt index c7c3d3cc..657dac66 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt @@ -40,6 +40,11 @@ public interface SimExecutionContext { */ public val machine: SimMachineModel + /** + * The metadata associated with the context. + */ + public val meta: Map + /** * Ask the host machine to interrupt the specified vCPU. * diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index 5e86d32b..bf6d8a5e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -335,34 +335,34 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener */ private var cpus: List = emptyList() - /** - * The execution context in which the workload runs. - */ - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model - - override val clock: Clock - get() = this@SimFairShareHypervisor.ctx.clock - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload) { + override suspend fun run(workload: SimWorkload, meta: Map) { require(!isTerminated) { "Machine is terminated" } require(cont == null) { "Run should not be called concurrently" } + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = model + + override val clock: Clock + get() = this@SimFairShareHypervisor.ctx.clock + + override val meta: Map + get() = meta + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.map { VCpu(this, it, workload) } + this.cpus = model.cpus.map { VCpu(this, ctx, it, workload) } for (cpu in cpus) { // Register vCPU to scheduler @@ -417,7 +417,12 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener /** * A CPU of the virtual machine. */ - private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload) : Comparable { + private inner class VCpu( + val vm: SimVm, + val ctx: SimExecutionContext, + val model: ProcessingUnit, + val workload: SimWorkload + ) : Comparable { /** * The latest command processed by the CPU. */ @@ -488,7 +493,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener isIntermediate = true latestFlush = ctx.clock.millis() - process(workload.onStart(vm.ctx, model.id)) + process(workload.onStart(ctx, model.id)) } catch (e: Throwable) { fail(e) } finally { @@ -514,7 +519,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener // Act like nothing has happened in case the vCPU did not reach its deadline or was not // interrupted by the user. if (interrupt || command.deadline <= now) { - process(workload.onNext(vm.ctx, model.id, 0.0)) + process(workload.onNext(ctx, model.id, 0.0)) } } is SimResourceCommand.Consume -> { @@ -545,7 +550,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener totalOvercommittedWork += remainingWork } - process(workload.onNext(vm.ctx, model.id, remainingWork)) + process(workload.onNext(ctx, model.id, remainingWork)) } else { process(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline)) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index ea8eeb37..bfaa60bc 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -42,7 +42,7 @@ public interface SimMachine : AutoCloseable { /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - public suspend fun run(workload: SimWorkload) + public suspend fun run(workload: SimWorkload, meta: Map = emptyMap()) /** * Terminate this machine. diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index 66d3eda7..778b68ca 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -116,34 +116,34 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen */ private var cpus: List = emptyList() - /** - * The execution context in which the workload runs. - */ - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model - - override val clock: Clock - get() = this@SimSpaceSharedHypervisor.ctx.clock - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload) { + override suspend fun run(workload: SimWorkload, meta: Map) { require(!isTerminated) { "Machine is terminated" } require(cont == null) { "Run should not be called concurrently" } + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = model + + override val clock: Clock + get() = this@SimSpaceSharedHypervisor.ctx.clock + + override val meta: Map + get() = meta + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, model, workload, pCPUs[index]) } + this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, ctx, model, workload, pCPUs[index]) } for (cpu in cpus) { cpu.start() @@ -193,7 +193,7 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen /** * A CPU of the virtual machine. */ - private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) { + private inner class VCpu(val vm: SimVm, val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) { /** * The processing speed of the vCPU. */ @@ -267,7 +267,7 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen * Interrupt the CPU. */ fun interrupt() { - ctx.interrupt(pCPU) + this@SimSpaceSharedHypervisor.ctx.interrupt(pCPU) } /** diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt index 948595b1..10f29f4e 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt @@ -58,12 +58,22 @@ public fun EventFlow(): EventFlow = EventFlowImpl() @OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) private class EventFlowImpl : EventFlow { private var closed: Boolean = false - private val subscribers = HashMap, Unit>() + private val subscribers = mutableListOf>() override fun emit(event: T) { + if (closed) { + return + } + + val it = subscribers.iterator() synchronized(this) { - for ((chan, _) in subscribers) { - chan.offer(event) + while (it.hasNext()) { + val chan = it.next() + if (chan.isClosedForSend) { + it.remove() + } else { + chan.offer(event) + } } } } @@ -72,9 +82,11 @@ private class EventFlowImpl : EventFlow { synchronized(this) { closed = true - for ((chan, _) in subscribers) { + for (chan in subscribers) { chan.close() } + + subscribers.clear() } } @@ -87,9 +99,13 @@ private class EventFlowImpl : EventFlow { } channel = Channel(Channel.UNLIMITED) - subscribers[channel] = Unit + subscribers.add(channel) + } + try { + channel.consumeAsFlow().collect(collector) + } finally { + channel.close() } - channel.consumeAsFlow().collect(collector) } override fun toString(): String = "EventFlow" -- cgit v1.2.3