diff options
8 files changed, 128 insertions, 15 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 4a40dc9f..9396699a 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -142,7 +142,7 @@ public class SimpleBareMetalDriver( override suspend fun stop(): Node = withContext(domain.coroutineContext) { val node = nodeState.value - if (node.state == NodeState.SHUTOFF) { + if (node.state == NodeState.SHUTOFF || node.state == NodeState.ERROR) { return@withContext node } @@ -298,8 +298,12 @@ public class SimpleBareMetalDriver( get() = domain override suspend fun fail() { - serverContext?.cancel(fail = true) - domain.cancel() + try { + serverContext?.cancel(fail = true) + domain.cancel() + } catch (_: CancellationException) { + // Ignore if the machine has already failed. + } } override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})" diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt index 105505f2..a54d8df4 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt @@ -55,6 +55,11 @@ public interface ProvisioningService { public suspend fun deploy(node: Node, image: Image): Node /** + * Stop the specified [Node] . + */ + public suspend fun stop(node: Node): Node + + /** * The service key of this service. */ companion object Key : AbstractServiceKey<ProvisioningService>(UUID.randomUUID(), "provisioner") diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index a7e143aa..f6b236ae 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -28,6 +28,7 @@ import com.atlarge.odcsim.Domain import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.withContext /** @@ -57,4 +58,13 @@ public class SimpleProvisioningService(val domain: Domain) : ProvisioningService val newNode = driver.reboot() return@withContext newNode } + + override suspend fun stop(node: Node): Node = withContext(domain.coroutineContext) { + val driver = nodes[node]!! + try { + driver.stop() + } catch (e: CancellationException) { + node + } + } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 76368080..3086f4e6 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -67,7 +67,7 @@ class SimpleVirtDriver( /** * The [Server] on which this hypervisor runs. */ - private val server: Server + val server: Server get() = hostContext.server /** diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 156521db..b8966275 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,6 +1,7 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.odcsim.SimulationContext +import com.atlarge.odcsim.flow.EventFlow import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerEvent @@ -18,6 +19,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -52,6 +54,8 @@ class SimpleVirtProvisioningService( */ private val activeImages: MutableSet<ImageView> = mutableSetOf() + override val hypervisorEvents: Flow<HypervisorEvent> = EventFlow() + init { launch { val provisionedNodes = provisioningService.nodes() @@ -84,6 +88,11 @@ class SimpleVirtProvisioningService( } } + override suspend fun terminate() { + val provisionedNodes = provisioningService.nodes() + provisionedNodes.forEach { node -> provisioningService.stop(node) } + } + private var call: Job? = null private fun requestCycle() { diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index a3ade2fb..fb200f88 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -3,8 +3,10 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import kotlinx.coroutines.flow.Flow /** * A service for VM provisioning on a cloud. @@ -13,6 +15,11 @@ interface VirtProvisioningService { val allocationPolicy: AllocationPolicy /** + * The events emitted by the hypervisors. + */ + public val hypervisorEvents: Flow<HypervisorEvent> + + /** * Obtain the active hypervisors for this provisioner. */ public suspend fun drivers(): Set<VirtDriver> @@ -25,4 +32,9 @@ interface VirtProvisioningService { * @param flavor The flavor of the machine instance to run this [image] on. */ public suspend fun deploy(name: String, image: Image, flavor: Flavor): Server + + /** + * Terminate the provisioning service releasing all the leased bare-metal machines. + */ + public suspend fun terminate() } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 36da7703..120c4f81 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -4,6 +4,7 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriver import kotlinx.coroutines.flow.first import java.io.BufferedWriter import java.io.Closeable @@ -13,32 +14,59 @@ class Sc20Monitor( destination: String ) : Closeable { private val outputFile = BufferedWriter(FileWriter(destination)) - private var failed: Int = 0 + private var failedInSlice: Int = 0 + private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() init { - outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n") + outputFile.write("time,duration,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n") } - suspend fun stateChanged(server: Server) { - println("[${simulationContext.clock.millis()}] ${server.uid} ${server.state}") + suspend fun onVmStateChanged(server: Server) { if (server.state == ServerState.ERROR) { - failed++ + failedInSlice++ } } + suspend fun serverStateChanged(driver: VirtDriver, server: Server) { + if ((server.state == ServerState.SHUTOFF || server.state == ServerState.ERROR) && + lastServerStates.containsKey(server) + ) { + val duration = simulationContext.clock.millis() - lastServerStates[server]!!.second + onSliceFinish( + simulationContext.clock.millis(), + 0, + 0, + 0, + server, + duration + ) + } + + println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}") + + lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) + } + suspend fun onSliceFinish( time: Long, requestedBurst: Long, grantedBurst: Long, numberOfDeployedImages: Int, - hostServer: Server + hostServer: Server, + duration: Long = 5 * 60 * 1000L ) { + lastServerStates.remove(hostServer) + // Assume for now that the host is not virtualized and measure the current power draw val driver = hostServer.services[BareMetalDriver.Key] val usage = driver.usage.first() val powerDraw = driver.powerDraw.first() - - outputFile.write("$time,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw,$failed") + val failed = if (failedInSlice > 0) { + failedInSlice.also { failedInSlice = 0 } + } else { + 0 + } + outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw,$failed") outputFile.newLine() } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 639c3aef..ed971df5 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -28,10 +28,13 @@ import com.atlarge.odcsim.Domain import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.NODE_CLUSTER import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.HypervisorEvent +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.core.failure.CorrelatedFaultInjector @@ -141,12 +144,35 @@ fun main(args: Array<String>) { // Wait for the hypervisors to be spawned delay(10) + val hypervisors = scheduler.drivers() + var availableHypervisors = hypervisors.size + // Monitor hypervisor events - for (hypervisor in scheduler.drivers()) { + for (hypervisor in hypervisors) { + // TODO Do not expose VirtDriver directly but use Hypervisor class. + monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server) + hypervisor.server.events + .onEach { event -> + when (event) { + is ServerEvent.StateChanged -> { + monitor.serverStateChanged(hypervisor, event.server) + + if (event.server.state == ServerState.ERROR) + availableHypervisors -= 1 + } + } + } + .launchIn(this) hypervisor.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, event.numberOfDeployedImages, event.hostServer) + is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( + simulationContext.clock.millis(), + event.requestedBurst, + event.grantedBurst, + event.numberOfDeployedImages, + event.hostServer + ) else -> println(event) } } @@ -164,6 +190,9 @@ fun main(args: Array<String>) { } } + val running = mutableSetOf<Server>() + val finish = Channel<Unit>(Channel.RENDEZVOUS) + val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) while (reader.hasNext()) { val (time, workload) = reader.next() @@ -174,11 +203,27 @@ fun main(args: Array<String>) { workload.image.name, workload.image, Flavor(workload.image.cores, workload.image.requiredMemory) ) + running += server // Monitor server events - server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect() + server.events + .onEach { + if (it is ServerEvent.StateChanged) + monitor.onVmStateChanged(it.server) + + // Detect whether the VM has finished running + if (it.server.state == ServerState.ERROR || it.server.state == ServerState.SHUTOFF) { + running -= server + + if (running.isEmpty() && (!reader.hasNext() || availableHypervisors == 0)) + finish.send(Unit) + } + } + .collect() } } + finish.receive() + scheduler.terminate() println(simulationContext.clock.instant()) } |
