summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt10
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt36
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt94
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt51
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt94
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt2
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt2
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt2
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt2
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt21
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt14
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt7
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt43
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt10
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt57
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt38
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt5
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt49
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt40
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt28
27 files changed, 251 insertions, 378 deletions
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt
index 6d9506f1..480bc224 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt
@@ -23,7 +23,7 @@
package org.opendc.compute.core.metal
import kotlinx.coroutines.flow.Flow
-import org.opendc.compute.core.Server
+import org.opendc.compute.core.Flavor
import org.opendc.compute.core.image.Image
import org.opendc.core.Identity
import java.util.UUID
@@ -53,14 +53,14 @@ public data class Node(
public val state: NodeState,
/**
- * The boot image of the node.
+ * The flavor of the node.
*/
- public val image: Image,
+ public val flavor: Flavor,
/**
- * The server instance that is running on the node or `null` if no server is running.
+ * The boot image of the node.
*/
- public val server: Server?,
+ public val image: Image,
/**
* The events that are emitted by the node.
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt
index 9fb437de..d1c8d790 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt
@@ -22,7 +22,7 @@
package org.opendc.compute.core.virt
-import org.opendc.compute.core.Server
+import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.virt.driver.VirtDriver
/**
@@ -71,6 +71,6 @@ public sealed class HypervisorEvent {
public val cpuUsage: Double,
public val cpuDemand: Double,
public val numberOfDeployedImages: Int,
- public val hostServer: Server
+ public val host: Node
) : HypervisorEvent()
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt
deleted file mode 100644
index 153a86b3..00000000
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import org.opendc.compute.core.Server
-import org.opendc.simulator.compute.SimExecutionContext
-
-/**
- * Extended [SimExecutionContext] in which workloads within the OpenDC Compute module run.
- */
-public interface ComputeSimExecutionContext : SimExecutionContext {
- /**
- * The server on which the image runs.
- */
- public val server: Server
-}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt
index 1a79523e..cf2747cd 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt
@@ -22,13 +22,13 @@
package org.opendc.compute.simulator
-import org.opendc.compute.core.Server
+import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.virt.driver.VirtDriver
import java.util.UUID
public class HypervisorView(
public val uid: UUID,
- public var server: Server,
+ public var node: Node,
public var numberOfActiveServers: Int,
public var availableMemory: Long,
public var provisionedCores: Int
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt
index 8af45616..a27c331d 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt
@@ -25,9 +25,6 @@ package org.opendc.compute.simulator
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.ServerState
import org.opendc.compute.core.image.Image
import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.metal.NodeEvent
@@ -36,18 +33,14 @@ import org.opendc.compute.core.metal.driver.BareMetalDriver
import org.opendc.compute.simulator.power.api.CpuPowerModel
import org.opendc.compute.simulator.power.api.Powerable
import org.opendc.compute.simulator.power.models.ConstantPowerModel
-import org.opendc.core.services.ServiceRegistry
import org.opendc.simulator.compute.SimBareMetalMachine
-import org.opendc.simulator.compute.SimExecutionContext
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.workload.SimResourceCommand
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.FailureDomain
import org.opendc.utils.flow.EventFlow
import org.opendc.utils.flow.StateFlow
import java.time.Clock
import java.util.UUID
-import kotlin.random.Random
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
@@ -87,7 +80,7 @@ public class SimBareMetalDriver(
* The machine state.
*/
private val nodeState =
- StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, Image.EMPTY, null, events))
+ StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, flavor, Image.EMPTY, events))
/**
* The [SimBareMetalMachine] we use to run the workload.
@@ -102,20 +95,10 @@ public class SimBareMetalDriver(
override val powerDraw: Flow<Double> = cpuPowerModel.getPowerDraw(this)
/**
- * The internal random instance.
- */
- private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits)
-
- /**
* The [Job] that runs the simulated workload.
*/
private var job: Job? = null
- /**
- * The event stream to publish to for the server.
- */
- private var serverEvents: EventFlow<ServerEvent>? = null
-
override suspend fun init(): Node {
return nodeState.value
}
@@ -126,51 +109,13 @@ public class SimBareMetalDriver(
return node
}
- val events = EventFlow<ServerEvent>()
- serverEvents = events
- val server = Server(
- UUID(random.nextLong(), random.nextLong()),
- node.name,
- emptyMap(),
- flavor,
- node.image,
- ServerState.BUILD,
- ServiceRegistry().put(BareMetalDriver, this@SimBareMetalDriver),
- events
- )
-
- val delegate = node.image.tags["workload"] as SimWorkload
- // Wrap the workload to pass in a ComputeSimExecutionContext
- val workload = object : SimWorkload {
- lateinit var wrappedCtx: ComputeSimExecutionContext
-
- override fun onStart(ctx: SimExecutionContext) {
- wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx {
- override val server: Server
- get() = nodeState.value.server!!
-
- override fun toString(): String = "WrappedSimExecutionContext"
- }
-
- delegate.onStart(wrappedCtx)
- }
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return delegate.onStart(wrappedCtx, cpu)
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return delegate.onNext(wrappedCtx, cpu, remainingWork)
- }
-
- override fun toString(): String = "SimWorkloadWrapper(delegate=$delegate)"
- }
+ val workload = node.image.tags["workload"] as SimWorkload
job = coroutineScope.launch {
delay(1) // TODO Introduce boot time
initMachine()
try {
- machine.run(workload)
+ machine.run(workload, mapOf("driver" to this@SimBareMetalDriver, "node" to node))
exitMachine(null)
} catch (_: CancellationException) {
// Ignored
@@ -179,31 +124,21 @@ public class SimBareMetalDriver(
}
}
- setNode(node.copy(state = NodeState.BOOT, server = server))
+ setNode(node.copy(state = NodeState.BOOT))
return nodeState.value
}
private fun initMachine() {
- val server = nodeState.value.server?.copy(state = ServerState.ACTIVE)
- setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server))
+ setNode(nodeState.value.copy(state = NodeState.ACTIVE))
}
private fun exitMachine(cause: Throwable?) {
- val newServerState =
- if (cause == null)
- ServerState.SHUTOFF
- else
- ServerState.ERROR
val newNodeState =
if (cause == null)
- nodeState.value.state
+ NodeState.SHUTOFF
else
NodeState.ERROR
- val server = nodeState.value.server?.copy(state = newServerState)
- setNode(nodeState.value.copy(state = newNodeState, server = server))
-
- serverEvents?.close()
- serverEvents = null
+ setNode(nodeState.value.copy(state = newNodeState))
}
override suspend fun stop(): Node {
@@ -213,7 +148,7 @@ public class SimBareMetalDriver(
}
job?.cancelAndJoin()
- setNode(node.copy(state = NodeState.SHUTOFF, server = null))
+ setNode(node.copy(state = NodeState.SHUTOFF))
return node
}
@@ -235,13 +170,6 @@ public class SimBareMetalDriver(
events.emit(NodeEvent.StateChanged(value, field.state))
}
- val oldServer = field.server
- val newServer = value.server
-
- if (oldServer != null && newServer != null && oldServer.state != newServer.state) {
- serverEvents?.emit(ServerEvent.StateChanged(newServer, oldServer.state))
- }
-
nodeState.value = value
}
@@ -249,13 +177,11 @@ public class SimBareMetalDriver(
get() = coroutineScope
override suspend fun fail() {
- val server = nodeState.value.server?.copy(state = ServerState.ERROR)
- setNode(nodeState.value.copy(state = NodeState.ERROR, server = server))
+ setNode(nodeState.value.copy(state = NodeState.ERROR))
}
override suspend fun recover() {
- val server = nodeState.value.server?.copy(state = ServerState.ACTIVE)
- setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server))
+ setNode(nodeState.value.copy(state = NodeState.ACTIVE))
}
override fun toString(): String = "SimBareMetalDriver(node = ${nodeState.value.uid})"
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
index 86a671fc..d28b2f0d 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
@@ -29,6 +29,7 @@ import kotlinx.coroutines.launch
import org.opendc.compute.core.*
import org.opendc.compute.core.Flavor
import org.opendc.compute.core.image.Image
+import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.virt.HypervisorEvent
import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException
import org.opendc.compute.core.virt.driver.VirtDriver
@@ -45,17 +46,20 @@ import java.util.*
/**
* A [VirtDriver] that is simulates virtual machines on a physical machine using [SimHypervisor].
*/
-public class SimVirtDriver(private val coroutineScope: CoroutineScope, hypervisor: SimHypervisorProvider) : VirtDriver, SimWorkload {
+public class SimVirtDriver(
+ private val coroutineScope: CoroutineScope,
+ hypervisor: SimHypervisorProvider
+) : VirtDriver, SimWorkload {
/**
* The execution context in which the [VirtDriver] runs.
*/
- private lateinit var ctx: ComputeSimExecutionContext
+ private lateinit var ctx: SimExecutionContext
/**
- * The server hosting this hypervisor.
+ * The node on which the hypervisor runs.
*/
- public val server: Server
- get() = ctx.server
+ public val node: Node
+ get() = ctx.meta["node"] as Node
/**
* The [EventFlow] to emit the events.
@@ -93,7 +97,7 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope, hyperviso
cpuUsage,
cpuDemand,
vms.size,
- ctx.server
+ node
)
)
}
@@ -153,13 +157,13 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope, hyperviso
}
private fun vmStarted(vm: VirtualMachine) {
- vms.forEach { it ->
+ vms.forEach {
vm.performanceInterferenceModel?.onStart(it.server.image.name)
}
}
private fun vmStopped(vm: VirtualMachine) {
- vms.forEach { it ->
+ vms.forEach {
vm.performanceInterferenceModel?.onStop(it.server.image.name)
}
}
@@ -171,37 +175,12 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope, hyperviso
val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
val job = coroutineScope.launch {
- val delegate = server.image.tags["workload"] as SimWorkload
- // Wrap the workload to pass in a ComputeSimExecutionContext
- val workload = object : SimWorkload {
- lateinit var wrappedCtx: ComputeSimExecutionContext
-
- override fun onStart(ctx: SimExecutionContext) {
- wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx {
- override val server: Server
- get() = server
-
- override fun toString(): String = "WrappedSimExecutionContext"
- }
-
- delegate.onStart(wrappedCtx)
- }
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return delegate.onStart(wrappedCtx, cpu)
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return delegate.onNext(wrappedCtx, cpu, remainingWork)
- }
-
- override fun toString(): String = "SimWorkloadWrapper(delegate=$delegate)"
- }
+ val workload = server.image.tags["workload"] as SimWorkload
delay(1) // TODO Introduce boot time
init()
try {
- machine.run(workload)
+ machine.run(workload, mapOf("driver" to this@SimVirtDriver, "server" to server))
exit(null)
} catch (cause: Throwable) {
exit(cause)
@@ -239,7 +218,7 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope, hyperviso
}
override fun onStart(ctx: SimExecutionContext) {
- this.ctx = ctx as ComputeSimExecutionContext
+ this.ctx = ctx
this.availableMemory = ctx.machine.memory.map { it.size }.sum()
this.hypervisor.onStart(ctx)
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
index 50ab7788..18afd0c2 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
@@ -32,6 +32,9 @@ import org.opendc.compute.core.Server
import org.opendc.compute.core.ServerEvent
import org.opendc.compute.core.ServerState
import org.opendc.compute.core.image.Image
+import org.opendc.compute.core.metal.Node
+import org.opendc.compute.core.metal.NodeEvent
+import org.opendc.compute.core.metal.NodeState
import org.opendc.compute.core.metal.service.ProvisioningService
import org.opendc.compute.core.virt.HypervisorEvent
import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException
@@ -68,7 +71,7 @@ public class SimVirtProvisioningService(
/**
* The hypervisors that have been launched by the service.
*/
- private val hypervisors: MutableMap<Server, HypervisorView> = mutableMapOf()
+ private val hypervisors: MutableMap<Node, HypervisorView> = mutableMapOf()
/**
* The available hypervisors.
@@ -118,22 +121,12 @@ public class SimVirtProvisioningService(
val workload = SimVirtDriver(coroutineScope, hypervisor)
val hypervisorImage = Image(UUID.randomUUID(), "vmm", mapOf("workload" to workload))
launch {
- var init = false
val deployedNode = provisioningService.deploy(node, hypervisorImage)
- val server = deployedNode.server!!
- server.events.onEach { event ->
+ deployedNode.events.onEach { event ->
when (event) {
- is ServerEvent.StateChanged -> {
- if (!init) {
- init = true
- }
- stateChanged(event.server)
- }
+ is NodeEvent.StateChanged -> stateChanged(event.node, workload)
}
}.launchIn(this)
-
- delay(1)
- onHypervisorAvailable(server, workload)
}
}
}
@@ -229,7 +222,7 @@ public class SimVirtProvisioningService(
}
try {
- logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
+ logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.node.uid} ${selectedHv.node.name} ${selectedHv.node.flavor}" }
incomingImages.poll()
// Speculatively update the hypervisor view information to prevent other images in the queue from
@@ -308,28 +301,38 @@ public class SimVirtProvisioningService(
}
}
- private fun stateChanged(server: Server) {
- when (server.state) {
- ServerState.ACTIVE -> {
- logger.debug { "[${clock.millis()}] Server ${server.uid} available: ${server.state}" }
+ private fun stateChanged(node: Node, hypervisor: SimVirtDriver) {
+ when (node.state) {
+ NodeState.ACTIVE -> {
+ logger.debug { "[${clock.millis()}] Server ${node.uid} available: ${node.state}" }
- if (server in hypervisors) {
+ if (node in hypervisors) {
// Corner case for when the hypervisor already exists
- availableHypervisors += hypervisors.getValue(server)
+ availableHypervisors += hypervisors.getValue(node)
} else {
val hv = HypervisorView(
- server.uid,
- server,
+ node.uid,
+ node,
0,
- server.flavor.memorySize,
+ node.flavor.memorySize,
0
)
- maxCores = max(maxCores, server.flavor.cpuCount)
- maxMemory = max(maxMemory, server.flavor.memorySize)
- hypervisors[server] = hv
+ hv.driver = hypervisor
+ hv.driver.events
+ .onEach { event ->
+ if (event is HypervisorEvent.VmsUpdated) {
+ hv.numberOfActiveServers = event.numberOfActiveServers
+ hv.availableMemory = event.availableMemory
+ }
+ }.launchIn(coroutineScope)
+
+ maxCores = max(maxCores, node.flavor.cpuCount)
+ maxMemory = max(maxMemory, node.flavor.memorySize)
+ hypervisors[node] = hv
+ availableHypervisors += hv
}
- tracer.commit(HypervisorAvailableEvent(server.uid))
+ tracer.commit(HypervisorAvailableEvent(node.uid))
eventFlow.emit(
VirtProvisioningEvent.MetricsAvailable(
@@ -349,9 +352,9 @@ public class SimVirtProvisioningService(
requestCycle()
}
}
- ServerState.SHUTOFF, ServerState.ERROR -> {
- logger.debug { "[${clock.millis()}] Server ${server.uid} unavailable: ${server.state}" }
- val hv = hypervisors[server] ?: return
+ NodeState.SHUTOFF, NodeState.ERROR -> {
+ logger.debug { "[${clock.millis()}] Server ${node.uid} unavailable: ${node.state}" }
+ val hv = hypervisors[node] ?: return
availableHypervisors -= hv
tracer.commit(HypervisorUnavailableEvent(hv.uid))
@@ -377,37 +380,6 @@ public class SimVirtProvisioningService(
}
}
- private fun onHypervisorAvailable(server: Server, hypervisor: SimVirtDriver) {
- val hv = hypervisors[server] ?: return
- hv.driver = hypervisor
- availableHypervisors += hv
-
- tracer.commit(HypervisorAvailableEvent(hv.uid))
-
- eventFlow.emit(
- VirtProvisioningEvent.MetricsAvailable(
- this@SimVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
-
- hv.driver.events
- .onEach { event ->
- if (event is HypervisorEvent.VmsUpdated) {
- hv.numberOfActiveServers = event.numberOfActiveServers
- hv.availableMemory = event.availableMemory
- }
- }.launchIn(coroutineScope)
-
- requestCycle()
- }
-
public data class ImageView(
public val name: String,
public val image: Image,
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt
index 38a07b2b..5e044282 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt
@@ -32,7 +32,7 @@ import org.opendc.compute.simulator.HypervisorView
public class AvailableCoreMemoryAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy {
override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic {
override val comparator: Comparator<HypervisorView> =
- compareBy<HypervisorView> { -it.availableMemory / it.server.flavor.cpuCount }
+ compareBy<HypervisorView> { -it.availableMemory / it.node.flavor.cpuCount }
.run { if (reversed) reversed() else this }
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt
index 4470eab9..04a181a6 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt
@@ -41,9 +41,9 @@ public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic {
return hypervisors.asSequence()
.filter { hv ->
val fitsMemory = hv.availableMemory >= (image.flavor.memorySize)
- val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount
+ val fitsCpu = hv.node.flavor.cpuCount >= image.flavor.cpuCount
fitsMemory && fitsCpu
}
- .minWithOrNull(comparator.thenBy { it.server.uid })
+ .minWithOrNull(comparator.thenBy { it.node.uid })
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt
index 4344d979..91441ecd 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt
@@ -34,7 +34,7 @@ import org.opendc.compute.simulator.HypervisorView
public class ProvisionedCoresAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy {
override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic {
override val comparator: Comparator<HypervisorView> =
- compareBy<HypervisorView> { it.provisionedCores / it.server.flavor.cpuCount }
+ compareBy<HypervisorView> { it.provisionedCores / it.node.flavor.cpuCount }
.run { if (reversed) reversed() else this }
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt
index ac34f410..9a89fccd 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt
@@ -39,7 +39,7 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al
return hypervisors.asIterable()
.filter { hv ->
val fitsMemory = hv.availableMemory >= (image.image.tags["required-memory"] as Long)
- val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount
+ val fitsCpu = hv.node.flavor.cpuCount >= image.flavor.cpuCount
fitsMemory && fitsCpu
}
.randomOrNull(random)
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt
index 5312f4da..582fe817 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt
@@ -42,7 +42,7 @@ public class ReplayAllocationPolicy(private val vmPlacements: Map<String, String
): HypervisorView? {
val clusterName = vmPlacements[image.name]
?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${image.name}")
- val machinesInCluster = hypervisors.filter { it.server.name.contains(clusterName) }
+ val machinesInCluster = hypervisors.filter { it.node.name.contains(clusterName) }
if (machinesInCluster.isEmpty()) {
logger.info { "Could not find any machines belonging to cluster $clusterName for image ${image.name}, assigning randomly." }
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt
index 03981feb..3ca9a0a3 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt
@@ -30,9 +30,10 @@ import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
-import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.ServerState
+import org.junit.jupiter.api.assertAll
import org.opendc.compute.core.image.Image
+import org.opendc.compute.core.metal.NodeEvent
+import org.opendc.compute.core.metal.NodeState
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -60,7 +61,7 @@ internal class SimBareMetalDriverTest {
val testScope = TestCoroutineScope()
val clock = DelayControllerClockAdapter(testScope)
- var finalState: ServerState = ServerState.BUILD
+ var finalState: NodeState = NodeState.UNKNOWN
var finalTime = 0L
testScope.launch {
@@ -70,11 +71,11 @@ internal class SimBareMetalDriverTest {
withContext(coroutineContext) {
driver.init()
driver.setImage(image)
- val server = driver.start().server!!
- server.events.collect { event ->
+ val node = driver.start()
+ node.events.collect { event ->
when (event) {
- is ServerEvent.StateChanged -> {
- finalState = event.server.state
+ is NodeEvent.StateChanged -> {
+ finalState = event.node.state
finalTime = clock.millis()
}
}
@@ -83,7 +84,9 @@ internal class SimBareMetalDriverTest {
}
testScope.advanceUntilIdle()
- assertEquals(ServerState.SHUTOFF, finalState)
- assertEquals(501, finalTime)
+ assertAll(
+ { assertEquals(NodeState.SHUTOFF, finalState) },
+ { assertEquals(501, finalTime) }
+ )
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt
index dad31298..eb46c335 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt
@@ -73,7 +73,7 @@ internal class SimProvisioningServiceTest {
delay(5)
val nodes = provisioner.nodes()
val node = provisioner.deploy(nodes.first(), image)
- node.server!!.events.collect { println(it) }
+ node.events.collect { println(it) }
}
testScope.advanceUntilIdle()
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 8f3e686a..b941d135 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -35,7 +35,7 @@ import mu.KotlinLogging
import org.opendc.compute.core.Flavor
import org.opendc.compute.core.ServerEvent
import org.opendc.compute.core.metal.NODE_CLUSTER
-import org.opendc.compute.core.metal.driver.BareMetalDriver
+import org.opendc.compute.core.metal.NodeEvent
import org.opendc.compute.core.metal.service.ProvisioningService
import org.opendc.compute.core.virt.HypervisorEvent
import org.opendc.compute.core.virt.service.VirtProvisioningEvent
@@ -175,14 +175,14 @@ public suspend fun attachMonitor(
// Monitor hypervisor events
for (hypervisor in hypervisors) {
// TODO Do not expose VirtDriver directly but use Hypervisor class.
- val server = (hypervisor as SimVirtDriver).server
+ val server = (hypervisor as SimVirtDriver).node
monitor.reportHostStateChange(clock.millis(), hypervisor, server)
server.events
.onEach { event ->
val time = clock.millis()
when (event) {
- is ServerEvent.StateChanged -> {
- monitor.reportHostStateChange(time, hypervisor, event.server)
+ is NodeEvent.StateChanged -> {
+ monitor.reportHostStateChange(time, hypervisor, event.node)
}
}
}
@@ -199,15 +199,15 @@ public suspend fun attachMonitor(
event.cpuUsage,
event.cpuDemand,
event.numberOfDeployedImages,
- event.hostServer
+ event.host
)
}
}
.launchIn(coroutineScope)
- val driver = hypervisor.server.services[BareMetalDriver.Key] as SimBareMetalDriver
+ val driver = server.metadata["driver"] as SimBareMetalDriver
driver.powerDraw
- .onEach { monitor.reportPowerConsumption(hypervisor.server, it) }
+ .onEach { monitor.reportPowerConsumption(server, it) }
.launchIn(coroutineScope)
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 3c6637bf..04ffd148 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -23,6 +23,7 @@
package org.opendc.experiments.capelin.monitor
import org.opendc.compute.core.Server
+import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.virt.driver.VirtDriver
import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import java.io.Closeable
@@ -42,14 +43,14 @@ public interface ExperimentMonitor : Closeable {
public fun reportHostStateChange(
time: Long,
driver: VirtDriver,
- server: Server
+ host: Node
) {
}
/**
* Report the power consumption of a host.
*/
- public fun reportPowerConsumption(host: Server, draw: Double) {}
+ public fun reportPowerConsumption(host: Node, draw: Double) {}
/**
* This method is invoked for a host for each slice that is finishes.
@@ -63,7 +64,7 @@ public interface ExperimentMonitor : Closeable {
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- hostServer: Server,
+ host: Node,
duration: Long = 5 * 60 * 1000L
) {
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index a0d57656..e8aa5915 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -24,6 +24,7 @@ package org.opendc.experiments.capelin.monitor
import mu.KotlinLogging
import org.opendc.compute.core.Server
+import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.virt.driver.VirtDriver
import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import org.opendc.experiments.capelin.telemetry.HostEvent
@@ -49,7 +50,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
File(base, "provisioner-metrics/$partition/data.parquet"),
bufferSize
)
- private val currentHostEvent = mutableMapOf<Server, HostEvent>()
+ private val currentHostEvent = mutableMapOf<Node, HostEvent>()
private var startTime = -1L
override fun reportVmStateChange(time: Long, server: Server) {
@@ -64,11 +65,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
override fun reportHostStateChange(
time: Long,
driver: VirtDriver,
- server: Server
+ host: Node
) {
- logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" }
+ logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" }
- val previousEvent = currentHostEvent[server]
+ val previousEvent = currentHostEvent[host]
val roundedTime = previousEvent?.let {
val duration = time - it.timestamp
@@ -91,13 +92,13 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
0.0,
0.0,
0,
- server
+ host
)
}
- private val lastPowerConsumption = mutableMapOf<Server, Double>()
+ private val lastPowerConsumption = mutableMapOf<Node, Double>()
- override fun reportPowerConsumption(host: Server, draw: Double) {
+ override fun reportPowerConsumption(host: Node, draw: Double) {
lastPowerConsumption[host] = draw
}
@@ -110,16 +111,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- hostServer: Server,
+ host: Node,
duration: Long
) {
- val previousEvent = currentHostEvent[hostServer]
+ val previousEvent = currentHostEvent[host]
when {
previousEvent == null -> {
val event = HostEvent(
time,
5 * 60 * 1000L,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -127,17 +128,17 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
previousEvent.timestamp == time -> {
val event = HostEvent(
time,
previousEvent.duration,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -145,11 +146,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
else -> {
hostWriter.write(previousEvent)
@@ -157,7 +158,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
val event = HostEvent(
time,
time - previousEvent.timestamp,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -165,11 +166,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
index e5e9d520..0d5fce09 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
@@ -22,7 +22,7 @@
package org.opendc.experiments.capelin.telemetry
-import org.opendc.compute.core.Server
+import org.opendc.compute.core.metal.Node
/**
* A periodic report of the host machine metrics.
@@ -30,7 +30,7 @@ import org.opendc.compute.core.Server
public data class HostEvent(
override val timestamp: Long,
public val duration: Long,
- public val host: Server,
+ public val node: Node,
public val vmCount: Int,
public val requestedBurst: Long,
public val grantedBurst: Long,
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
index 4a3e7963..b4fdd66a 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
@@ -41,8 +41,8 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) :
// record.put("portfolio_id", event.run.parent.parent.id)
// record.put("scenario_id", event.run.parent.id)
// record.put("run_id", event.run.id)
- record.put("host_id", event.host.name)
- record.put("state", event.host.state.name)
+ record.put("host_id", event.node.name)
+ record.put("state", event.node.state.name)
record.put("timestamp", event.timestamp)
record.put("duration", event.duration)
record.put("vm_count", event.vmCount)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 6a0796f6..0d6c057f 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -32,7 +32,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.core.Server
+import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.workload.VmWorkload
import org.opendc.compute.simulator.SimVirtProvisioningService
import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy
@@ -148,9 +148,9 @@ class CapelinIntegrationTest {
assertAll(
{ assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
{ assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
- { assertEquals(1684849230562, monitor.totalRequestedBurst) },
- { assertEquals(447612683996, monitor.totalGrantedBurst) },
- { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) },
+ { assertEquals(1678587333640, monitor.totalRequestedBurst) },
+ { assertEquals(438118200924, monitor.totalGrantedBurst) },
+ { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) },
{ assertEquals(0, monitor.totalInterferedBurst) }
)
}
@@ -242,7 +242,7 @@ class CapelinIntegrationTest {
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- hostServer: Server,
+ host: Node,
duration: Long
) {
totalRequestedBurst += requestedBurst
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
index f16f9b90..b7a26d34 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -24,7 +24,8 @@ package org.opendc.runner.web
import mu.KotlinLogging
import org.opendc.compute.core.Server
-import org.opendc.compute.core.ServerState
+import org.opendc.compute.core.metal.Node
+import org.opendc.compute.core.metal.NodeState
import org.opendc.compute.core.virt.driver.VirtDriver
import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -36,7 +37,7 @@ import kotlin.math.max
*/
public class WebExperimentMonitor : ExperimentMonitor {
private val logger = KotlinLogging.logger {}
- private val currentHostEvent = mutableMapOf<Server, HostEvent>()
+ private val currentHostEvent = mutableMapOf<Node, HostEvent>()
private var startTime = -1L
override fun reportVmStateChange(time: Long, server: Server) {
@@ -51,11 +52,11 @@ public class WebExperimentMonitor : ExperimentMonitor {
override fun reportHostStateChange(
time: Long,
driver: VirtDriver,
- server: Server
+ host: Node
) {
- logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" }
+ logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" }
- val previousEvent = currentHostEvent[server]
+ val previousEvent = currentHostEvent[host]
val roundedTime = previousEvent?.let {
val duration = time - it.timestamp
@@ -78,13 +79,13 @@ public class WebExperimentMonitor : ExperimentMonitor {
0.0,
0.0,
0,
- server
+ host
)
}
- private val lastPowerConsumption = mutableMapOf<Server, Double>()
+ private val lastPowerConsumption = mutableMapOf<Node, Double>()
- override fun reportPowerConsumption(host: Server, draw: Double) {
+ override fun reportPowerConsumption(host: Node, draw: Double) {
lastPowerConsumption[host] = draw
}
@@ -97,16 +98,16 @@ public class WebExperimentMonitor : ExperimentMonitor {
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- hostServer: Server,
+ host: Node,
duration: Long
) {
- val previousEvent = currentHostEvent[hostServer]
+ val previousEvent = currentHostEvent[host]
when {
previousEvent == null -> {
val event = HostEvent(
time,
5 * 60 * 1000L,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -114,17 +115,17 @@ public class WebExperimentMonitor : ExperimentMonitor {
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
previousEvent.timestamp == time -> {
val event = HostEvent(
time,
previousEvent.duration,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -132,11 +133,11 @@ public class WebExperimentMonitor : ExperimentMonitor {
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
else -> {
processHostEvent(previousEvent)
@@ -144,7 +145,7 @@ public class WebExperimentMonitor : ExperimentMonitor {
val event = HostEvent(
time,
time - previousEvent.timestamp,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -152,17 +153,17 @@ public class WebExperimentMonitor : ExperimentMonitor {
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
}
}
private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
- private val hostMetrics: MutableMap<Server, HostMetrics> = mutableMapOf()
+ private val hostMetrics: MutableMap<Node, HostMetrics> = mutableMapOf()
private fun processHostEvent(event: HostEvent) {
val slices = event.duration / SLICE_LENGTH
@@ -173,14 +174,14 @@ public class WebExperimentMonitor : ExperimentMonitor {
hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst,
hostAggregateMetrics.totalInterferedBurst + event.interferedBurst,
hostAggregateMetrics.totalPowerDraw + (slices * (event.powerDraw / 12)),
- hostAggregateMetrics.totalFailureSlices + if (event.host.state != ServerState.ACTIVE) slices.toLong() else 0,
- hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != ServerState.ACTIVE) event.vmCount * slices.toLong() else 0
+ hostAggregateMetrics.totalFailureSlices + if (event.node.state != NodeState.ACTIVE) slices.toLong() else 0,
+ hostAggregateMetrics.totalFailureVmSlices + if (event.node.state != NodeState.ACTIVE) event.vmCount * slices.toLong() else 0
)
- hostMetrics.compute(event.host) { key, prev ->
+ hostMetrics.compute(event.node) { _, prev ->
HostMetrics(
- (event.cpuUsage.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
- (event.cpuDemand.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
+ (event.cpuUsage.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
+ (event.cpuDemand.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
event.vmCount + (prev?.vmCount ?: 0),
1 + (prev?.count ?: 0)
)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 812b5f20..f74c5697 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -84,33 +84,33 @@ public class SimBareMetalMachine(
private val scheduler = TimerScheduler<Cpu>(coroutineScope, clock)
/**
- * The execution context in which the workload runs.
- */
- private val ctx = object : SimExecutionContext {
- override val machine: SimMachineModel
- get() = this@SimBareMetalMachine.model
-
- override val clock: Clock
- get() = this@SimBareMetalMachine.clock
-
- override fun interrupt(cpu: Int) {
- require(cpu < cpus.size) { "Invalid CPU identifier" }
- cpus[cpu].interrupt()
- }
- }
-
- /**
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- override suspend fun run(workload: SimWorkload) {
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
require(!isTerminated) { "Machine is terminated" }
require(cont == null) { "Run should not be called concurrently" }
+ val ctx = object : SimExecutionContext {
+ override val machine: SimMachineModel
+ get() = this@SimBareMetalMachine.model
+
+ override val clock: Clock
+ get() = this@SimBareMetalMachine.clock
+
+ override val meta: Map<String, Any>
+ get() = meta
+
+ override fun interrupt(cpu: Int) {
+ require(cpu < cpus.size) { "Invalid CPU identifier" }
+ cpus[cpu].interrupt()
+ }
+ }
+
workload.onStart(ctx)
return suspendCancellableCoroutine { cont ->
this.cont = cont
- this.cpus = model.cpus.map { Cpu(it, workload) }
+ this.cpus = model.cpus.map { Cpu(ctx, it, workload) }
for (cpu in cpus) {
cpu.start()
@@ -161,7 +161,7 @@ public class SimBareMetalMachine(
/**
* A physical CPU of the machine.
*/
- private inner class Cpu(val model: ProcessingUnit, val workload: SimWorkload) {
+ private inner class Cpu(val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload) {
/**
* The current command.
*/
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt
index c7c3d3cc..657dac66 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt
@@ -41,6 +41,11 @@ public interface SimExecutionContext {
public val machine: SimMachineModel
/**
+ * The metadata associated with the context.
+ */
+ public val meta: Map<String, Any>
+
+ /**
* Ask the host machine to interrupt the specified vCPU.
*
* @param cpu The id of the vCPU to interrupt.
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index 5e86d32b..bf6d8a5e 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -336,33 +336,33 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
private var cpus: List<VCpu> = emptyList()
/**
- * The execution context in which the workload runs.
- */
- val ctx = object : SimExecutionContext {
- override val machine: SimMachineModel
- get() = model
-
- override val clock: Clock
- get() = this@SimFairShareHypervisor.ctx.clock
-
- override fun interrupt(cpu: Int) {
- require(cpu < cpus.size) { "Invalid CPU identifier" }
- cpus[cpu].interrupt()
- }
- }
-
- /**
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- override suspend fun run(workload: SimWorkload) {
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
require(!isTerminated) { "Machine is terminated" }
require(cont == null) { "Run should not be called concurrently" }
+ val ctx = object : SimExecutionContext {
+ override val machine: SimMachineModel
+ get() = model
+
+ override val clock: Clock
+ get() = this@SimFairShareHypervisor.ctx.clock
+
+ override val meta: Map<String, Any>
+ get() = meta
+
+ override fun interrupt(cpu: Int) {
+ require(cpu < cpus.size) { "Invalid CPU identifier" }
+ cpus[cpu].interrupt()
+ }
+ }
+
workload.onStart(ctx)
return suspendCancellableCoroutine { cont ->
this.cont = cont
- this.cpus = model.cpus.map { VCpu(this, it, workload) }
+ this.cpus = model.cpus.map { VCpu(this, ctx, it, workload) }
for (cpu in cpus) {
// Register vCPU to scheduler
@@ -417,7 +417,12 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
/**
* A CPU of the virtual machine.
*/
- private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload) : Comparable<VCpu> {
+ private inner class VCpu(
+ val vm: SimVm,
+ val ctx: SimExecutionContext,
+ val model: ProcessingUnit,
+ val workload: SimWorkload
+ ) : Comparable<VCpu> {
/**
* The latest command processed by the CPU.
*/
@@ -488,7 +493,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
isIntermediate = true
latestFlush = ctx.clock.millis()
- process(workload.onStart(vm.ctx, model.id))
+ process(workload.onStart(ctx, model.id))
} catch (e: Throwable) {
fail(e)
} finally {
@@ -514,7 +519,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
// Act like nothing has happened in case the vCPU did not reach its deadline or was not
// interrupted by the user.
if (interrupt || command.deadline <= now) {
- process(workload.onNext(vm.ctx, model.id, 0.0))
+ process(workload.onNext(ctx, model.id, 0.0))
}
}
is SimResourceCommand.Consume -> {
@@ -545,7 +550,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
totalOvercommittedWork += remainingWork
}
- process(workload.onNext(vm.ctx, model.id, remainingWork))
+ process(workload.onNext(ctx, model.id, remainingWork))
} else {
process(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline))
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
index ea8eeb37..bfaa60bc 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
@@ -42,7 +42,7 @@ public interface SimMachine : AutoCloseable {
/**
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- public suspend fun run(workload: SimWorkload)
+ public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap())
/**
* Terminate this machine.
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
index 66d3eda7..778b68ca 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
@@ -117,33 +117,33 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
private var cpus: List<VCpu> = emptyList()
/**
- * The execution context in which the workload runs.
- */
- val ctx = object : SimExecutionContext {
- override val machine: SimMachineModel
- get() = model
-
- override val clock: Clock
- get() = this@SimSpaceSharedHypervisor.ctx.clock
-
- override fun interrupt(cpu: Int) {
- require(cpu < cpus.size) { "Invalid CPU identifier" }
- cpus[cpu].interrupt()
- }
- }
-
- /**
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- override suspend fun run(workload: SimWorkload) {
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
require(!isTerminated) { "Machine is terminated" }
require(cont == null) { "Run should not be called concurrently" }
+ val ctx = object : SimExecutionContext {
+ override val machine: SimMachineModel
+ get() = model
+
+ override val clock: Clock
+ get() = this@SimSpaceSharedHypervisor.ctx.clock
+
+ override val meta: Map<String, Any>
+ get() = meta
+
+ override fun interrupt(cpu: Int) {
+ require(cpu < cpus.size) { "Invalid CPU identifier" }
+ cpus[cpu].interrupt()
+ }
+ }
+
workload.onStart(ctx)
return suspendCancellableCoroutine { cont ->
this.cont = cont
- this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, model, workload, pCPUs[index]) }
+ this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, ctx, model, workload, pCPUs[index]) }
for (cpu in cpus) {
cpu.start()
@@ -193,7 +193,7 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
/**
* A CPU of the virtual machine.
*/
- private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) {
+ private inner class VCpu(val vm: SimVm, val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) {
/**
* The processing speed of the vCPU.
*/
@@ -267,7 +267,7 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
* Interrupt the CPU.
*/
fun interrupt() {
- ctx.interrupt(pCPU)
+ this@SimSpaceSharedHypervisor.ctx.interrupt(pCPU)
}
/**
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
index 948595b1..10f29f4e 100644
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
+++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
@@ -58,12 +58,22 @@ public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl()
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
private class EventFlowImpl<T> : EventFlow<T> {
private var closed: Boolean = false
- private val subscribers = HashMap<SendChannel<T>, Unit>()
+ private val subscribers = mutableListOf<SendChannel<T>>()
override fun emit(event: T) {
+ if (closed) {
+ return
+ }
+
+ val it = subscribers.iterator()
synchronized(this) {
- for ((chan, _) in subscribers) {
- chan.offer(event)
+ while (it.hasNext()) {
+ val chan = it.next()
+ if (chan.isClosedForSend) {
+ it.remove()
+ } else {
+ chan.offer(event)
+ }
}
}
}
@@ -72,9 +82,11 @@ private class EventFlowImpl<T> : EventFlow<T> {
synchronized(this) {
closed = true
- for ((chan, _) in subscribers) {
+ for (chan in subscribers) {
chan.close()
}
+
+ subscribers.clear()
}
}
@@ -87,9 +99,13 @@ private class EventFlowImpl<T> : EventFlow<T> {
}
channel = Channel(Channel.UNLIMITED)
- subscribers[channel] = Unit
+ subscribers.add(channel)
+ }
+ try {
+ channel.consumeAsFlow().collect(collector)
+ } finally {
+ channel.close()
}
- channel.consumeAsFlow().collect(collector)
}
override fun toString(): String = "EventFlow"