summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Host.kt94
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostEvent.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt)15
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostListener.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt)33
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostState.kt38
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/InsufficientMemoryOnServerException.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt)2
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt63
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt300
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt231
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt240
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt8
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt8
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt10
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt (renamed from simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt)15
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt10
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt12
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt4
21 files changed, 623 insertions, 480 deletions
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Host.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Host.kt
new file mode 100644
index 00000000..60a31b69
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Host.kt
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.core.virt
+
+import kotlinx.coroutines.flow.Flow
+import org.opendc.compute.core.Server
+import java.util.*
+
+/**
+ * Base interface for representing compute resources that host virtualized [Server] instances.
+ */
+public interface Host {
+ /**
+ * A unique identifier representing the host.
+ */
+ public val uid: UUID
+
+ /**
+ * The state of the host.
+ */
+ public val state: HostState
+
+ /**
+ * The events emitted by the driver.
+ */
+ public val events: Flow<HostEvent>
+
+ /**
+ * Determine whether the specified [instance][server] can still fit on this host.
+ */
+ public fun canFit(server: Server): Boolean
+
+ /**
+ * Register the specified [instance][server] on the host.
+ *
+ * Once the method returns, the instance should be running if [start] is true or else the instance should be
+ * stopped.
+ */
+ public suspend fun spawn(server: Server, start: Boolean = true)
+
+ /**
+ * Determine whether the specified [instance][server] exists on the host.
+ */
+ public operator fun contains(server: Server): Boolean
+
+ /**
+ * Stat the server [instance][server] if it is currently not running on this host.
+ *
+ * @throws IllegalArgumentException if the server is not present on the host.
+ */
+ public suspend fun start(server: Server)
+
+ /**
+ * Stop the server [instance][server] if it is currently running on this host.
+ *
+ * @throws IllegalArgumentException if the server is not present on the host.
+ */
+ public suspend fun stop(server: Server)
+
+ /**
+ * Terminate the specified [instance][server] on this host and cleanup all resources associated with it.
+ */
+ public suspend fun terminate(server: Server)
+
+ /**
+ * Add a [HostListener] to this host.
+ */
+ public fun addListener(listener: HostListener)
+
+ /**
+ * Remove a [HostListener] from this host.
+ */
+ public fun removeListener(listener: HostListener)
+}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostEvent.kt
index d1c8d790..a07523e8 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostEvent.kt
@@ -23,16 +23,15 @@
package org.opendc.compute.core.virt
import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.virt.driver.VirtDriver
/**
- * An event that is emitted by a [VirtDriver].
+ * An event that is emitted by a [Host].
*/
-public sealed class HypervisorEvent {
+public sealed class HostEvent {
/**
* The driver that emitted the event.
*/
- public abstract val driver: VirtDriver
+ public abstract val driver: Host
/**
* This event is emitted when the number of active servers on the server managed by this driver is updated.
@@ -42,10 +41,10 @@ public sealed class HypervisorEvent {
* @property availableMemory The available memory, in MB.
*/
public data class VmsUpdated(
- override val driver: VirtDriver,
+ override val driver: Host,
public val numberOfActiveServers: Int,
public val availableMemory: Long
- ) : HypervisorEvent()
+ ) : HostEvent()
/**
* This event is emitted when a slice is finished.
@@ -63,7 +62,7 @@ public sealed class HypervisorEvent {
* @property numberOfDeployedImages The number of images deployed on this hypervisor.
*/
public data class SliceFinished(
- override val driver: VirtDriver,
+ override val driver: Host,
public val requestedBurst: Long,
public val grantedBurst: Long,
public val overcommissionedBurst: Long,
@@ -72,5 +71,5 @@ public sealed class HypervisorEvent {
public val cpuDemand: Double,
public val numberOfDeployedImages: Int,
public val host: Node
- ) : HypervisorEvent()
+ ) : HostEvent()
}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostListener.kt
index 1ae52baa..b14d9bb5 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostListener.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,35 +22,20 @@
package org.opendc.compute.core.virt
-import kotlinx.coroutines.flow.Flow
-import org.opendc.core.Identity
-import java.util.UUID
+import org.opendc.compute.core.Server
+import org.opendc.compute.core.ServerState
/**
- * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment
- * into several virtual guest machines.
+ * Listener interface for events originating from a [Host].
*/
-public class Hypervisor(
+public interface HostListener {
/**
- * The unique identifier of the hypervisor.
+ * This method is invoked when the state of an [instance][server] on [host] changes.
*/
- override val uid: UUID,
+ public fun onStateChange(host: Host, server: Server, newState: ServerState) {}
/**
- * The optional name of the hypervisor.
+ * This method is invoked when the state of a [Host] has changed.
*/
- override val name: String,
-
- /**
- * Metadata of the hypervisor.
- */
- public val metadata: Map<String, Any>,
-
- /**
- * The events that are emitted by the hypervisor.
- */
- public val events: Flow<HypervisorEvent>
-) : Identity {
- override fun hashCode(): Int = uid.hashCode()
- override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid
+ public fun onStateChange(host: Host, newState: HostState) {}
}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostState.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostState.kt
new file mode 100644
index 00000000..7f87f5f7
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostState.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.core.virt
+
+/**
+ * The state of a host.
+ */
+public enum class HostState {
+ /**
+ * The host is up.
+ */
+ UP,
+
+ /**
+ * The host is down.
+ */
+ DOWN
+}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/InsufficientMemoryOnServerException.kt
index 6fe84ea6..0f7b5826 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/InsufficientMemoryOnServerException.kt
@@ -1,3 +1,3 @@
-package org.opendc.compute.core.virt.driver
+package org.opendc.compute.core.virt
public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.")
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt
deleted file mode 100644
index 68cc7b50..00000000
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.core.virt.driver
-
-import kotlinx.coroutines.flow.Flow
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.virt.HypervisorEvent
-import org.opendc.core.services.AbstractServiceKey
-import java.util.UUID
-
-/**
- * A driver interface for a hypervisor running on some host server and communicating with the central compute service to
- * provide virtualization for that particular resource.
- */
-public interface VirtDriver {
- /**
- * The events emitted by the driver.
- */
- public val events: Flow<HypervisorEvent>
-
- /**
- * Determine whether the specified [flavor] can still fit on this driver.
- */
- public fun canFit(flavor: Flavor): Boolean
-
- /**
- * Spawn the given [Image] on the compute resource of this driver.
- *
- * @param name The name of the server to spawn.
- * @param image The image to deploy.
- * @param flavor The flavor of the server which this driver is controlling.
- * @return The virtual server spawned by this method.
- */
- public suspend fun spawn(
- name: String,
- image: Image,
- flavor: Flavor
- ): Server
-
- public companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver")
-}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt
index b967044c..8da849af 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt
@@ -26,7 +26,7 @@ import kotlinx.coroutines.flow.Flow
import org.opendc.compute.core.Flavor
import org.opendc.compute.core.Server
import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.Host
/**
* A service for VM provisioning on a cloud.
@@ -40,7 +40,7 @@ public interface VirtProvisioningService {
/**
* Obtain the active hypervisors for this provisioner.
*/
- public suspend fun drivers(): Set<VirtDriver>
+ public suspend fun drivers(): Set<Host>
/**
* The number of hosts available in the system.
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt
index cf2747cd..a650144b 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt
@@ -23,7 +23,7 @@
package org.opendc.compute.simulator
import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.Host
import java.util.UUID
public class HypervisorView(
@@ -33,5 +33,5 @@ public class HypervisorView(
public var availableMemory: Long,
public var provisionedCores: Int
) {
- public lateinit var driver: VirtDriver
+ public lateinit var driver: Host
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
new file mode 100644
index 00000000..85fa2cb6
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -0,0 +1,300 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.Flow
+import mu.KotlinLogging
+import org.opendc.compute.core.*
+import org.opendc.compute.core.Flavor
+import org.opendc.compute.core.metal.Node
+import org.opendc.compute.core.virt.*
+import org.opendc.simulator.compute.*
+import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.workload.SimResourceCommand
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.utils.flow.EventFlow
+import java.util.*
+import kotlin.coroutines.resume
+
+/**
+ * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
+ */
+public class SimHost(
+ override val uid: UUID,
+ private val coroutineScope: CoroutineScope,
+ hypervisor: SimHypervisorProvider
+) : Host, SimWorkload {
+ /**
+ * The logger instance of this server.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The execution context in which the [Host] runs.
+ */
+ private lateinit var ctx: SimExecutionContext
+
+ override val events: Flow<HostEvent>
+ get() = _events
+ internal val _events = EventFlow<HostEvent>()
+
+ /**
+ * The event listeners registered with this host.
+ */
+ private val listeners = mutableListOf<HostListener>()
+
+ /**
+ * Current total memory use of the images on this hypervisor.
+ */
+ private var availableMemory: Long = 0
+
+ /**
+ * The hypervisor to run multiple workloads.
+ */
+ private val hypervisor = hypervisor.create(
+ object : SimHypervisor.Listener {
+ override fun onSliceFinish(
+ hypervisor: SimHypervisor,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ _events.emit(
+ HostEvent.SliceFinished(
+ this@SimHost,
+ requestedWork,
+ grantedWork,
+ overcommittedWork,
+ interferedWork,
+ cpuUsage,
+ cpuDemand,
+ guests.size,
+ node
+ )
+ )
+ }
+ }
+ )
+
+ /**
+ * The virtual machines running on the hypervisor.
+ */
+ private val guests = HashMap<Server, SimGuest>()
+
+ /**
+ * The node on which the hypervisor runs.
+ */
+ public val node: Node
+ get() = ctx.meta["node"] as Node
+
+ override val state: HostState
+ get() = _state
+ private var _state: HostState = HostState.UP
+ set(value) {
+ listeners.forEach { it.onStateChange(this, value) }
+ field = value
+ }
+
+ override fun canFit(server: Server): Boolean {
+ val sufficientMemory = availableMemory > server.flavor.memorySize
+ val enoughCpus = ctx.machine.cpus.size >= server.flavor.cpuCount
+ val canFit = hypervisor.canFit(server.flavor.toMachineModel())
+
+ return sufficientMemory && enoughCpus && canFit
+ }
+
+ override suspend fun spawn(server: Server, start: Boolean) {
+ // Return if the server already exists on this host
+ if (server in this) {
+ return
+ }
+
+ require(canFit(server)) { "Server does not fit" }
+ val guest = SimGuest(server, hypervisor.createMachine(server.flavor.toMachineModel()))
+ guests[server] = guest
+
+ if (start) {
+ guest.start()
+ }
+
+ _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory))
+ }
+
+ override fun contains(server: Server): Boolean {
+ return server in guests
+ }
+
+ override suspend fun start(server: Server) {
+ val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
+ guest.start()
+ }
+
+ override suspend fun stop(server: Server) {
+ val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
+ guest.stop()
+ }
+
+ override suspend fun terminate(server: Server) {
+ val guest = guests.remove(server) ?: return
+ guest.terminate()
+ }
+
+ override fun addListener(listener: HostListener) {
+ listeners.add(listener)
+ }
+
+ override fun removeListener(listener: HostListener) {
+ listeners.remove(listener)
+ }
+
+ /**
+ * Convert flavor to machine model.
+ */
+ private fun Flavor.toMachineModel(): SimMachineModel {
+ val originalCpu = ctx.machine.cpus[0]
+ val processingNode = originalCpu.node.copy(coreCount = cpuCount)
+ val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
+ val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
+
+ return SimMachineModel(processingUnits, memoryUnits)
+ }
+
+ private fun onGuestStart(vm: SimGuest) {
+ guests.forEach { _, guest ->
+ if (guest.state == ServerState.ACTIVE) {
+ vm.performanceInterferenceModel?.onStart(vm.server.image.name)
+ }
+ }
+
+ listeners.forEach { it.onStateChange(this, vm.server, vm.state) }
+ }
+
+ private fun onGuestStop(vm: SimGuest) {
+ guests.forEach { _, guest ->
+ if (guest.state == ServerState.ACTIVE) {
+ vm.performanceInterferenceModel?.onStop(vm.server.image.name)
+ }
+ }
+
+ listeners.forEach { it.onStateChange(this, vm.server, vm.state) }
+
+ _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory))
+ }
+
+ /**
+ * A virtual machine instance that the driver manages.
+ */
+ private inner class SimGuest(val server: Server, val machine: SimMachine) {
+ val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+
+ var state: ServerState = ServerState.SHUTOFF
+
+ suspend fun start() {
+ when (state) {
+ ServerState.SHUTOFF -> {
+ logger.info { "User requested to start server ${server.uid}" }
+ launch()
+ }
+ ServerState.ACTIVE -> return
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ suspend fun stop() {
+ when (state) {
+ ServerState.ACTIVE, ServerState.ERROR -> {
+ val job = job ?: throw IllegalStateException("Server should be active")
+ job.cancel()
+ job.join()
+ }
+ ServerState.SHUTOFF -> return
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ suspend fun terminate() {
+ stop()
+ }
+
+ private var job: Job? = null
+
+ private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont ->
+ assert(job == null) { "Concurrent job running" }
+ val workload = server.image.tags["workload"] as SimWorkload
+
+ val job = coroutineScope.launch {
+ delay(1) // TODO Introduce boot time
+ init()
+ cont.resume(Unit)
+ try {
+ machine.run(workload, mapOf("driver" to this@SimHost, "server" to server))
+ exit(null)
+ } catch (cause: Throwable) {
+ exit(cause)
+ } finally {
+ machine.close()
+ }
+ }
+ this.job = job
+ job.invokeOnCompletion {
+ this.job = null
+ }
+ }
+
+ private fun init() {
+ state = ServerState.ACTIVE
+ onGuestStart(this)
+ }
+
+ private fun exit(cause: Throwable?) {
+ state =
+ if (cause == null)
+ ServerState.SHUTOFF
+ else
+ ServerState.ERROR
+
+ availableMemory += server.flavor.memorySize
+ onGuestStop(this)
+ }
+ }
+
+ override fun onStart(ctx: SimExecutionContext) {
+ this.ctx = ctx
+ this.availableMemory = ctx.machine.memory.map { it.size }.sum()
+ this.hypervisor.onStart(ctx)
+ }
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ return hypervisor.onStart(ctx, cpu)
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ return hypervisor.onNext(ctx, cpu, remainingWork)
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
deleted file mode 100644
index 35d82211..00000000
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.launch
-import org.opendc.compute.core.*
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.virt.HypervisorEvent
-import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException
-import org.opendc.compute.core.virt.driver.VirtDriver
-import org.opendc.simulator.compute.*
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.workload.SimResourceCommand
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.utils.flow.EventFlow
-import java.util.*
-
-/**
- * A [VirtDriver] that is simulates virtual machines on a physical machine using [SimHypervisor].
- */
-public class SimVirtDriver(
- private val coroutineScope: CoroutineScope,
- hypervisor: SimHypervisorProvider
-) : VirtDriver, SimWorkload {
- /**
- * The execution context in which the [VirtDriver] runs.
- */
- private lateinit var ctx: SimExecutionContext
-
- /**
- * The node on which the hypervisor runs.
- */
- public val node: Node
- get() = ctx.meta["node"] as Node
-
- /**
- * The [EventFlow] to emit the events.
- */
- internal val eventFlow = EventFlow<HypervisorEvent>()
-
- override val events: Flow<HypervisorEvent> = eventFlow
-
- /**
- * Current total memory use of the images on this hypervisor.
- */
- private var availableMemory: Long = 0
-
- /**
- * The hypervisor to run multiple workloads.
- */
- private val hypervisor = hypervisor.create(
- object : SimHypervisor.Listener {
- override fun onSliceFinish(
- hypervisor: SimHypervisor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- eventFlow.emit(
- HypervisorEvent.SliceFinished(
- this@SimVirtDriver,
- requestedWork,
- grantedWork,
- overcommittedWork,
- interferedWork,
- cpuUsage,
- cpuDemand,
- vms.size,
- node
- )
- )
- }
- }
- )
-
- /**
- * The virtual machines running on the hypervisor.
- */
- private val vms = HashSet<VirtualMachine>()
-
- override fun canFit(flavor: Flavor): Boolean {
- val sufficientMemory = availableMemory > flavor.memorySize
- val enoughCpus = ctx.machine.cpus.size >= flavor.cpuCount
- val canFit = hypervisor.canFit(flavor.toMachineModel())
-
- return sufficientMemory && enoughCpus && canFit
- }
-
- override suspend fun spawn(name: String, image: Image, flavor: Flavor): Server {
- val requiredMemory = flavor.memorySize
- if (availableMemory - requiredMemory < 0) {
- throw InsufficientMemoryOnServerException()
- }
- require(flavor.cpuCount <= ctx.machine.cpus.size) { "Machine does not fit" }
-
- val events = EventFlow<ServerEvent>()
- val server = Server(
- UUID.randomUUID(),
- name,
- emptyMap(),
- flavor,
- image,
- ServerState.BUILD,
- events
- )
- availableMemory -= requiredMemory
-
- val vm = VirtualMachine(server, events, hypervisor.createMachine(flavor.toMachineModel()))
- vms.add(vm)
- vmStarted(vm)
- eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
- return server
- }
-
- /**
- * Convert flavor to machine model.
- */
- private fun Flavor.toMachineModel(): SimMachineModel {
- val originalCpu = ctx.machine.cpus[0]
- val processingNode = originalCpu.node.copy(coreCount = cpuCount)
- val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
- val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
-
- return SimMachineModel(processingUnits, memoryUnits)
- }
-
- private fun vmStarted(vm: VirtualMachine) {
- vms.forEach {
- vm.performanceInterferenceModel?.onStart(it.server.image.name)
- }
- }
-
- private fun vmStopped(vm: VirtualMachine) {
- vms.forEach {
- vm.performanceInterferenceModel?.onStop(it.server.image.name)
- }
- }
-
- /**
- * A virtual machine instance that the driver manages.
- */
- private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, val machine: SimMachine) {
- val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
-
- val job = coroutineScope.launch {
- val workload = server.image.tags["workload"] as SimWorkload
-
- delay(1) // TODO Introduce boot time
- init()
- try {
- machine.run(workload, mapOf("driver" to this@SimVirtDriver, "server" to server))
- exit(null)
- } catch (cause: Throwable) {
- exit(cause)
- } finally {
- machine.close()
- }
- }
-
- var server: Server = server
- set(value) {
- if (field.state != value.state) {
- events.emit(ServerEvent.StateChanged(value, field.state))
- }
-
- field = value
- }
-
- private fun init() {
- server = server.copy(state = ServerState.ACTIVE)
- }
-
- private fun exit(cause: Throwable?) {
- val serverState =
- if (cause == null)
- ServerState.SHUTOFF
- else
- ServerState.ERROR
- server = server.copy(state = serverState)
- availableMemory += server.flavor.memorySize
- vms.remove(this)
- vmStopped(this)
- eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimVirtDriver, vms.size, availableMemory))
- events.close()
- }
- }
-
- override fun onStart(ctx: SimExecutionContext) {
- this.ctx = ctx
- this.availableMemory = ctx.machine.memory.map { it.size }.sum()
- this.hypervisor.onStart(ctx)
- }
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return hypervisor.onStart(ctx, cpu)
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return hypervisor.onNext(ctx, cpu, remainingWork)
- }
-}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
index 18afd0c2..ee747a9a 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
@@ -36,9 +36,7 @@ import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.metal.NodeEvent
import org.opendc.compute.core.metal.NodeState
import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.core.virt.HypervisorEvent
-import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException
-import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.*
import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import org.opendc.compute.core.virt.service.VirtProvisioningService
import org.opendc.compute.core.virt.service.events.*
@@ -62,13 +60,18 @@ public class SimVirtProvisioningService(
private val tracer: EventTracer,
private val hypervisor: SimHypervisorProvider,
private val schedulingQuantum: Long = 300000, // 5 minutes in milliseconds
-) : VirtProvisioningService {
+) : VirtProvisioningService, HostListener {
/**
* The logger instance to use.
*/
private val logger = KotlinLogging.logger {}
/**
+ * A mapping from host to hypervisor view.
+ */
+ private val hostToHv = mutableMapOf<Host, HypervisorView>()
+
+ /**
* The hypervisors that have been launched by the service.
*/
private val hypervisors: MutableMap<Node, HypervisorView> = mutableMapOf()
@@ -79,14 +82,19 @@ public class SimVirtProvisioningService(
private val availableHypervisors: MutableSet<HypervisorView> = mutableSetOf()
/**
- * The incoming images to be processed by the provisioner.
+ * The servers that should be launched by the service.
+ */
+ private val queue: Deque<LaunchRequest> = ArrayDeque()
+
+ /**
+ * The active servers in the system.
*/
- private val incomingImages: Deque<ImageView> = ArrayDeque()
+ private val activeServers: MutableSet<Server> = mutableSetOf()
/**
- * The active images in the system.
+ * The [Random] instance used to generate unique identifiers for the objects.
*/
- private val activeImages: MutableSet<ImageView> = mutableSetOf()
+ private val random = Random(0)
public var submittedVms: Int = 0
public var queuedVms: Int = 0
@@ -102,12 +110,9 @@ public class SimVirtProvisioningService(
*/
private val allocationLogic = allocationPolicy()
- /**
- * The [EventFlow] to emit the events.
- */
- internal val eventFlow = EventFlow<VirtProvisioningEvent>()
-
- override val events: Flow<VirtProvisioningEvent> = eventFlow
+ override val events: Flow<VirtProvisioningEvent>
+ get() = _events
+ private val _events = EventFlow<VirtProvisioningEvent>()
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
@@ -118,7 +123,8 @@ public class SimVirtProvisioningService(
coroutineScope.launch {
val provisionedNodes = provisioningService.nodes()
provisionedNodes.forEach { node ->
- val workload = SimVirtDriver(coroutineScope, hypervisor)
+ val workload = SimHost(UUID(random.nextLong(), random.nextLong()), coroutineScope, hypervisor)
+ workload.addListener(this@SimVirtProvisioningService)
val hypervisorImage = Image(UUID.randomUUID(), "vmm", mapOf("workload" to workload))
launch {
val deployedNode = provisioningService.deploy(node, hypervisorImage)
@@ -132,7 +138,7 @@ public class SimVirtProvisioningService(
}
}
- override suspend fun drivers(): Set<VirtDriver> {
+ override suspend fun drivers(): Set<Host> {
return availableHypervisors.map { it.driver }.toSet()
}
@@ -145,7 +151,7 @@ public class SimVirtProvisioningService(
): Server {
tracer.commit(VmSubmissionEvent(name, image, flavor))
- eventFlow.emit(
+ _events.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
hypervisors.size,
@@ -158,9 +164,9 @@ public class SimVirtProvisioningService(
)
)
- return suspendCancellableCoroutine<Server> { cont ->
- val vmInstance = ImageView(name, image, flavor, cont)
- incomingImages += vmInstance
+ return suspendCancellableCoroutine { cont ->
+ val request = LaunchRequest(createServer(name, image, flavor), cont)
+ queue += request
requestCycle()
}
}
@@ -170,6 +176,22 @@ public class SimVirtProvisioningService(
provisionedNodes.forEach { node -> provisioningService.stop(node) }
}
+ private fun createServer(
+ name: String,
+ image: Image,
+ flavor: Flavor
+ ): Server {
+ return Server(
+ uid = UUID(random.nextLong(), random.nextLong()),
+ name = name,
+ tags = emptyMap(),
+ flavor = flavor,
+ image = image,
+ state = ServerState.BUILD,
+ events = EventFlow()
+ )
+ }
+
private fun requestCycle() {
// Bail out in case we have already requested a new cycle.
if (scheduler.isTimerActive(Unit)) {
@@ -182,23 +204,23 @@ public class SimVirtProvisioningService(
val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
scheduler.startSingleTimer(Unit, delay) {
- coroutineScope.launch { schedule() }
+ schedule()
}
}
- private suspend fun schedule() {
- while (incomingImages.isNotEmpty()) {
- val imageInstance = incomingImages.peekFirst()
- val requiredMemory = imageInstance.flavor.memorySize
- val selectedHv = allocationLogic.select(availableHypervisors, imageInstance)
+ private fun schedule() {
+ while (queue.isNotEmpty()) {
+ val (server, cont) = queue.peekFirst()
+ val requiredMemory = server.flavor.memorySize
+ val selectedHv = allocationLogic.select(availableHypervisors, server)
- if (selectedHv == null || !selectedHv.driver.canFit(imageInstance.flavor)) {
- logger.trace { "Image ${imageInstance.image} selected for scheduling but no capacity available for it." }
+ if (selectedHv == null || !selectedHv.driver.canFit(server)) {
+ logger.trace { "Server $server selected for scheduling but no capacity available for it." }
- if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
- tracer.commit(VmSubmissionInvalidEvent(imageInstance.name))
+ if (requiredMemory > maxMemory || server.flavor.cpuCount > maxCores) {
+ tracer.commit(VmSubmissionInvalidEvent(server.name))
- eventFlow.emit(
+ _events.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
hypervisors.size,
@@ -212,9 +234,9 @@ public class SimVirtProvisioningService(
)
// Remove the incoming image
- incomingImages.poll()
+ queue.poll()
- logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]")
+ logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
continue
} else {
break
@@ -222,86 +244,49 @@ public class SimVirtProvisioningService(
}
try {
- logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.node.uid} ${selectedHv.node.name} ${selectedHv.node.flavor}" }
- incomingImages.poll()
+ logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.node.uid} ${selectedHv.node.name} ${selectedHv.node.flavor}" }
+ queue.poll()
// Speculatively update the hypervisor view information to prevent other images in the queue from
// deciding on stale values.
selectedHv.numberOfActiveServers++
- selectedHv.provisionedCores += imageInstance.flavor.cpuCount
+ selectedHv.provisionedCores += server.flavor.cpuCount
selectedHv.availableMemory -= requiredMemory // XXX Temporary hack
- val server = selectedHv.driver.spawn(
- imageInstance.name,
- imageInstance.image,
- imageInstance.flavor
- )
- imageInstance.server = server
- imageInstance.continuation.resume(server)
-
- tracer.commit(VmScheduledEvent(imageInstance.name))
-
- eventFlow.emit(
- VirtProvisioningEvent.MetricsAvailable(
- this@SimVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- ++runningVms,
- finishedVms,
- --queuedVms,
- unscheduledVms
- )
- )
- activeImages += imageInstance
+ coroutineScope.launch {
+ try {
+ cont.resume(server)
+ selectedHv.driver.spawn(server)
+ activeServers += server
+
+ tracer.commit(VmScheduledEvent(server.name))
+ _events.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ ++runningVms,
+ finishedVms,
+ --queuedVms,
+ unscheduledVms
+ )
+ )
+ } catch (e: InsufficientMemoryOnServerException) {
+ logger.error("Failed to deploy VM", e)
- server.events
- .onEach { event ->
- when (event) {
- is ServerEvent.StateChanged -> {
- if (event.server.state == ServerState.SHUTOFF) {
- logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
-
- tracer.commit(VmStoppedEvent(event.server.name))
-
- eventFlow.emit(
- VirtProvisioningEvent.MetricsAvailable(
- this@SimVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- --runningVms,
- ++finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
-
- activeImages -= imageInstance
- selectedHv.provisionedCores -= server.flavor.cpuCount
-
- // Try to reschedule if needed
- if (incomingImages.isNotEmpty()) {
- requestCycle()
- }
- }
- }
- }
+ selectedHv.numberOfActiveServers--
+ selectedHv.provisionedCores -= server.flavor.cpuCount
+ selectedHv.availableMemory += requiredMemory
}
- .launchIn(coroutineScope)
- } catch (e: InsufficientMemoryOnServerException) {
- logger.error("Failed to deploy VM", e)
-
- selectedHv.numberOfActiveServers--
- selectedHv.provisionedCores -= imageInstance.flavor.cpuCount
- selectedHv.availableMemory += requiredMemory
+ }
} catch (e: Throwable) {
logger.error("Failed to deploy VM", e)
}
}
}
- private fun stateChanged(node: Node, hypervisor: SimVirtDriver) {
+ private fun stateChanged(node: Node, hypervisor: SimHost) {
when (node.state) {
NodeState.ACTIVE -> {
logger.debug { "[${clock.millis()}] Server ${node.uid} available: ${node.state}" }
@@ -320,7 +305,7 @@ public class SimVirtProvisioningService(
hv.driver = hypervisor
hv.driver.events
.onEach { event ->
- if (event is HypervisorEvent.VmsUpdated) {
+ if (event is HostEvent.VmsUpdated) {
hv.numberOfActiveServers = event.numberOfActiveServers
hv.availableMemory = event.availableMemory
}
@@ -329,12 +314,13 @@ public class SimVirtProvisioningService(
maxCores = max(maxCores, node.flavor.cpuCount)
maxMemory = max(maxMemory, node.flavor.memorySize)
hypervisors[node] = hv
+ hostToHv[hypervisor] = hv
availableHypervisors += hv
}
tracer.commit(HypervisorAvailableEvent(node.uid))
- eventFlow.emit(
+ _events.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
hypervisors.size,
@@ -348,7 +334,7 @@ public class SimVirtProvisioningService(
)
// Re-schedule on the new machine
- if (incomingImages.isNotEmpty()) {
+ if (queue.isNotEmpty()) {
requestCycle()
}
}
@@ -359,7 +345,7 @@ public class SimVirtProvisioningService(
tracer.commit(HypervisorUnavailableEvent(hv.uid))
- eventFlow.emit(
+ _events.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
hypervisors.size,
@@ -372,7 +358,7 @@ public class SimVirtProvisioningService(
)
)
- if (incomingImages.isNotEmpty()) {
+ if (queue.isNotEmpty()) {
requestCycle()
}
}
@@ -380,11 +366,43 @@ public class SimVirtProvisioningService(
}
}
- public data class ImageView(
- public val name: String,
- public val image: Image,
- public val flavor: Flavor,
- public val continuation: Continuation<Server>,
- public var server: Server? = null
- )
+ override fun onStateChange(host: Host, server: Server, newState: ServerState) {
+ val eventFlow = server.events as EventFlow<ServerEvent>
+ val newServer = server.copy(state = newState)
+ eventFlow.emit(ServerEvent.StateChanged(newServer, server.state))
+
+ if (newState == ServerState.SHUTOFF) {
+ logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
+
+ tracer.commit(VmStoppedEvent(server.name))
+
+ _events.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ --runningVms,
+ ++finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
+
+ activeServers -= server
+ val hv = hostToHv[host]
+ if (hv != null) {
+ hv.provisionedCores -= server.flavor.cpuCount
+ } else {
+ logger.error { "Unknown host $host" }
+ }
+
+ // Try to reschedule if needed
+ if (queue.isNotEmpty()) {
+ requestCycle()
+ }
+ }
+ }
+
+ public data class LaunchRequest(val server: Server, val cont: Continuation<Server>)
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt
index 2018b9f2..3099f1ad 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt
@@ -22,9 +22,9 @@
package org.opendc.compute.simulator.allocation
+import org.opendc.compute.core.Server
import org.opendc.compute.core.metal.Node
import org.opendc.compute.simulator.HypervisorView
-import org.opendc.compute.simulator.SimVirtProvisioningService
/**
* A policy for selecting the [Node] an image should be deployed to,
@@ -39,7 +39,7 @@ public interface AllocationPolicy {
*/
public fun select(
hypervisors: Set<HypervisorView>,
- image: SimVirtProvisioningService.ImageView
+ server: Server
): HypervisorView?
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt
index 04a181a6..df48f405 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt
@@ -22,8 +22,8 @@
package org.opendc.compute.simulator.allocation
+import org.opendc.compute.core.Server
import org.opendc.compute.simulator.HypervisorView
-import org.opendc.compute.simulator.SimVirtProvisioningService
/**
* The logic for an [AllocationPolicy] that uses a [Comparator] to select the appropriate node.
@@ -36,12 +36,12 @@ public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic {
override fun select(
hypervisors: Set<HypervisorView>,
- image: SimVirtProvisioningService.ImageView
+ server: Server
): HypervisorView? {
return hypervisors.asSequence()
.filter { hv ->
- val fitsMemory = hv.availableMemory >= (image.flavor.memorySize)
- val fitsCpu = hv.node.flavor.cpuCount >= image.flavor.cpuCount
+ val fitsMemory = hv.availableMemory >= (server.flavor.memorySize)
+ val fitsCpu = hv.node.flavor.cpuCount >= server.flavor.cpuCount
fitsMemory && fitsCpu
}
.minWithOrNull(comparator.thenBy { it.node.uid })
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt
index 9a89fccd..03706b42 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt
@@ -22,8 +22,8 @@
package org.opendc.compute.simulator.allocation
+import org.opendc.compute.core.Server
import org.opendc.compute.simulator.HypervisorView
-import org.opendc.compute.simulator.SimVirtProvisioningService
import kotlin.random.Random
/**
@@ -34,12 +34,12 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al
override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic {
override fun select(
hypervisors: Set<HypervisorView>,
- image: SimVirtProvisioningService.ImageView
+ server: Server
): HypervisorView? {
return hypervisors.asIterable()
.filter { hv ->
- val fitsMemory = hv.availableMemory >= (image.image.tags["required-memory"] as Long)
- val fitsCpu = hv.node.flavor.cpuCount >= image.flavor.cpuCount
+ val fitsMemory = hv.availableMemory >= (server.image.tags["required-memory"] as Long)
+ val fitsCpu = hv.node.flavor.cpuCount >= server.flavor.cpuCount
fitsMemory && fitsCpu
}
.randomOrNull(random)
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt
index 582fe817..30065621 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt
@@ -23,8 +23,8 @@
package org.opendc.compute.simulator.allocation
import mu.KotlinLogging
+import org.opendc.compute.core.Server
import org.opendc.compute.simulator.HypervisorView
-import org.opendc.compute.simulator.SimVirtProvisioningService
private val logger = KotlinLogging.logger {}
@@ -38,14 +38,14 @@ public class ReplayAllocationPolicy(private val vmPlacements: Map<String, String
override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic {
override fun select(
hypervisors: Set<HypervisorView>,
- image: SimVirtProvisioningService.ImageView
+ server: Server
): HypervisorView? {
- val clusterName = vmPlacements[image.name]
- ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${image.name}")
+ val clusterName = vmPlacements[server.name]
+ ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${server.name}")
val machinesInCluster = hypervisors.filter { it.node.name.contains(clusterName) }
if (machinesInCluster.isEmpty()) {
- logger.info { "Could not find any machines belonging to cluster $clusterName for image ${image.name}, assigning randomly." }
+ logger.info { "Could not find any machines belonging to cluster $clusterName for image ${server.name}, assigning randomly." }
return hypervisors.maxByOrNull { it.availableMemory }
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 6b754572..83e891cb 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -24,6 +24,7 @@ package org.opendc.compute.simulator
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
@@ -33,8 +34,10 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.core.Flavor
+import org.opendc.compute.core.Server
+import org.opendc.compute.core.ServerState
import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.virt.HypervisorEvent
+import org.opendc.compute.core.virt.HostEvent
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -49,7 +52,7 @@ import java.util.UUID
* Basic test-suite for the hypervisor.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-internal class SimVirtDriverTest {
+internal class SimHostTest {
private lateinit var scope: TestCoroutineScope
private lateinit var clock: Clock
private lateinit var machineModel: SimMachineModel
@@ -77,7 +80,7 @@ internal class SimVirtDriverTest {
var overcommittedWork = 0L
scope.launch {
- val virtDriver = SimVirtDriver(this, SimFairShareHypervisorProvider())
+ val virtDriver = SimHost(UUID.randomUUID(), this, SimFairShareHypervisorProvider())
val vmm = Image(UUID.randomUUID(), "vmm", mapOf("workload" to virtDriver))
val duration = 5 * 60L
val vmImageA = Image(
@@ -122,7 +125,7 @@ internal class SimVirtDriverTest {
virtDriver.events
.onEach { event ->
when (event) {
- is HypervisorEvent.SliceFinished -> {
+ is HostEvent.SliceFinished -> {
requestedWork += event.requestedBurst
grantedWork += event.grantedBurst
overcommittedWork += event.overcommissionedBurst
@@ -131,8 +134,8 @@ internal class SimVirtDriverTest {
}
.launchIn(this)
- virtDriver.spawn("a", vmImageA, flavor)
- virtDriver.spawn("b", vmImageB, flavor)
+ launch { virtDriver.spawn(Server(UUID.randomUUID(), "a", emptyMap(), flavor, vmImageA, ServerState.BUILD, emptyFlow())) }
+ launch { virtDriver.spawn(Server(UUID.randomUUID(), "b", emptyMap(), flavor, vmImageB, ServerState.BUILD, emptyFlow())) }
}
scope.advanceUntilIdle()
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index b941d135..728d6c11 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -37,11 +37,11 @@ import org.opendc.compute.core.ServerEvent
import org.opendc.compute.core.metal.NODE_CLUSTER
import org.opendc.compute.core.metal.NodeEvent
import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.core.virt.HypervisorEvent
+import org.opendc.compute.core.virt.HostEvent
import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import org.opendc.compute.core.workload.VmWorkload
import org.opendc.compute.simulator.SimBareMetalDriver
-import org.opendc.compute.simulator.SimVirtDriver
+import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.SimVirtProvisioningService
import org.opendc.compute.simulator.allocation.AllocationPolicy
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -174,8 +174,8 @@ public suspend fun attachMonitor(
// Monitor hypervisor events
for (hypervisor in hypervisors) {
- // TODO Do not expose VirtDriver directly but use Hypervisor class.
- val server = (hypervisor as SimVirtDriver).node
+ // TODO Do not expose Host directly but use Hypervisor class.
+ val server = (hypervisor as SimHost).node
monitor.reportHostStateChange(clock.millis(), hypervisor, server)
server.events
.onEach { event ->
@@ -190,7 +190,7 @@ public suspend fun attachMonitor(
hypervisor.events
.onEach { event ->
when (event) {
- is HypervisorEvent.SliceFinished -> monitor.reportHostSlice(
+ is HostEvent.SliceFinished -> monitor.reportHostSlice(
clock.millis(),
event.requestedBurst,
event.grantedBurst,
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 04ffd148..8432025b 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -24,7 +24,7 @@ package org.opendc.experiments.capelin.monitor
import org.opendc.compute.core.Server
import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.Host
import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import java.io.Closeable
@@ -42,7 +42,7 @@ public interface ExperimentMonitor : Closeable {
*/
public fun reportHostStateChange(
time: Long,
- driver: VirtDriver,
+ driver: Host,
host: Node
) {
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index e8aa5915..2af43701 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -25,7 +25,7 @@ package org.opendc.experiments.capelin.monitor
import mu.KotlinLogging
import org.opendc.compute.core.Server
import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.Host
import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import org.opendc.experiments.capelin.telemetry.HostEvent
import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
@@ -64,7 +64,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
override fun reportHostStateChange(
time: Long,
- driver: VirtDriver,
+ driver: Host,
host: Node
) {
logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" }
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 0d6c057f..fca523cd 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -148,9 +148,9 @@ class CapelinIntegrationTest {
assertAll(
{ assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
{ assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
- { assertEquals(1678587333640, monitor.totalRequestedBurst) },
- { assertEquals(438118200924, monitor.totalGrantedBurst) },
- { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) },
+ { assertEquals(1679510908774, monitor.totalRequestedBurst) },
+ { assertEquals(384100282091, monitor.totalGrantedBurst) },
+ { assertEquals(1282152242721, monitor.totalOvercommissionedBurst) },
{ assertEquals(0, monitor.totalInterferedBurst) }
)
}
@@ -195,9 +195,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(705128393966, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(710487768664, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(118846235815, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(584211294239, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
index b7a26d34..2a41be65 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -26,7 +26,7 @@ import mu.KotlinLogging
import org.opendc.compute.core.Server
import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.metal.NodeState
-import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.Host
import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.telemetry.HostEvent
@@ -51,7 +51,7 @@ public class WebExperimentMonitor : ExperimentMonitor {
override fun reportHostStateChange(
time: Long,
- driver: VirtDriver,
+ driver: Host,
host: Node
) {
logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" }