summaryrefslogtreecommitdiff
path: root/simulator/opendc-compute/opendc-compute-simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-05 14:44:30 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-07 16:13:15 +0100
commita0c0657dc867db61951edff24ddc944bed132ac0 (patch)
treef28f0aa5ef644b99365984de92ac1b2bccb1ff94 /simulator/opendc-compute/opendc-compute-simulator
parent2ba5fc1247472d026f10ad5cf738dcb7e078a9ee (diff)
compute: Make VirtProvisoningService responsible for Server lifecycle
This change refactors the OpenDC Compute module so that the VirtProvisioningService is now responsible for managing the lifecycle of Server objects as opposed to the VirtDriver and BareMetalDriver previously.
Diffstat (limited to 'simulator/opendc-compute/opendc-compute-simulator')
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt300
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt231
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt240
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt8
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt8
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt10
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt (renamed from simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt)15
9 files changed, 455 insertions, 365 deletions
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt
index cf2747cd..a650144b 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt
@@ -23,7 +23,7 @@
package org.opendc.compute.simulator
import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.Host
import java.util.UUID
public class HypervisorView(
@@ -33,5 +33,5 @@ public class HypervisorView(
public var availableMemory: Long,
public var provisionedCores: Int
) {
- public lateinit var driver: VirtDriver
+ public lateinit var driver: Host
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
new file mode 100644
index 00000000..85fa2cb6
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -0,0 +1,300 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.Flow
+import mu.KotlinLogging
+import org.opendc.compute.core.*
+import org.opendc.compute.core.Flavor
+import org.opendc.compute.core.metal.Node
+import org.opendc.compute.core.virt.*
+import org.opendc.simulator.compute.*
+import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.workload.SimResourceCommand
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.utils.flow.EventFlow
+import java.util.*
+import kotlin.coroutines.resume
+
+/**
+ * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
+ */
+public class SimHost(
+ override val uid: UUID,
+ private val coroutineScope: CoroutineScope,
+ hypervisor: SimHypervisorProvider
+) : Host, SimWorkload {
+ /**
+ * The logger instance of this server.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The execution context in which the [Host] runs.
+ */
+ private lateinit var ctx: SimExecutionContext
+
+ override val events: Flow<HostEvent>
+ get() = _events
+ internal val _events = EventFlow<HostEvent>()
+
+ /**
+ * The event listeners registered with this host.
+ */
+ private val listeners = mutableListOf<HostListener>()
+
+ /**
+ * Current total memory use of the images on this hypervisor.
+ */
+ private var availableMemory: Long = 0
+
+ /**
+ * The hypervisor to run multiple workloads.
+ */
+ private val hypervisor = hypervisor.create(
+ object : SimHypervisor.Listener {
+ override fun onSliceFinish(
+ hypervisor: SimHypervisor,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ _events.emit(
+ HostEvent.SliceFinished(
+ this@SimHost,
+ requestedWork,
+ grantedWork,
+ overcommittedWork,
+ interferedWork,
+ cpuUsage,
+ cpuDemand,
+ guests.size,
+ node
+ )
+ )
+ }
+ }
+ )
+
+ /**
+ * The virtual machines running on the hypervisor.
+ */
+ private val guests = HashMap<Server, SimGuest>()
+
+ /**
+ * The node on which the hypervisor runs.
+ */
+ public val node: Node
+ get() = ctx.meta["node"] as Node
+
+ override val state: HostState
+ get() = _state
+ private var _state: HostState = HostState.UP
+ set(value) {
+ listeners.forEach { it.onStateChange(this, value) }
+ field = value
+ }
+
+ override fun canFit(server: Server): Boolean {
+ val sufficientMemory = availableMemory > server.flavor.memorySize
+ val enoughCpus = ctx.machine.cpus.size >= server.flavor.cpuCount
+ val canFit = hypervisor.canFit(server.flavor.toMachineModel())
+
+ return sufficientMemory && enoughCpus && canFit
+ }
+
+ override suspend fun spawn(server: Server, start: Boolean) {
+ // Return if the server already exists on this host
+ if (server in this) {
+ return
+ }
+
+ require(canFit(server)) { "Server does not fit" }
+ val guest = SimGuest(server, hypervisor.createMachine(server.flavor.toMachineModel()))
+ guests[server] = guest
+
+ if (start) {
+ guest.start()
+ }
+
+ _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory))
+ }
+
+ override fun contains(server: Server): Boolean {
+ return server in guests
+ }
+
+ override suspend fun start(server: Server) {
+ val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
+ guest.start()
+ }
+
+ override suspend fun stop(server: Server) {
+ val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
+ guest.stop()
+ }
+
+ override suspend fun terminate(server: Server) {
+ val guest = guests.remove(server) ?: return
+ guest.terminate()
+ }
+
+ override fun addListener(listener: HostListener) {
+ listeners.add(listener)
+ }
+
+ override fun removeListener(listener: HostListener) {
+ listeners.remove(listener)
+ }
+
+ /**
+ * Convert flavor to machine model.
+ */
+ private fun Flavor.toMachineModel(): SimMachineModel {
+ val originalCpu = ctx.machine.cpus[0]
+ val processingNode = originalCpu.node.copy(coreCount = cpuCount)
+ val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
+ val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
+
+ return SimMachineModel(processingUnits, memoryUnits)
+ }
+
+ private fun onGuestStart(vm: SimGuest) {
+ guests.forEach { _, guest ->
+ if (guest.state == ServerState.ACTIVE) {
+ vm.performanceInterferenceModel?.onStart(vm.server.image.name)
+ }
+ }
+
+ listeners.forEach { it.onStateChange(this, vm.server, vm.state) }
+ }
+
+ private fun onGuestStop(vm: SimGuest) {
+ guests.forEach { _, guest ->
+ if (guest.state == ServerState.ACTIVE) {
+ vm.performanceInterferenceModel?.onStop(vm.server.image.name)
+ }
+ }
+
+ listeners.forEach { it.onStateChange(this, vm.server, vm.state) }
+
+ _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory))
+ }
+
+ /**
+ * A virtual machine instance that the driver manages.
+ */
+ private inner class SimGuest(val server: Server, val machine: SimMachine) {
+ val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+
+ var state: ServerState = ServerState.SHUTOFF
+
+ suspend fun start() {
+ when (state) {
+ ServerState.SHUTOFF -> {
+ logger.info { "User requested to start server ${server.uid}" }
+ launch()
+ }
+ ServerState.ACTIVE -> return
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ suspend fun stop() {
+ when (state) {
+ ServerState.ACTIVE, ServerState.ERROR -> {
+ val job = job ?: throw IllegalStateException("Server should be active")
+ job.cancel()
+ job.join()
+ }
+ ServerState.SHUTOFF -> return
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ suspend fun terminate() {
+ stop()
+ }
+
+ private var job: Job? = null
+
+ private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont ->
+ assert(job == null) { "Concurrent job running" }
+ val workload = server.image.tags["workload"] as SimWorkload
+
+ val job = coroutineScope.launch {
+ delay(1) // TODO Introduce boot time
+ init()
+ cont.resume(Unit)
+ try {
+ machine.run(workload, mapOf("driver" to this@SimHost, "server" to server))
+ exit(null)
+ } catch (cause: Throwable) {
+ exit(cause)
+ } finally {
+ machine.close()
+ }
+ }
+ this.job = job
+ job.invokeOnCompletion {
+ this.job = null
+ }
+ }
+
+ private fun init() {
+ state = ServerState.ACTIVE
+ onGuestStart(this)
+ }
+
+ private fun exit(cause: Throwable?) {
+ state =
+ if (cause == null)
+ ServerState.SHUTOFF
+ else
+ ServerState.ERROR
+
+ availableMemory += server.flavor.memorySize
+ onGuestStop(this)
+ }
+ }
+
+ override fun onStart(ctx: SimExecutionContext) {
+ this.ctx = ctx
+ this.availableMemory = ctx.machine.memory.map { it.size }.sum()
+ this.hypervisor.onStart(ctx)
+ }
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ return hypervisor.onStart(ctx, cpu)
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ return hypervisor.onNext(ctx, cpu, remainingWork)
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
deleted file mode 100644
index 35d82211..00000000
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.launch
-import org.opendc.compute.core.*
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.virt.HypervisorEvent
-import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException
-import org.opendc.compute.core.virt.driver.VirtDriver
-import org.opendc.simulator.compute.*
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.workload.SimResourceCommand
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.utils.flow.EventFlow
-import java.util.*
-
-/**
- * A [VirtDriver] that is simulates virtual machines on a physical machine using [SimHypervisor].
- */
-public class SimVirtDriver(
- private val coroutineScope: CoroutineScope,
- hypervisor: SimHypervisorProvider
-) : VirtDriver, SimWorkload {
- /**
- * The execution context in which the [VirtDriver] runs.
- */
- private lateinit var ctx: SimExecutionContext
-
- /**
- * The node on which the hypervisor runs.
- */
- public val node: Node
- get() = ctx.meta["node"] as Node
-
- /**
- * The [EventFlow] to emit the events.
- */
- internal val eventFlow = EventFlow<HypervisorEvent>()
-
- override val events: Flow<HypervisorEvent> = eventFlow
-
- /**
- * Current total memory use of the images on this hypervisor.
- */
- private var availableMemory: Long = 0
-
- /**
- * The hypervisor to run multiple workloads.
- */
- private val hypervisor = hypervisor.create(
- object : SimHypervisor.Listener {
- override fun onSliceFinish(
- hypervisor: SimHypervisor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- eventFlow.emit(
- HypervisorEvent.SliceFinished(
- this@SimVirtDriver,
- requestedWork,
- grantedWork,
- overcommittedWork,
- interferedWork,
- cpuUsage,
- cpuDemand,
- vms.size,
- node
- )
- )
- }
- }
- )
-
- /**
- * The virtual machines running on the hypervisor.
- */
- private val vms = HashSet<VirtualMachine>()
-
- override fun canFit(flavor: Flavor): Boolean {
- val sufficientMemory = availableMemory > flavor.memorySize
- val enoughCpus = ctx.machine.cpus.size >= flavor.cpuCount
- val canFit = hypervisor.canFit(flavor.toMachineModel())
-
- return sufficientMemory && enoughCpus && canFit
- }
-
- override suspend fun spawn(name: String, image: Image, flavor: Flavor): Server {
- val requiredMemory = flavor.memorySize
- if (availableMemory - requiredMemory < 0) {
- throw InsufficientMemoryOnServerException()
- }
- require(flavor.cpuCount <= ctx.machine.cpus.size) { "Machine does not fit" }
-
- val events = EventFlow<ServerEvent>()
- val server = Server(
- UUID.randomUUID(),
- name,
- emptyMap(),
- flavor,
- image,
- ServerState.BUILD,
- events
- )
- availableMemory -= requiredMemory
-
- val vm = VirtualMachine(server, events, hypervisor.createMachine(flavor.toMachineModel()))
- vms.add(vm)
- vmStarted(vm)
- eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
- return server
- }
-
- /**
- * Convert flavor to machine model.
- */
- private fun Flavor.toMachineModel(): SimMachineModel {
- val originalCpu = ctx.machine.cpus[0]
- val processingNode = originalCpu.node.copy(coreCount = cpuCount)
- val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
- val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
-
- return SimMachineModel(processingUnits, memoryUnits)
- }
-
- private fun vmStarted(vm: VirtualMachine) {
- vms.forEach {
- vm.performanceInterferenceModel?.onStart(it.server.image.name)
- }
- }
-
- private fun vmStopped(vm: VirtualMachine) {
- vms.forEach {
- vm.performanceInterferenceModel?.onStop(it.server.image.name)
- }
- }
-
- /**
- * A virtual machine instance that the driver manages.
- */
- private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, val machine: SimMachine) {
- val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
-
- val job = coroutineScope.launch {
- val workload = server.image.tags["workload"] as SimWorkload
-
- delay(1) // TODO Introduce boot time
- init()
- try {
- machine.run(workload, mapOf("driver" to this@SimVirtDriver, "server" to server))
- exit(null)
- } catch (cause: Throwable) {
- exit(cause)
- } finally {
- machine.close()
- }
- }
-
- var server: Server = server
- set(value) {
- if (field.state != value.state) {
- events.emit(ServerEvent.StateChanged(value, field.state))
- }
-
- field = value
- }
-
- private fun init() {
- server = server.copy(state = ServerState.ACTIVE)
- }
-
- private fun exit(cause: Throwable?) {
- val serverState =
- if (cause == null)
- ServerState.SHUTOFF
- else
- ServerState.ERROR
- server = server.copy(state = serverState)
- availableMemory += server.flavor.memorySize
- vms.remove(this)
- vmStopped(this)
- eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimVirtDriver, vms.size, availableMemory))
- events.close()
- }
- }
-
- override fun onStart(ctx: SimExecutionContext) {
- this.ctx = ctx
- this.availableMemory = ctx.machine.memory.map { it.size }.sum()
- this.hypervisor.onStart(ctx)
- }
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return hypervisor.onStart(ctx, cpu)
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return hypervisor.onNext(ctx, cpu, remainingWork)
- }
-}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
index 18afd0c2..ee747a9a 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
@@ -36,9 +36,7 @@ import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.metal.NodeEvent
import org.opendc.compute.core.metal.NodeState
import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.core.virt.HypervisorEvent
-import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException
-import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.*
import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import org.opendc.compute.core.virt.service.VirtProvisioningService
import org.opendc.compute.core.virt.service.events.*
@@ -62,13 +60,18 @@ public class SimVirtProvisioningService(
private val tracer: EventTracer,
private val hypervisor: SimHypervisorProvider,
private val schedulingQuantum: Long = 300000, // 5 minutes in milliseconds
-) : VirtProvisioningService {
+) : VirtProvisioningService, HostListener {
/**
* The logger instance to use.
*/
private val logger = KotlinLogging.logger {}
/**
+ * A mapping from host to hypervisor view.
+ */
+ private val hostToHv = mutableMapOf<Host, HypervisorView>()
+
+ /**
* The hypervisors that have been launched by the service.
*/
private val hypervisors: MutableMap<Node, HypervisorView> = mutableMapOf()
@@ -79,14 +82,19 @@ public class SimVirtProvisioningService(
private val availableHypervisors: MutableSet<HypervisorView> = mutableSetOf()
/**
- * The incoming images to be processed by the provisioner.
+ * The servers that should be launched by the service.
+ */
+ private val queue: Deque<LaunchRequest> = ArrayDeque()
+
+ /**
+ * The active servers in the system.
*/
- private val incomingImages: Deque<ImageView> = ArrayDeque()
+ private val activeServers: MutableSet<Server> = mutableSetOf()
/**
- * The active images in the system.
+ * The [Random] instance used to generate unique identifiers for the objects.
*/
- private val activeImages: MutableSet<ImageView> = mutableSetOf()
+ private val random = Random(0)
public var submittedVms: Int = 0
public var queuedVms: Int = 0
@@ -102,12 +110,9 @@ public class SimVirtProvisioningService(
*/
private val allocationLogic = allocationPolicy()
- /**
- * The [EventFlow] to emit the events.
- */
- internal val eventFlow = EventFlow<VirtProvisioningEvent>()
-
- override val events: Flow<VirtProvisioningEvent> = eventFlow
+ override val events: Flow<VirtProvisioningEvent>
+ get() = _events
+ private val _events = EventFlow<VirtProvisioningEvent>()
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
@@ -118,7 +123,8 @@ public class SimVirtProvisioningService(
coroutineScope.launch {
val provisionedNodes = provisioningService.nodes()
provisionedNodes.forEach { node ->
- val workload = SimVirtDriver(coroutineScope, hypervisor)
+ val workload = SimHost(UUID(random.nextLong(), random.nextLong()), coroutineScope, hypervisor)
+ workload.addListener(this@SimVirtProvisioningService)
val hypervisorImage = Image(UUID.randomUUID(), "vmm", mapOf("workload" to workload))
launch {
val deployedNode = provisioningService.deploy(node, hypervisorImage)
@@ -132,7 +138,7 @@ public class SimVirtProvisioningService(
}
}
- override suspend fun drivers(): Set<VirtDriver> {
+ override suspend fun drivers(): Set<Host> {
return availableHypervisors.map { it.driver }.toSet()
}
@@ -145,7 +151,7 @@ public class SimVirtProvisioningService(
): Server {
tracer.commit(VmSubmissionEvent(name, image, flavor))
- eventFlow.emit(
+ _events.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
hypervisors.size,
@@ -158,9 +164,9 @@ public class SimVirtProvisioningService(
)
)
- return suspendCancellableCoroutine<Server> { cont ->
- val vmInstance = ImageView(name, image, flavor, cont)
- incomingImages += vmInstance
+ return suspendCancellableCoroutine { cont ->
+ val request = LaunchRequest(createServer(name, image, flavor), cont)
+ queue += request
requestCycle()
}
}
@@ -170,6 +176,22 @@ public class SimVirtProvisioningService(
provisionedNodes.forEach { node -> provisioningService.stop(node) }
}
+ private fun createServer(
+ name: String,
+ image: Image,
+ flavor: Flavor
+ ): Server {
+ return Server(
+ uid = UUID(random.nextLong(), random.nextLong()),
+ name = name,
+ tags = emptyMap(),
+ flavor = flavor,
+ image = image,
+ state = ServerState.BUILD,
+ events = EventFlow()
+ )
+ }
+
private fun requestCycle() {
// Bail out in case we have already requested a new cycle.
if (scheduler.isTimerActive(Unit)) {
@@ -182,23 +204,23 @@ public class SimVirtProvisioningService(
val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
scheduler.startSingleTimer(Unit, delay) {
- coroutineScope.launch { schedule() }
+ schedule()
}
}
- private suspend fun schedule() {
- while (incomingImages.isNotEmpty()) {
- val imageInstance = incomingImages.peekFirst()
- val requiredMemory = imageInstance.flavor.memorySize
- val selectedHv = allocationLogic.select(availableHypervisors, imageInstance)
+ private fun schedule() {
+ while (queue.isNotEmpty()) {
+ val (server, cont) = queue.peekFirst()
+ val requiredMemory = server.flavor.memorySize
+ val selectedHv = allocationLogic.select(availableHypervisors, server)
- if (selectedHv == null || !selectedHv.driver.canFit(imageInstance.flavor)) {
- logger.trace { "Image ${imageInstance.image} selected for scheduling but no capacity available for it." }
+ if (selectedHv == null || !selectedHv.driver.canFit(server)) {
+ logger.trace { "Server $server selected for scheduling but no capacity available for it." }
- if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
- tracer.commit(VmSubmissionInvalidEvent(imageInstance.name))
+ if (requiredMemory > maxMemory || server.flavor.cpuCount > maxCores) {
+ tracer.commit(VmSubmissionInvalidEvent(server.name))
- eventFlow.emit(
+ _events.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
hypervisors.size,
@@ -212,9 +234,9 @@ public class SimVirtProvisioningService(
)
// Remove the incoming image
- incomingImages.poll()
+ queue.poll()
- logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]")
+ logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
continue
} else {
break
@@ -222,86 +244,49 @@ public class SimVirtProvisioningService(
}
try {
- logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.node.uid} ${selectedHv.node.name} ${selectedHv.node.flavor}" }
- incomingImages.poll()
+ logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.node.uid} ${selectedHv.node.name} ${selectedHv.node.flavor}" }
+ queue.poll()
// Speculatively update the hypervisor view information to prevent other images in the queue from
// deciding on stale values.
selectedHv.numberOfActiveServers++
- selectedHv.provisionedCores += imageInstance.flavor.cpuCount
+ selectedHv.provisionedCores += server.flavor.cpuCount
selectedHv.availableMemory -= requiredMemory // XXX Temporary hack
- val server = selectedHv.driver.spawn(
- imageInstance.name,
- imageInstance.image,
- imageInstance.flavor
- )
- imageInstance.server = server
- imageInstance.continuation.resume(server)
-
- tracer.commit(VmScheduledEvent(imageInstance.name))
-
- eventFlow.emit(
- VirtProvisioningEvent.MetricsAvailable(
- this@SimVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- ++runningVms,
- finishedVms,
- --queuedVms,
- unscheduledVms
- )
- )
- activeImages += imageInstance
+ coroutineScope.launch {
+ try {
+ cont.resume(server)
+ selectedHv.driver.spawn(server)
+ activeServers += server
+
+ tracer.commit(VmScheduledEvent(server.name))
+ _events.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ ++runningVms,
+ finishedVms,
+ --queuedVms,
+ unscheduledVms
+ )
+ )
+ } catch (e: InsufficientMemoryOnServerException) {
+ logger.error("Failed to deploy VM", e)
- server.events
- .onEach { event ->
- when (event) {
- is ServerEvent.StateChanged -> {
- if (event.server.state == ServerState.SHUTOFF) {
- logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
-
- tracer.commit(VmStoppedEvent(event.server.name))
-
- eventFlow.emit(
- VirtProvisioningEvent.MetricsAvailable(
- this@SimVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- --runningVms,
- ++finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
-
- activeImages -= imageInstance
- selectedHv.provisionedCores -= server.flavor.cpuCount
-
- // Try to reschedule if needed
- if (incomingImages.isNotEmpty()) {
- requestCycle()
- }
- }
- }
- }
+ selectedHv.numberOfActiveServers--
+ selectedHv.provisionedCores -= server.flavor.cpuCount
+ selectedHv.availableMemory += requiredMemory
}
- .launchIn(coroutineScope)
- } catch (e: InsufficientMemoryOnServerException) {
- logger.error("Failed to deploy VM", e)
-
- selectedHv.numberOfActiveServers--
- selectedHv.provisionedCores -= imageInstance.flavor.cpuCount
- selectedHv.availableMemory += requiredMemory
+ }
} catch (e: Throwable) {
logger.error("Failed to deploy VM", e)
}
}
}
- private fun stateChanged(node: Node, hypervisor: SimVirtDriver) {
+ private fun stateChanged(node: Node, hypervisor: SimHost) {
when (node.state) {
NodeState.ACTIVE -> {
logger.debug { "[${clock.millis()}] Server ${node.uid} available: ${node.state}" }
@@ -320,7 +305,7 @@ public class SimVirtProvisioningService(
hv.driver = hypervisor
hv.driver.events
.onEach { event ->
- if (event is HypervisorEvent.VmsUpdated) {
+ if (event is HostEvent.VmsUpdated) {
hv.numberOfActiveServers = event.numberOfActiveServers
hv.availableMemory = event.availableMemory
}
@@ -329,12 +314,13 @@ public class SimVirtProvisioningService(
maxCores = max(maxCores, node.flavor.cpuCount)
maxMemory = max(maxMemory, node.flavor.memorySize)
hypervisors[node] = hv
+ hostToHv[hypervisor] = hv
availableHypervisors += hv
}
tracer.commit(HypervisorAvailableEvent(node.uid))
- eventFlow.emit(
+ _events.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
hypervisors.size,
@@ -348,7 +334,7 @@ public class SimVirtProvisioningService(
)
// Re-schedule on the new machine
- if (incomingImages.isNotEmpty()) {
+ if (queue.isNotEmpty()) {
requestCycle()
}
}
@@ -359,7 +345,7 @@ public class SimVirtProvisioningService(
tracer.commit(HypervisorUnavailableEvent(hv.uid))
- eventFlow.emit(
+ _events.emit(
VirtProvisioningEvent.MetricsAvailable(
this@SimVirtProvisioningService,
hypervisors.size,
@@ -372,7 +358,7 @@ public class SimVirtProvisioningService(
)
)
- if (incomingImages.isNotEmpty()) {
+ if (queue.isNotEmpty()) {
requestCycle()
}
}
@@ -380,11 +366,43 @@ public class SimVirtProvisioningService(
}
}
- public data class ImageView(
- public val name: String,
- public val image: Image,
- public val flavor: Flavor,
- public val continuation: Continuation<Server>,
- public var server: Server? = null
- )
+ override fun onStateChange(host: Host, server: Server, newState: ServerState) {
+ val eventFlow = server.events as EventFlow<ServerEvent>
+ val newServer = server.copy(state = newState)
+ eventFlow.emit(ServerEvent.StateChanged(newServer, server.state))
+
+ if (newState == ServerState.SHUTOFF) {
+ logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
+
+ tracer.commit(VmStoppedEvent(server.name))
+
+ _events.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ --runningVms,
+ ++finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
+
+ activeServers -= server
+ val hv = hostToHv[host]
+ if (hv != null) {
+ hv.provisionedCores -= server.flavor.cpuCount
+ } else {
+ logger.error { "Unknown host $host" }
+ }
+
+ // Try to reschedule if needed
+ if (queue.isNotEmpty()) {
+ requestCycle()
+ }
+ }
+ }
+
+ public data class LaunchRequest(val server: Server, val cont: Continuation<Server>)
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt
index 2018b9f2..3099f1ad 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt
@@ -22,9 +22,9 @@
package org.opendc.compute.simulator.allocation
+import org.opendc.compute.core.Server
import org.opendc.compute.core.metal.Node
import org.opendc.compute.simulator.HypervisorView
-import org.opendc.compute.simulator.SimVirtProvisioningService
/**
* A policy for selecting the [Node] an image should be deployed to,
@@ -39,7 +39,7 @@ public interface AllocationPolicy {
*/
public fun select(
hypervisors: Set<HypervisorView>,
- image: SimVirtProvisioningService.ImageView
+ server: Server
): HypervisorView?
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt
index 04a181a6..df48f405 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt
@@ -22,8 +22,8 @@
package org.opendc.compute.simulator.allocation
+import org.opendc.compute.core.Server
import org.opendc.compute.simulator.HypervisorView
-import org.opendc.compute.simulator.SimVirtProvisioningService
/**
* The logic for an [AllocationPolicy] that uses a [Comparator] to select the appropriate node.
@@ -36,12 +36,12 @@ public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic {
override fun select(
hypervisors: Set<HypervisorView>,
- image: SimVirtProvisioningService.ImageView
+ server: Server
): HypervisorView? {
return hypervisors.asSequence()
.filter { hv ->
- val fitsMemory = hv.availableMemory >= (image.flavor.memorySize)
- val fitsCpu = hv.node.flavor.cpuCount >= image.flavor.cpuCount
+ val fitsMemory = hv.availableMemory >= (server.flavor.memorySize)
+ val fitsCpu = hv.node.flavor.cpuCount >= server.flavor.cpuCount
fitsMemory && fitsCpu
}
.minWithOrNull(comparator.thenBy { it.node.uid })
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt
index 9a89fccd..03706b42 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt
@@ -22,8 +22,8 @@
package org.opendc.compute.simulator.allocation
+import org.opendc.compute.core.Server
import org.opendc.compute.simulator.HypervisorView
-import org.opendc.compute.simulator.SimVirtProvisioningService
import kotlin.random.Random
/**
@@ -34,12 +34,12 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al
override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic {
override fun select(
hypervisors: Set<HypervisorView>,
- image: SimVirtProvisioningService.ImageView
+ server: Server
): HypervisorView? {
return hypervisors.asIterable()
.filter { hv ->
- val fitsMemory = hv.availableMemory >= (image.image.tags["required-memory"] as Long)
- val fitsCpu = hv.node.flavor.cpuCount >= image.flavor.cpuCount
+ val fitsMemory = hv.availableMemory >= (server.image.tags["required-memory"] as Long)
+ val fitsCpu = hv.node.flavor.cpuCount >= server.flavor.cpuCount
fitsMemory && fitsCpu
}
.randomOrNull(random)
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt
index 582fe817..30065621 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt
@@ -23,8 +23,8 @@
package org.opendc.compute.simulator.allocation
import mu.KotlinLogging
+import org.opendc.compute.core.Server
import org.opendc.compute.simulator.HypervisorView
-import org.opendc.compute.simulator.SimVirtProvisioningService
private val logger = KotlinLogging.logger {}
@@ -38,14 +38,14 @@ public class ReplayAllocationPolicy(private val vmPlacements: Map<String, String
override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic {
override fun select(
hypervisors: Set<HypervisorView>,
- image: SimVirtProvisioningService.ImageView
+ server: Server
): HypervisorView? {
- val clusterName = vmPlacements[image.name]
- ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${image.name}")
+ val clusterName = vmPlacements[server.name]
+ ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${server.name}")
val machinesInCluster = hypervisors.filter { it.node.name.contains(clusterName) }
if (machinesInCluster.isEmpty()) {
- logger.info { "Could not find any machines belonging to cluster $clusterName for image ${image.name}, assigning randomly." }
+ logger.info { "Could not find any machines belonging to cluster $clusterName for image ${server.name}, assigning randomly." }
return hypervisors.maxByOrNull { it.availableMemory }
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 6b754572..83e891cb 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -24,6 +24,7 @@ package org.opendc.compute.simulator
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
@@ -33,8 +34,10 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.core.Flavor
+import org.opendc.compute.core.Server
+import org.opendc.compute.core.ServerState
import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.virt.HypervisorEvent
+import org.opendc.compute.core.virt.HostEvent
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -49,7 +52,7 @@ import java.util.UUID
* Basic test-suite for the hypervisor.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-internal class SimVirtDriverTest {
+internal class SimHostTest {
private lateinit var scope: TestCoroutineScope
private lateinit var clock: Clock
private lateinit var machineModel: SimMachineModel
@@ -77,7 +80,7 @@ internal class SimVirtDriverTest {
var overcommittedWork = 0L
scope.launch {
- val virtDriver = SimVirtDriver(this, SimFairShareHypervisorProvider())
+ val virtDriver = SimHost(UUID.randomUUID(), this, SimFairShareHypervisorProvider())
val vmm = Image(UUID.randomUUID(), "vmm", mapOf("workload" to virtDriver))
val duration = 5 * 60L
val vmImageA = Image(
@@ -122,7 +125,7 @@ internal class SimVirtDriverTest {
virtDriver.events
.onEach { event ->
when (event) {
- is HypervisorEvent.SliceFinished -> {
+ is HostEvent.SliceFinished -> {
requestedWork += event.requestedBurst
grantedWork += event.grantedBurst
overcommittedWork += event.overcommissionedBurst
@@ -131,8 +134,8 @@ internal class SimVirtDriverTest {
}
.launchIn(this)
- virtDriver.spawn("a", vmImageA, flavor)
- virtDriver.spawn("b", vmImageB, flavor)
+ launch { virtDriver.spawn(Server(UUID.randomUUID(), "a", emptyMap(), flavor, vmImageA, ServerState.BUILD, emptyFlow())) }
+ launch { virtDriver.spawn(Server(UUID.randomUUID(), "b", emptyMap(), flavor, vmImageB, ServerState.BUILD, emptyFlow())) }
}
scope.advanceUntilIdle()