summaryrefslogtreecommitdiff
path: root/opendc/opendc-compute/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-24 22:04:06 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-25 10:51:28 +0100
commit225a9dd042870b1320681104aa022120611cc92b (patch)
tree08a18803589bd034ff18e0a02588af9febe24005 /opendc/opendc-compute/src
parentedce7993772182bac0d0c74d22189137b35872aa (diff)
feat: Record hypervisor events during experiment
Diffstat (limited to 'opendc/opendc-compute/src')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt43
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt4
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt9
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt17
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt6
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt2
6 files changed, 45 insertions, 36 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 67069c03..9ab6fbc5 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -48,13 +48,10 @@ import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.scanReduce
import kotlinx.coroutines.launch
import java.util.UUID
import kotlin.math.ceil
@@ -112,21 +109,6 @@ public class SimpleBareMetalDriver(
override val powerDraw: Flow<Double> = powerModel(this)
- init {
- @OptIn(ExperimentalCoroutinesApi::class)
- nodeState.scanReduce { field, value ->
- if (field.state != value.state) {
- events.emit(NodeEvent.StateChanged(value, field.state))
- }
-
- if (field.server != null && value.server != null && field.server.state != value.server.state) {
- serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state))
- }
-
- value
- }.launchIn(domain)
- }
-
override suspend fun init(): Node = withContext(domain.coroutineContext) {
nodeState.value
}
@@ -149,7 +131,7 @@ public class SimpleBareMetalDriver(
events
)
- nodeState.value = node.copy(state = NodeState.BOOT, server = server)
+ setNode(node.copy(state = NodeState.BOOT, server = server))
serverContext = BareMetalServerContext(events)
return@withContext nodeState.value
}
@@ -164,7 +146,7 @@ public class SimpleBareMetalDriver(
serverContext!!.cancel(fail = false)
serverContext = null
- nodeState.value = node.copy(state = NodeState.SHUTOFF, server = null)
+ setNode(node.copy(state = NodeState.SHUTOFF, server = null))
return@withContext node
}
@@ -174,12 +156,25 @@ public class SimpleBareMetalDriver(
}
override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
- nodeState.value = nodeState.value.copy(image = image)
+ setNode(nodeState.value.copy(image = image))
return@withContext nodeState.value
}
override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value }
+ private fun setNode(value: Node) {
+ val field = nodeState.value
+ if (field.state != value.state) {
+ events.emit(NodeEvent.StateChanged(value, field.state))
+ }
+
+ if (field.server != null && value.server != null && field.server.state != value.server.state) {
+ serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state))
+ }
+
+ nodeState.value = value
+ }
+
private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext {
private var finalized: Boolean = false
@@ -212,7 +207,7 @@ public class SimpleBareMetalDriver(
override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
val server = server.copy(services = server.services.put(key, service))
- nodeState.value = nodeState.value.copy(server = server)
+ setNode(nodeState.value.copy(server = server))
events.emit(ServerEvent.ServicePublished(server, key))
}
@@ -220,7 +215,7 @@ public class SimpleBareMetalDriver(
assert(!finalized) { "Machine is already finalized" }
val server = server.copy(state = ServerState.ACTIVE)
- nodeState.value = nodeState.value.copy(state = NodeState.ACTIVE, server = server)
+ setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server))
}
override suspend fun exit(cause: Throwable?) {
@@ -237,7 +232,7 @@ public class SimpleBareMetalDriver(
else
NodeState.ERROR
val server = server.copy(state = newServerState)
- nodeState.value = nodeState.value.copy(state = newNodeState, server = server)
+ setNode(nodeState.value.copy(state = newNodeState, server = server))
}
private var flush: Job? = null
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
index 3230c2ba..5c19b00d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.compute.virt
+import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.virt.driver.VirtDriver
/**
@@ -60,6 +61,7 @@ public sealed class HypervisorEvent {
override val driver: VirtDriver,
public val requestedBurst: Long,
public val grantedBurst: Long,
- public val numberOfDeployedImages: Int
+ public val numberOfDeployedImages: Int,
+ public val hostServer: Server
) : HypervisorEvent()
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index e7b06329..76368080 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -47,7 +47,6 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
@@ -68,7 +67,7 @@ class SimpleVirtDriver(
/**
* The [Server] on which this hypervisor runs.
*/
- public val server: Server
+ private val server: Server
get() = hostContext.server
/**
@@ -223,7 +222,7 @@ class SimpleVirtDriver(
}
}
- eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size))
+ eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size, server))
}
this.call = call
}
@@ -231,11 +230,11 @@ class SimpleVirtDriver(
/**
* Flush the progress of the current active VMs.
*/
- private suspend fun flush() {
+ private fun flush() {
val call = call ?: return // If there is no active call, there is nothing to flush
// The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its
// completion.
- call.cancelAndJoin()
+ call.cancel()
this.call = null
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index a16c0793..fb874e22 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -19,6 +19,7 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlinx.coroutines.withContext
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
@@ -64,14 +65,20 @@ class SimpleVirtProvisioningService(
}
}
+ override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) {
+ availableHypervisors.map { it.driver }.toSet()
+ }
+
override suspend fun deploy(
name: String,
image: Image,
flavor: Flavor
- ): Server = suspendCancellableCoroutine { cont ->
- val vmInstance = ImageView(name, image, flavor, cont)
- incomingImages += vmInstance
- requestCycle()
+ ): Server = withContext(coroutineContext) {
+ suspendCancellableCoroutine<Server> { cont ->
+ val vmInstance = ImageView(name, image, flavor, cont)
+ incomingImages += vmInstance
+ requestCycle()
+ }
}
private var call: Job? = null
@@ -82,9 +89,9 @@ class SimpleVirtProvisioningService(
}
val call = launch {
+ this@SimpleVirtProvisioningService.call = null
schedule()
}
- call.invokeOnCompletion { this.call = null }
this.call = call
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index 6f0c22f6..a3ade2fb 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -3,6 +3,7 @@ package com.atlarge.opendc.compute.virt.service
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
/**
@@ -12,6 +13,11 @@ interface VirtProvisioningService {
val allocationPolicy: AllocationPolicy
/**
+ * Obtain the active hypervisors for this provisioner.
+ */
+ public suspend fun drivers(): Set<VirtDriver>
+
+ /**
* Submit the specified [Image] to the provisioning service.
*
* @param name The name of the server to deploy.
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index e0d8799f..3eb6a12c 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -63,7 +63,7 @@ internal class SimpleBareMetalDriverTest {
val server = driver.start().server!!
server.events.collect { event ->
when (event) {
- is ServerEvent.StateChanged -> finalState = event.server.state
+ is ServerEvent.StateChanged -> { println(event); finalState = event.server.state }
}
}
}