From 225a9dd042870b1320681104aa022120611cc92b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 24 Mar 2020 22:04:06 +0100 Subject: feat: Record hypervisor events during experiment --- .../kotlin/com/atlarge/odcsim/flow/StateFlow.kt | 4 +- .../odcsim/engine/omega/OmegaSimulationEngine.kt | 7 +++- .../compute/metal/driver/SimpleBareMetalDriver.kt | 43 ++++++++++------------ .../atlarge/opendc/compute/virt/HypervisorEvent.kt | 4 +- .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 9 ++--- .../virt/service/SimpleVirtProvisioningService.kt | 17 ++++++--- .../virt/service/VirtProvisioningService.kt | 6 +++ .../metal/driver/SimpleBareMetalDriverTest.kt | 2 +- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 5 ++- .../opendc/experiments/sc20/TestExperiment.kt | 21 ++++++++++- .../com/atlarge/opendc/workflows/workload/Job.kt | 2 + 11 files changed, 77 insertions(+), 43 deletions(-) diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/StateFlow.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/StateFlow.kt index 0410bd95..50add0ad 100644 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/StateFlow.kt +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/StateFlow.kt @@ -70,10 +70,10 @@ private class StateFlowImpl(initialValue: T) : StateFlow { */ private val flow = chan.asFlow() - public override var value: T - get() = chan.value + public override var value: T = initialValue set(value) { chan.offer(value) + field = value } @InternalCoroutinesApi diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index 4edf94d2..934af293 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -37,6 +37,7 @@ import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle @@ -174,6 +175,10 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine } } + private val exceptionHandler = CoroutineExceptionHandler { _, exception -> + log.error("Uncaught exception", exception) + } + // SimulationContext override val key: CoroutineContext.Key<*> = SimulationContext.Key @@ -192,7 +197,7 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine override val parent: Domain = parent ?: this @InternalCoroutinesApi - override val coroutineContext: CoroutineContext = this + CoroutineName(name) + dispatcher + job + override val coroutineContext: CoroutineContext = this + CoroutineName(name) + dispatcher + job + exceptionHandler override fun toString(): String = path } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 67069c03..9ab6fbc5 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -48,13 +48,10 @@ import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.scanReduce import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.ceil @@ -112,21 +109,6 @@ public class SimpleBareMetalDriver( override val powerDraw: Flow = powerModel(this) - init { - @OptIn(ExperimentalCoroutinesApi::class) - nodeState.scanReduce { field, value -> - if (field.state != value.state) { - events.emit(NodeEvent.StateChanged(value, field.state)) - } - - if (field.server != null && value.server != null && field.server.state != value.server.state) { - serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state)) - } - - value - }.launchIn(domain) - } - override suspend fun init(): Node = withContext(domain.coroutineContext) { nodeState.value } @@ -149,7 +131,7 @@ public class SimpleBareMetalDriver( events ) - nodeState.value = node.copy(state = NodeState.BOOT, server = server) + setNode(node.copy(state = NodeState.BOOT, server = server)) serverContext = BareMetalServerContext(events) return@withContext nodeState.value } @@ -164,7 +146,7 @@ public class SimpleBareMetalDriver( serverContext!!.cancel(fail = false) serverContext = null - nodeState.value = node.copy(state = NodeState.SHUTOFF, server = null) + setNode(node.copy(state = NodeState.SHUTOFF, server = null)) return@withContext node } @@ -174,12 +156,25 @@ public class SimpleBareMetalDriver( } override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { - nodeState.value = nodeState.value.copy(image = image) + setNode(nodeState.value.copy(image = image)) return@withContext nodeState.value } override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value } + private fun setNode(value: Node) { + val field = nodeState.value + if (field.state != value.state) { + events.emit(NodeEvent.StateChanged(value, field.state)) + } + + if (field.server != null && value.server != null && field.server.state != value.server.state) { + serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state)) + } + + nodeState.value = value + } + private inner class BareMetalServerContext(val events: EventFlow) : ServerManagementContext { private var finalized: Boolean = false @@ -212,7 +207,7 @@ public class SimpleBareMetalDriver( override suspend fun publishService(key: ServiceKey, service: T) { val server = server.copy(services = server.services.put(key, service)) - nodeState.value = nodeState.value.copy(server = server) + setNode(nodeState.value.copy(server = server)) events.emit(ServerEvent.ServicePublished(server, key)) } @@ -220,7 +215,7 @@ public class SimpleBareMetalDriver( assert(!finalized) { "Machine is already finalized" } val server = server.copy(state = ServerState.ACTIVE) - nodeState.value = nodeState.value.copy(state = NodeState.ACTIVE, server = server) + setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server)) } override suspend fun exit(cause: Throwable?) { @@ -237,7 +232,7 @@ public class SimpleBareMetalDriver( else NodeState.ERROR val server = server.copy(state = newServerState) - nodeState.value = nodeState.value.copy(state = newNodeState, server = server) + setNode(nodeState.value.copy(state = newNodeState, server = server)) } private var flush: Job? = null diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt index 3230c2ba..5c19b00d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.compute.virt +import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.virt.driver.VirtDriver /** @@ -60,6 +61,7 @@ public sealed class HypervisorEvent { override val driver: VirtDriver, public val requestedBurst: Long, public val grantedBurst: Long, - public val numberOfDeployedImages: Int + public val numberOfDeployedImages: Int, + public val hostServer: Server ) : HypervisorEvent() } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index e7b06329..76368080 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -47,7 +47,6 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job -import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow @@ -68,7 +67,7 @@ class SimpleVirtDriver( /** * The [Server] on which this hypervisor runs. */ - public val server: Server + private val server: Server get() = hostContext.server /** @@ -223,7 +222,7 @@ class SimpleVirtDriver( } } - eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size)) + eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size, server)) } this.call = call } @@ -231,11 +230,11 @@ class SimpleVirtDriver( /** * Flush the progress of the current active VMs. */ - private suspend fun flush() { + private fun flush() { val call = call ?: return // If there is no active call, there is nothing to flush // The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its // completion. - call.cancelAndJoin() + call.cancel() this.call = null } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index a16c0793..fb874e22 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -19,6 +19,7 @@ import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.withContext import kotlin.coroutines.Continuation import kotlin.coroutines.resume @@ -64,14 +65,20 @@ class SimpleVirtProvisioningService( } } + override suspend fun drivers(): Set = withContext(coroutineContext) { + availableHypervisors.map { it.driver }.toSet() + } + override suspend fun deploy( name: String, image: Image, flavor: Flavor - ): Server = suspendCancellableCoroutine { cont -> - val vmInstance = ImageView(name, image, flavor, cont) - incomingImages += vmInstance - requestCycle() + ): Server = withContext(coroutineContext) { + suspendCancellableCoroutine { cont -> + val vmInstance = ImageView(name, image, flavor, cont) + incomingImages += vmInstance + requestCycle() + } } private var call: Job? = null @@ -82,9 +89,9 @@ class SimpleVirtProvisioningService( } val call = launch { + this@SimpleVirtProvisioningService.call = null schedule() } - call.invokeOnCompletion { this.call = null } this.call = call } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 6f0c22f6..a3ade2fb 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -3,6 +3,7 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy /** @@ -11,6 +12,11 @@ import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy interface VirtProvisioningService { val allocationPolicy: AllocationPolicy + /** + * Obtain the active hypervisors for this provisioner. + */ + public suspend fun drivers(): Set + /** * Submit the specified [Image] to the provisioning service. * diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index e0d8799f..3eb6a12c 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -63,7 +63,7 @@ internal class SimpleBareMetalDriverTest { val server = driver.start().server!! server.events.collect { event -> when (event) { - is ServerEvent.StateChanged -> finalState = event.server.state + is ServerEvent.StateChanged -> { println(event); finalState = event.server.state } } } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index e18bbe30..36da7703 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -1,5 +1,6 @@ package com.atlarge.opendc.experiments.sc20 +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver @@ -18,8 +19,8 @@ class Sc20Monitor( outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n") } - fun stateChanged(server: Server) { - println("${server.uid} ${server.state}") + suspend fun stateChanged(server: Server) { + println("[${simulationContext.clock.millis()}] ${server.uid} ${server.state}") if (server.state == ServerState.ERROR) { failed++ } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 6d832ee4..0fafc118 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -29,10 +29,10 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.core.failure.CorrelatedFaultInjector -import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader @@ -43,6 +43,7 @@ import com.xenomachina.argparser.default import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -116,6 +117,21 @@ fun main(args: Array) { bareMetalProvisioner ) + // Wait for the hypervisors to be spawned + delay(10) + + // Monitor hypervisor events + for (hypervisor in scheduler.drivers()) { + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, event.numberOfDeployedImages, event.hostServer) + else -> println(event) + } + } + .launchIn(this) + } + val faultInjectorDomain = root.newDomain(name = "failures") faultInjectorDomain.launch { chan.receive() @@ -125,7 +141,7 @@ fun main(args: Array) { sizeScale = 1.88, sizeShape = 1.25 ) for (node in bareMetalProvisioner.nodes()) { - faultInjector.enqueue(node.metadata["driver"] as FailureDomain) + // faultInjector.enqueue(node.metadata["driver"] as FailureDomain) } } @@ -139,6 +155,7 @@ fun main(args: Array) { workload.image.name, workload.image, Flavor(workload.image.cores, workload.image.requiredMemory) ) + // Monitor server events server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect() } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt index 40389ce2..02969d8a 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt @@ -47,4 +47,6 @@ data class Job( override fun equals(other: Any?): Boolean = other is Job && uid == other.uid override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Job(uid=$uid, name=$name, tasks=${tasks.size}, metadata=$metadata)" } -- cgit v1.2.3