summaryrefslogtreecommitdiff
path: root/opendc
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-24 22:04:06 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-25 10:51:28 +0100
commit225a9dd042870b1320681104aa022120611cc92b (patch)
tree08a18803589bd034ff18e0a02588af9febe24005 /opendc
parentedce7993772182bac0d0c74d22189137b35872aa (diff)
feat: Record hypervisor events during experiment
Diffstat (limited to 'opendc')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt43
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt4
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt9
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt17
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt6
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt2
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt5
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt21
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt2
9 files changed, 69 insertions, 40 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 67069c03..9ab6fbc5 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -48,13 +48,10 @@ import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.scanReduce
import kotlinx.coroutines.launch
import java.util.UUID
import kotlin.math.ceil
@@ -112,21 +109,6 @@ public class SimpleBareMetalDriver(
override val powerDraw: Flow<Double> = powerModel(this)
- init {
- @OptIn(ExperimentalCoroutinesApi::class)
- nodeState.scanReduce { field, value ->
- if (field.state != value.state) {
- events.emit(NodeEvent.StateChanged(value, field.state))
- }
-
- if (field.server != null && value.server != null && field.server.state != value.server.state) {
- serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state))
- }
-
- value
- }.launchIn(domain)
- }
-
override suspend fun init(): Node = withContext(domain.coroutineContext) {
nodeState.value
}
@@ -149,7 +131,7 @@ public class SimpleBareMetalDriver(
events
)
- nodeState.value = node.copy(state = NodeState.BOOT, server = server)
+ setNode(node.copy(state = NodeState.BOOT, server = server))
serverContext = BareMetalServerContext(events)
return@withContext nodeState.value
}
@@ -164,7 +146,7 @@ public class SimpleBareMetalDriver(
serverContext!!.cancel(fail = false)
serverContext = null
- nodeState.value = node.copy(state = NodeState.SHUTOFF, server = null)
+ setNode(node.copy(state = NodeState.SHUTOFF, server = null))
return@withContext node
}
@@ -174,12 +156,25 @@ public class SimpleBareMetalDriver(
}
override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
- nodeState.value = nodeState.value.copy(image = image)
+ setNode(nodeState.value.copy(image = image))
return@withContext nodeState.value
}
override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value }
+ private fun setNode(value: Node) {
+ val field = nodeState.value
+ if (field.state != value.state) {
+ events.emit(NodeEvent.StateChanged(value, field.state))
+ }
+
+ if (field.server != null && value.server != null && field.server.state != value.server.state) {
+ serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state))
+ }
+
+ nodeState.value = value
+ }
+
private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext {
private var finalized: Boolean = false
@@ -212,7 +207,7 @@ public class SimpleBareMetalDriver(
override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
val server = server.copy(services = server.services.put(key, service))
- nodeState.value = nodeState.value.copy(server = server)
+ setNode(nodeState.value.copy(server = server))
events.emit(ServerEvent.ServicePublished(server, key))
}
@@ -220,7 +215,7 @@ public class SimpleBareMetalDriver(
assert(!finalized) { "Machine is already finalized" }
val server = server.copy(state = ServerState.ACTIVE)
- nodeState.value = nodeState.value.copy(state = NodeState.ACTIVE, server = server)
+ setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server))
}
override suspend fun exit(cause: Throwable?) {
@@ -237,7 +232,7 @@ public class SimpleBareMetalDriver(
else
NodeState.ERROR
val server = server.copy(state = newServerState)
- nodeState.value = nodeState.value.copy(state = newNodeState, server = server)
+ setNode(nodeState.value.copy(state = newNodeState, server = server))
}
private var flush: Job? = null
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
index 3230c2ba..5c19b00d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.compute.virt
+import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.virt.driver.VirtDriver
/**
@@ -60,6 +61,7 @@ public sealed class HypervisorEvent {
override val driver: VirtDriver,
public val requestedBurst: Long,
public val grantedBurst: Long,
- public val numberOfDeployedImages: Int
+ public val numberOfDeployedImages: Int,
+ public val hostServer: Server
) : HypervisorEvent()
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index e7b06329..76368080 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -47,7 +47,6 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
@@ -68,7 +67,7 @@ class SimpleVirtDriver(
/**
* The [Server] on which this hypervisor runs.
*/
- public val server: Server
+ private val server: Server
get() = hostContext.server
/**
@@ -223,7 +222,7 @@ class SimpleVirtDriver(
}
}
- eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size))
+ eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size, server))
}
this.call = call
}
@@ -231,11 +230,11 @@ class SimpleVirtDriver(
/**
* Flush the progress of the current active VMs.
*/
- private suspend fun flush() {
+ private fun flush() {
val call = call ?: return // If there is no active call, there is nothing to flush
// The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its
// completion.
- call.cancelAndJoin()
+ call.cancel()
this.call = null
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index a16c0793..fb874e22 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -19,6 +19,7 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlinx.coroutines.withContext
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
@@ -64,14 +65,20 @@ class SimpleVirtProvisioningService(
}
}
+ override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) {
+ availableHypervisors.map { it.driver }.toSet()
+ }
+
override suspend fun deploy(
name: String,
image: Image,
flavor: Flavor
- ): Server = suspendCancellableCoroutine { cont ->
- val vmInstance = ImageView(name, image, flavor, cont)
- incomingImages += vmInstance
- requestCycle()
+ ): Server = withContext(coroutineContext) {
+ suspendCancellableCoroutine<Server> { cont ->
+ val vmInstance = ImageView(name, image, flavor, cont)
+ incomingImages += vmInstance
+ requestCycle()
+ }
}
private var call: Job? = null
@@ -82,9 +89,9 @@ class SimpleVirtProvisioningService(
}
val call = launch {
+ this@SimpleVirtProvisioningService.call = null
schedule()
}
- call.invokeOnCompletion { this.call = null }
this.call = call
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index 6f0c22f6..a3ade2fb 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -3,6 +3,7 @@ package com.atlarge.opendc.compute.virt.service
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
/**
@@ -12,6 +13,11 @@ interface VirtProvisioningService {
val allocationPolicy: AllocationPolicy
/**
+ * Obtain the active hypervisors for this provisioner.
+ */
+ public suspend fun drivers(): Set<VirtDriver>
+
+ /**
* Submit the specified [Image] to the provisioning service.
*
* @param name The name of the server to deploy.
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index e0d8799f..3eb6a12c 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -63,7 +63,7 @@ internal class SimpleBareMetalDriverTest {
val server = driver.start().server!!
server.events.collect { event ->
when (event) {
- is ServerEvent.StateChanged -> finalState = event.server.state
+ is ServerEvent.StateChanged -> { println(event); finalState = event.server.state }
}
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index e18bbe30..36da7703 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -1,5 +1,6 @@
package com.atlarge.opendc.experiments.sc20
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
@@ -18,8 +19,8 @@ class Sc20Monitor(
outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n")
}
- fun stateChanged(server: Server) {
- println("${server.uid} ${server.state}")
+ suspend fun stateChanged(server: Server) {
+ println("[${simulationContext.clock.millis()}] ${server.uid} ${server.state}")
if (server.state == ServerState.ERROR) {
failed++
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 6d832ee4..0fafc118 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -29,10 +29,10 @@ import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
-import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader
@@ -43,6 +43,7 @@ import com.xenomachina.argparser.default
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
@@ -116,6 +117,21 @@ fun main(args: Array<String>) {
bareMetalProvisioner
)
+ // Wait for the hypervisors to be spawned
+ delay(10)
+
+ // Monitor hypervisor events
+ for (hypervisor in scheduler.drivers()) {
+ hypervisor.events
+ .onEach { event ->
+ when (event) {
+ is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, event.numberOfDeployedImages, event.hostServer)
+ else -> println(event)
+ }
+ }
+ .launchIn(this)
+ }
+
val faultInjectorDomain = root.newDomain(name = "failures")
faultInjectorDomain.launch {
chan.receive()
@@ -125,7 +141,7 @@ fun main(args: Array<String>) {
sizeScale = 1.88, sizeShape = 1.25
)
for (node in bareMetalProvisioner.nodes()) {
- faultInjector.enqueue(node.metadata["driver"] as FailureDomain)
+ // faultInjector.enqueue(node.metadata["driver"] as FailureDomain)
}
}
@@ -139,6 +155,7 @@ fun main(args: Array<String>) {
workload.image.name, workload.image,
Flavor(workload.image.cores, workload.image.requiredMemory)
)
+ // Monitor server events
server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect()
}
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt
index 40389ce2..02969d8a 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt
@@ -47,4 +47,6 @@ data class Job(
override fun equals(other: Any?): Boolean = other is Job && uid == other.uid
override fun hashCode(): Int = uid.hashCode()
+
+ override fun toString(): String = "Job(uid=$uid, name=$name, tasks=${tasks.size}, metadata=$metadata)"
}