summaryrefslogtreecommitdiff
path: root/opendc/opendc-compute
diff options
context:
space:
mode:
Diffstat (limited to 'opendc/opendc-compute')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt4
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt5
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt72
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt58
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt)14
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt)5
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt)2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt)26
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt3
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt36
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt3
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt7
13 files changed, 168 insertions, 69 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index c8caaca6..e0a491c8 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -30,7 +30,7 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.services.ServiceKey
/**
- * Represents the execution context in which an bootable [Image] runs on a [Server].
+ * Represents the execution context in which a bootable [Image] runs on a [Server].
*/
public interface ServerContext {
/**
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
index 8b8d1596..7cb4c0c5 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
@@ -33,7 +33,7 @@ import java.util.UUID
/**
* A bare-metal compute node.
*/
-data class Node(
+public data class Node(
/**
* The unique identifier of the node.
*/
@@ -45,7 +45,7 @@ data class Node(
public override val name: String,
/**
- * Meta data of the node.
+ * Metadata of the node.
*/
public val metadata: Map<String, Any>,
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 5d1db378..41cec291 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -38,6 +38,11 @@ import java.util.UUID
*/
public interface BareMetalDriver : Powerable, FailureDomain {
/**
+ * The [Node] that is controlled by this driver.
+ */
+ public val node: Flow<Node>
+
+ /**
* The amount of work done by the machine in percentage with respect to the total amount of processing power
* available.
*/
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 49c3fa2e..67069c03 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -48,10 +48,13 @@ import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.scanReduce
import kotlinx.coroutines.launch
import java.util.UUID
import kotlin.math.ceil
@@ -96,33 +99,40 @@ public class SimpleBareMetalDriver(
/**
* The flow containing the load of the server.
*/
- private val usageSignal = StateFlow(0.0)
+ private val usageState = StateFlow(0.0)
/**
* The machine state.
*/
- private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events)
- set(value) {
+ private val nodeState = StateFlow(Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events))
+
+ override val node: Flow<Node> = nodeState
+
+ override val usage: Flow<Double> = usageState
+
+ override val powerDraw: Flow<Double> = powerModel(this)
+
+ init {
+ @OptIn(ExperimentalCoroutinesApi::class)
+ nodeState.scanReduce { field, value ->
if (field.state != value.state) {
events.emit(NodeEvent.StateChanged(value, field.state))
}
- if (field.server != null && value.server != null && field.server!!.state != value.server.state) {
- serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server!!.state))
+ if (field.server != null && value.server != null && field.server.state != value.server.state) {
+ serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state))
}
- field = value
- }
-
- override val usage: Flow<Double> = usageSignal
-
- override val powerDraw: Flow<Double> = powerModel(this)
+ value
+ }.launchIn(domain)
+ }
override suspend fun init(): Node = withContext(domain.coroutineContext) {
- node
+ nodeState.value
}
override suspend fun start(): Node = withContext(domain.coroutineContext) {
+ val node = nodeState.value
if (node.state != NodeState.SHUTOFF) {
return@withContext node
}
@@ -139,12 +149,13 @@ public class SimpleBareMetalDriver(
events
)
- node = node.copy(state = NodeState.BOOT, server = server)
+ nodeState.value = node.copy(state = NodeState.BOOT, server = server)
serverContext = BareMetalServerContext(events)
- return@withContext node
+ return@withContext nodeState.value
}
override suspend fun stop(): Node = withContext(domain.coroutineContext) {
+ val node = nodeState.value
if (node.state == NodeState.SHUTOFF) {
return@withContext node
}
@@ -153,7 +164,7 @@ public class SimpleBareMetalDriver(
serverContext!!.cancel(fail = false)
serverContext = null
- node = node.copy(state = NodeState.SHUTOFF, server = null)
+ nodeState.value = node.copy(state = NodeState.SHUTOFF, server = null)
return@withContext node
}
@@ -163,11 +174,11 @@ public class SimpleBareMetalDriver(
}
override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
- node = node.copy(image = image)
- return@withContext node
+ nodeState.value = nodeState.value.copy(image = image)
+ return@withContext nodeState.value
}
- override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node }
+ override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value }
private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext {
private var finalized: Boolean = false
@@ -175,7 +186,7 @@ public class SimpleBareMetalDriver(
override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
override val server: Server
- get() = node.server!!
+ get() = nodeState.value.server!!
private val job = domain.launch {
delay(1) // TODO Introduce boot time
@@ -193,15 +204,15 @@ public class SimpleBareMetalDriver(
*/
suspend fun cancel(fail: Boolean) {
if (fail)
- domain.cancel(ShutdownException(cause = Exception("Random failure")))
+ job.cancel(ShutdownException(cause = Exception("Random failure")))
else
- domain.cancel(ShutdownException())
+ job.cancel(ShutdownException())
job.join()
}
override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
val server = server.copy(services = server.services.put(key, service))
- node = node.copy(server = server)
+ nodeState.value = nodeState.value.copy(server = server)
events.emit(ServerEvent.ServicePublished(server, key))
}
@@ -209,24 +220,24 @@ public class SimpleBareMetalDriver(
assert(!finalized) { "Machine is already finalized" }
val server = server.copy(state = ServerState.ACTIVE)
- node = node.copy(state = NodeState.ACTIVE, server = server)
+ nodeState.value = nodeState.value.copy(state = NodeState.ACTIVE, server = server)
}
override suspend fun exit(cause: Throwable?) {
finalized = true
- val serverState =
+ val newServerState =
if (cause == null || (cause is ShutdownException && cause.cause == null))
ServerState.SHUTOFF
else
ServerState.ERROR
- val nodeState =
+ val newNodeState =
if (cause == null || (cause is ShutdownException && cause.cause != null))
- node.state
+ nodeState.value.state
else
NodeState.ERROR
- val server = server.copy(state = serverState)
- node = node.copy(state = nodeState, server = server)
+ val server = server.copy(state = newServerState)
+ nodeState.value = nodeState.value.copy(state = newNodeState, server = server)
}
private var flush: Job? = null
@@ -256,7 +267,7 @@ public class SimpleBareMetalDriver(
}
}
- usageSignal.value = totalUsage / cpus.size
+ usageState.value = totalUsage / cpus.size
try {
delay(duration)
@@ -269,7 +280,7 @@ public class SimpleBareMetalDriver(
// Flush the load if the do not receive a new run call for the same timestamp
flush = domain.launch(job) {
delay(1)
- usageSignal.value = 0.0
+ usageState.value = 0.0
}
flush!!.invokeOnCompletion {
flush = null
@@ -289,5 +300,6 @@ public class SimpleBareMetalDriver(
override suspend fun fail() {
serverContext?.cancel(fail = true)
+ domain.cancel()
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
new file mode 100644
index 00000000..69b0124d
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
@@ -0,0 +1,58 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.virt
+
+import com.atlarge.opendc.core.Identity
+import kotlinx.coroutines.flow.Flow
+import java.util.UUID
+
+/**
+ * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment
+ * into several virtual guest machines.
+ */
+public class Hypervisor(
+ /**
+ * The unique identifier of the hypervisor.
+ */
+ override val uid: UUID,
+
+ /**
+ * The optional name of the hypervisor.
+ */
+ override val name: String,
+
+ /**
+ * Metadata of the hypervisor.
+ */
+ public val metadata: Map<String, Any>,
+
+ /**
+ * The events that are emitted by the hypervisor.
+ */
+ public val events: Flow<HypervisorEvent>
+) : Identity {
+ override fun hashCode(): Int = uid.hashCode()
+ override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
index ccbe8b3c..3230c2ba 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
@@ -22,12 +22,14 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.virt.driver
+package com.atlarge.opendc.compute.virt
+
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
/**
* An event that is emitted by a [VirtDriver].
*/
-public sealed class VirtDriverEvent {
+public sealed class HypervisorEvent {
/**
* The driver that emitted the event.
*/
@@ -40,7 +42,11 @@ public sealed class VirtDriverEvent {
* @property numberOfActiveServers The number of active servers.
* @property availableMemory The available memory, in MB.
*/
- public data class VmsUpdated(override val driver: VirtDriver, public val numberOfActiveServers: Int, public val availableMemory: Long) : VirtDriverEvent()
+ public data class VmsUpdated(
+ override val driver: VirtDriver,
+ public val numberOfActiveServers: Int,
+ public val availableMemory: Long
+ ) : HypervisorEvent()
/**
* This event is emitted when a slice is finished.
@@ -55,5 +61,5 @@ public sealed class VirtDriverEvent {
public val requestedBurst: Long,
public val grantedBurst: Long,
public val numberOfDeployedImages: Int
- ) : VirtDriverEvent()
+ ) : HypervisorEvent()
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index 1eb0e0ff..c21b002d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -22,10 +22,11 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.image.Image
+import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.core.resource.TagContainer
import kotlinx.coroutines.coroutineScope
@@ -42,7 +43,7 @@ object HypervisorImage : Image {
override suspend fun invoke(ctx: ServerContext) {
coroutineScope {
- val driver = HypervisorVirtDriver(ctx, this)
+ val driver = SimpleVirtDriver(ctx, this)
ctx.publishService(VirtDriver.Key, driver)
// Suspend image until it is cancelled
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt
index 926234b5..0586ae00 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt
@@ -1,3 +1,3 @@
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt.driver
public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.")
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 0b4a7109..fc4c7634 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt.driver
import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.flow.EventFlow
@@ -37,8 +37,7 @@ import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.execution.ShutdownException
import com.atlarge.opendc.compute.core.execution.assertFailure
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.driver.VirtDriverEvent
+import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
@@ -62,7 +61,7 @@ import kotlin.math.min
* A [VirtDriver] that is backed by a simple hypervisor implementation.
*/
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
-class HypervisorVirtDriver(
+class SimpleVirtDriver(
private val hostContext: ServerContext,
private val coroutineScope: CoroutineScope
) : VirtDriver {
@@ -85,9 +84,9 @@ class HypervisorVirtDriver(
/**
* The [EventFlow] to emit the events.
*/
- internal val eventFlow = EventFlow<VirtDriverEvent>()
+ internal val eventFlow = EventFlow<HypervisorEvent>()
- override val events: Flow<VirtDriverEvent> = eventFlow
+ override val events: Flow<HypervisorEvent> = eventFlow
override suspend fun spawn(
image: Image,
@@ -106,7 +105,7 @@ class HypervisorVirtDriver(
)
availableMemory -= requiredMemory
vms.add(VmServerContext(server, events, simulationContext.domain))
- eventFlow.emit(VirtDriverEvent.VmsUpdated(this, vms.size, availableMemory))
+ eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
return server
}
@@ -223,7 +222,7 @@ class HypervisorVirtDriver(
}
}
- eventFlow.emit(VirtDriverEvent.SliceFinished(this@HypervisorVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size))
+ eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size))
}
this.call = call
}
@@ -312,7 +311,7 @@ class HypervisorVirtDriver(
availableMemory += server.flavor.memorySize
vms.remove(this)
events.close()
- eventFlow.emit(VirtDriverEvent.VmsUpdated(this@HypervisorVirtDriver, vms.size, availableMemory))
+ eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory))
}
override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
@@ -322,7 +321,14 @@ class HypervisorVirtDriver(
this.burst = burst
requests = cpus.asSequence()
.take(burst.size)
- .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) }
+ .mapIndexed { i, cpu ->
+ CpuRequest(
+ this,
+ cpu,
+ burst[i],
+ limit[i]
+ )
+ }
.toList()
// Wait until the burst has been run or the coroutine is cancelled
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
index 296f170e..d7ae0c12 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
@@ -27,6 +27,7 @@ package com.atlarge.opendc.compute.virt.driver
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
+import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.AbstractServiceKey
import kotlinx.coroutines.flow.Flow
import java.util.UUID
@@ -39,7 +40,7 @@ public interface VirtDriver {
/**
* The events emitted by the driver.
*/
- public val events: Flow<VirtDriverEvent>
+ public val events: Flow<HypervisorEvent>
/**
* Spawn the given [Image] on the compute resource of this driver.
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 8365f8c9..8393dfa9 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -8,22 +8,26 @@ import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage
-import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException
+import com.atlarge.opendc.compute.virt.HypervisorImage
+import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.core.services.ServiceKey
+import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
-import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
@OptIn(ExperimentalCoroutinesApi::class)
class SimpleVirtProvisioningService(
public override val allocationPolicy: AllocationPolicy,
private val ctx: SimulationContext,
private val provisioningService: ProvisioningService
-) : VirtProvisioningService {
+) : VirtProvisioningService, CoroutineScope by ctx.domain {
/**
* The hypervisors that have been launched by the service.
*/
@@ -45,18 +49,17 @@ class SimpleVirtProvisioningService(
private val activeImages: MutableSet<ImageView> = mutableSetOf()
init {
- ctx.domain.launch {
+ launch {
val provisionedNodes = provisioningService.nodes()
provisionedNodes.forEach { node ->
val hypervisorImage = HypervisorImage
val node = provisioningService.deploy(node, hypervisorImage)
node.server!!.events.onEach { event ->
- when (event) {
- is ServerEvent.StateChanged -> stateChanged(event.server, event.previousState)
- is ServerEvent.ServicePublished -> servicePublished(event.server, event.key)
- }
+ when (event) {
+ is ServerEvent.StateChanged -> stateChanged(event.server)
+ is ServerEvent.ServicePublished -> servicePublished(event.server, event.key)
}
- .launchIn(ctx.domain)
+ }.collect()
}
}
}
@@ -64,8 +67,8 @@ class SimpleVirtProvisioningService(
override suspend fun deploy(
image: Image,
flavor: Flavor
- ) {
- val vmInstance = ImageView(image, flavor)
+ ): Server = suspendCancellableCoroutine { cont ->
+ val vmInstance = ImageView(image, flavor, cont)
incomingImages += vmInstance
requestCycle()
}
@@ -77,7 +80,7 @@ class SimpleVirtProvisioningService(
return
}
- val call = ctx.domain.launch {
+ val call = launch {
schedule()
}
call.invokeOnCompletion { this.call = null }
@@ -92,10 +95,12 @@ class SimpleVirtProvisioningService(
try {
println("Spawning ${imageInstance.image}")
incomingImages -= imageInstance
- imageInstance.server = selectedHv.driver.spawn(
+ val server = selectedHv.driver.spawn(
imageInstance.image,
imageInstance.flavor
)
+ imageInstance.server = server
+ imageInstance.continuation.resume(server)
activeImages += imageInstance
} catch (e: InsufficientMemoryOnServerException) {
println("Unable to deploy image due to insufficient memory")
@@ -103,7 +108,7 @@ class SimpleVirtProvisioningService(
}
}
- private fun stateChanged(server: Server, previousState: ServerState) {
+ private fun stateChanged(server: Server) {
when (server.state) {
ServerState.ACTIVE -> {
val hvView = HypervisorView(
@@ -134,6 +139,7 @@ class SimpleVirtProvisioningService(
data class ImageView(
val image: Image,
val flavor: Flavor,
+ val continuation: Continuation<Server>,
var server: Server? = null
)
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index da72d742..12543ce3 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -1,6 +1,7 @@
package com.atlarge.opendc.compute.virt.service
import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
@@ -16,5 +17,5 @@ interface VirtProvisioningService {
* @param image The image to be deployed.
* @param flavor The flavor of the machine instance to run this [image] on.
*/
- public suspend fun deploy(image: Image, flavor: Flavor)
+ public suspend fun deploy(image: Image, flavor: Flavor): Server
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
index d86045c0..bcaafb59 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
@@ -30,6 +30,7 @@ import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
+import com.atlarge.opendc.compute.virt.HypervisorImage
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
@@ -75,8 +76,10 @@ internal class HypervisorTest {
val flavor = Flavor(1, 0)
val vmDriver = metalDriver.refresh().server!!.services[VirtDriver]
- vmDriver.spawn(workloadA, flavor).events.onEach { println(it) }.launchIn(this)
- vmDriver.spawn(workloadB, flavor)
+ val vmA = vmDriver.spawn(workloadA, flavor)
+ vmA.events.onEach { println(it) }.launchIn(this)
+ val vmB = vmDriver.spawn(workloadB, flavor)
+ vmB.events.onEach { println(it) }.launchIn(this)
}
runBlocking {