8 files changed, 123 insertions, 55 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 9396699a..37ae9eb5 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -49,7 +49,6 @@ import com.atlarge.opendc.core.services.ServiceRegistry
 import kotlinx.coroutines.CancellationException
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.launch
@@ -182,6 +181,10 @@ public class SimpleBareMetalDriver(
     private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext {
         private var finalized: Boolean = false
 
+        // A state in which the machine is still available, but does not run any of the work requested by the
+        // image
+        var unavailable = false
+
         override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
 
         override val server: Server
@@ -266,7 +269,9 @@ public class SimpleBareMetalDriver(
                 }
             }
 
-            usageState.value = totalUsage / cpus.size
+            if (!unavailable) {
+                usageState.value = totalUsage / cpus.size
+            }
 
             try {
                 delay(duration)
@@ -276,7 +281,7 @@ public class SimpleBareMetalDriver(
             }
 
             val end = simulationContext.clock.millis()
-            // Flush the load if the do not receive a new run call for the same timestamp
+            // Flush the load if we do not receive a new run call for the same timestamp
            flush = domain.launch(job) {
                 delay(1)
                 usageState.value = 0.0
@@ -285,6 +290,10 @@ public class SimpleBareMetalDriver(
                 flush = null
             }
 
+            if (unavailable) {
+                return
+            }
+
             // Write back the remaining burst time
             for (i in 0 until min(cpus.size, burst.size)) {
                 val usage = min(limit[i], cpus[i].frequency)
@@ -298,12 +307,17 @@ public class SimpleBareMetalDriver(
         get() = domain
 
     override suspend fun fail() {
-        try {
-            serverContext?.cancel(fail = true)
-            domain.cancel()
-        } catch (_: CancellationException) {
-            // Ignore if the machine has already failed.
-        }
+        serverContext?.unavailable = true
+
+        val server = nodeState.value.server?.copy(state = ServerState.ERROR)
+        setNode(nodeState.value.copy(state = NodeState.ERROR, server = server))
+    }
+
+    override suspend fun recover() {
+        serverContext?.unavailable = false
+
+        val server = nodeState.value.server?.copy(state = ServerState.ACTIVE)
+        setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server))
     }
 
     override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})"
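Note: fail() previously cancelled the server context and its domain outright; with this change a failed machine survives, is flagged unavailable, reports no usage, and skips the burst write-back until recover() flips the flag back. A minimal standalone sketch of this soft-failure semantics (toy types, not the OpenDC API):

    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking

    // Toy machine with a soft-failure flag in the style of BareMetalServerContext.unavailable:
    // a failed machine keeps its state but reports no usage and makes no progress.
    class ToyMachine {
        var unavailable = false
            private set
        var usage = 0.0
            private set

        suspend fun run(load: Double, durationMs: Long) {
            if (!unavailable) usage = load // a failed machine reports no usage
            delay(durationMs)
            if (unavailable) return        // ...and writes back no progress
            usage = 0.0
        }

        fun fail() { unavailable = true }     // soft failure: state survives
        fun recover() { unavailable = false } // machine becomes schedulable again
    }

    fun main() = runBlocking {
        val machine = ToyMachine()
        machine.fail()
        machine.run(load = 1.0, durationMs = 10)
        println(machine.usage) // 0.0: nothing was reported while failed
        machine.recover()
    }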
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index c21b002d..bd395f0d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -50,7 +50,7 @@ object HypervisorImage : Image {
         try {
             suspendCancellableCoroutine<Unit> {}
         } finally {
-            driver.eventFlow.close()
+            driver.cancel()
         }
     }
 }
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index b4626def..c21a9fc0 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -65,8 +65,8 @@ import kotlin.math.min
 @OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
 class SimpleVirtDriver(
     private val hostContext: ServerContext,
-    private val coroutineScope: CoroutineScope
-) : VirtDriver {
+    scope: CoroutineScope
+) : VirtDriver, CoroutineScope by scope {
     /**
      * The [Server] on which this hypervisor runs.
      */
@@ -98,7 +98,7 @@ class SimpleVirtDriver(
                     it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
                 performanceModel?.computeIntersectingItems(imagesRunning)
             }
-        }.launchIn(coroutineScope)
+        }.launchIn(this)
     }
 
     override suspend fun spawn(
@@ -123,6 +123,10 @@ class SimpleVirtDriver(
         return server
     }
 
+    internal fun cancel() {
+        eventFlow.close()
+    }
+
     /**
      * A flag to indicate the driver is stopped.
      */
@@ -141,7 +145,7 @@ class SimpleVirtDriver(
     /**
      * Schedule the vCPUs on the physical CPUs.
      */
-    private suspend fun reschedule() {
+    private fun reschedule() {
         flush()
 
         // Do not schedule a call if there is no work to schedule or the driver stopped.
@@ -149,7 +153,7 @@ class SimpleVirtDriver(
             return
         }
 
-        val call = coroutineScope.launch {
+        val call = launch {
             val start = simulationContext.clock.millis()
             val vms = activeVms.toSet()
@@ -219,7 +223,7 @@ class SimpleVirtDriver(
                 val fraction = req.allocatedUsage / totalUsage
 
                 // Derive the burst that was allocated to this vCPU
-                val allocatedBurst = ceil(duration * req.allocatedUsage).toLong()
+                val allocatedBurst = ceil(totalBurst * fraction).toLong()
 
                 // Compute the burst time that the VM was actually granted
                 val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong()
@@ -244,6 +248,9 @@ class SimpleVirtDriver(
                     server
                 )
             )
+
+            // Make sure we reschedule the remaining amount of work (if we did not obtain the entire request)
+            reschedule()
         }
         this.call = call
     }
@@ -286,7 +293,7 @@ class SimpleVirtDriver(
         var chan = Channel<Unit>(Channel.RENDEZVOUS)
         private var initialized: Boolean = false
 
-        internal val job: Job = coroutineScope.launch {
+        internal val job: Job = launch {
             delay(1) // TODO Introduce boot time
             init()
             try {
@@ -331,8 +338,8 @@ class SimpleVirtDriver(
             server = server.copy(state = serverState)
             availableMemory += server.flavor.memorySize
             vms.remove(this)
-            events.close()
             eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory))
+            events.close()
         }
 
         override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
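Note on the accounting fix in SimpleVirtDriver: the burst allocated to a vCPU is now its proportional share of the total burst, ceil(totalBurst * fraction) with fraction = allocatedUsage / totalUsage, instead of duration * allocatedUsage, so the per-vCPU shares add up to the burst actually available. A small worked example with illustrative numbers:

    import kotlin.math.ceil

    fun main() {
        val totalBurst = 1_000L            // cycles to distribute over this slice
        val usages = listOf(2.0, 1.0, 1.0) // allocated usage per vCPU
        val totalUsage = usages.sum()

        for (usage in usages) {
            val fraction = usage / totalUsage
            val allocatedBurst = ceil(totalBurst * fraction).toLong()
            println("usage=$usage -> burst=$allocatedBurst")
        }
        // Prints 500, 250, 250: each share is proportional to the requested usage.
    }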
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index c7347783..85bdc438 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -122,7 +122,7 @@ class SimpleVirtProvisioningService(
             val requiredMemory = (imageInstance.image as VmImage).requiredMemory
             val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) ?: break
             try {
-                log.info("Spawning ${imageInstance.image} on ${selectedHv.server}")
+                log.info("Spawning ${imageInstance.image} on ${selectedHv.server} ${availableHypervisors.size}")
                 incomingImages -= imageInstance
 
                 // Speculatively update the hypervisor view information to prevent other images in the queue from
@@ -139,6 +139,19 @@ class SimpleVirtProvisioningService(
                 imageInstance.server = server
                 imageInstance.continuation.resume(server)
                 activeImages += imageInstance
+
+                server.events
+                    .onEach { event ->
+                        when (event) {
+                            is ServerEvent.StateChanged -> {
+                                if (event.server.state == ServerState.SHUTOFF) {
+                                    activeImages -= imageInstance
+                                    selectedHv.provisionedCores -= server.flavor.cpuCount
+                                }
+                            }
+                        }
+                    }
+                    .launchIn(this)
             } catch (e: InsufficientMemoryOnServerException) {
                 println("Unable to deploy image due to insufficient memory")
@@ -152,18 +165,22 @@ class SimpleVirtProvisioningService(
     private fun stateChanged(server: Server) {
         when (server.state) {
             ServerState.ACTIVE -> {
-                val hv = HypervisorView(
-                    server.uid,
-                    server,
-                    0,
-                    server.flavor.memorySize,
-                    0
-                )
-                hypervisors[server] = hv
+                if (server in hypervisors) {
+                    // Corner case for when the hypervisor already exists
+                    availableHypervisors += hypervisors.getValue(server)
+                } else {
+                    val hv = HypervisorView(
+                        server.uid,
+                        server,
+                        0,
+                        server.flavor.memorySize,
+                        0
+                    )
+                    hypervisors[server] = hv
+                }
             }
             ServerState.SHUTOFF, ServerState.ERROR -> {
                 val hv = hypervisors[server] ?: return
-                hv.provisionedCores -= server.flavor.cpuCount
                 availableHypervisors -= hv
                 requestCycle()
             }
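Note on the provisioning change: the service now watches each spawned server's event flow and releases its bookkeeping (active image, provisioned cores) once the server shuts off, instead of adjusting cores in stateChanged. A self-contained sketch of the subscribe-and-release pattern (simplified stand-in types, not the OpenDC event API):

    import kotlinx.coroutines.flow.MutableSharedFlow
    import kotlinx.coroutines.flow.launchIn
    import kotlinx.coroutines.flow.onEach
    import kotlinx.coroutines.runBlocking
    import kotlinx.coroutines.yield

    enum class State { ACTIVE, SHUTOFF }
    data class StateChanged(val state: State)

    fun main() = runBlocking {
        val events = MutableSharedFlow<StateChanged>()
        var provisionedCores = 4

        val subscription = events
            .onEach { event ->
                if (event.state == State.SHUTOFF) {
                    provisionedCores -= 4 // release the cores of the finished VM
                }
            }
            .launchIn(this)

        yield() // let the subscriber start collecting
        events.emit(StateChanged(State.SHUTOFF))
        yield() // let the subscriber handle the event
        println(provisionedCores) // 0
        subscription.cancel()
    }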
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
index c5189764..2904fbec 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
@@ -44,6 +44,8 @@ public class CorrelatedFaultInjector(
     private val iatShape: Double,
     private val sizeScale: Double,
     private val sizeShape: Double,
+    private val dScale: Double,
+    private val dShape: Double,
     random: Random = Random(0)
 ) : FaultInjector {
     /**
@@ -84,11 +86,11 @@ public class CorrelatedFaultInjector(
         }
 
         job = this.domain.launch {
-            while (true) {
+            while (active.isNotEmpty()) {
                 ensureActive()
 
                 // Make sure to convert delay from hours to milliseconds
-                val d = lognvariate(iatScale, iatShape) * 3600 * 1e6
+                val d = lognvariate(iatScale, iatShape) * 3.6e6
 
                 // Handle long overflow
                 if (simulationContext.clock.millis() + d <= 0) {
@@ -98,10 +100,31 @@ public class CorrelatedFaultInjector(
                 delay(d.toLong())
 
                 val n = lognvariate(sizeScale, sizeShape).toInt()
-                for (failureDomain in active.shuffled(random).take(n)) {
+                val targets = active.shuffled(random).take(n)
+
+                for (failureDomain in targets) {
+                    active -= failureDomain
                     failureDomain.fail()
                 }
+
+                val df = lognvariate(dScale, dShape) * 6e4
+
+                // Handle long overflow
+                if (simulationContext.clock.millis() + df <= 0) {
+                    return@launch
+                }
+
+                delay(df.toLong())
+
+                for (failureDomain in targets) {
+                    failureDomain.recover()
+
+                    // Re-enqueue machine to be failed
+                    enqueue(failureDomain)
+                }
             }
+
+            job = null
         }
     }
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt
index 91ca9b83..d56df3c9 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt
@@ -39,4 +39,9 @@ public interface FailureDomain {
      * Fail the domain externally.
      */
     public suspend fun fail()
+
+    /**
+     * Resume the failure domain.
+     */
+    public suspend fun recover()
 }
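Note on the injector: inter-arrival times (hours), failure group sizes, and the new failure durations (minutes) are all drawn from log-normal distributions, and the constants 3.6e6 and 6e4 convert hours and minutes to simulated milliseconds. A sketch of the sampling, assuming lognvariate(scale, shape) = exp(Gaussian(mean = scale, sd = shape)), which is how the parameters above are used (helper name borrowed from the code, implementation assumed):

    import java.util.Random
    import kotlin.math.exp

    // Assumed definition: log-normal variate with the given log-space mean and sd.
    fun Random.lognvariate(scale: Double, shape: Double): Double =
        exp(scale + shape * nextGaussian())

    fun main() {
        val random = Random(0)

        val iatMs = random.lognvariate(-1.39, 1.03) * 3.6e6   // hours -> ms
        val size = random.lognvariate(1.88, 1.25).toInt()     // machines to fail
        val durationMs = random.lognvariate(9.51, 3.21) * 6e4 // minutes -> ms

        println("next failure in ${iatMs.toLong()} ms, hits $size machines, lasts ${durationMs.toLong()} ms")
    }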
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 120c4f81..212b1bfb 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -14,18 +14,13 @@ class Sc20Monitor(
     destination: String
 ) : Closeable {
     private val outputFile = BufferedWriter(FileWriter(destination))
-    private var failedInSlice: Int = 0
     private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
 
     init {
-        outputFile.write("time,duration,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n")
+        outputFile.write("time,duration,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostState,hostUsage,powerDraw,failedVms\n")
     }
 
-    suspend fun onVmStateChanged(server: Server) {
-        if (server.state == ServerState.ERROR) {
-            failedInSlice++
-        }
-    }
+    suspend fun onVmStateChanged(server: Server) {}
 
     suspend fun serverStateChanged(driver: VirtDriver, server: Server) {
         if ((server.state == ServerState.SHUTOFF || server.state == ServerState.ERROR) &&
@@ -61,12 +56,8 @@ class Sc20Monitor(
         val driver = hostServer.services[BareMetalDriver.Key]
         val usage = driver.usage.first()
         val powerDraw = driver.powerDraw.first()
-        val failed = if (failedInSlice > 0) {
-            failedInSlice.also { failedInSlice = 0 }
-        } else {
-            0
-        }
-        outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw,$failed")
+
+        outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},${hostServer.state},$usage,$powerDraw")
         outputFile.newLine()
     }
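Note on the monitor output: each slice row now carries the host's state in a new hostState column; observe that the header above still ends with failedVms although the row write no longer emits that column. An illustrative reader for the new schema (file name hypothetical):

    import java.io.File

    fun main() {
        val lines = File("experiment.csv").readLines() // hypothetical output file
        val header = lines.first().split(",")
        val hostState = header.indexOf("hostState")
        val powerDraw = header.indexOf("powerDraw")

        for (row in lines.drop(1)) {
            val cols = row.split(",")
            println("state=${cols[hostState]} power=${cols[powerDraw]}")
        }
    }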
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index cc403e6e..ca7e31ea 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -52,6 +52,7 @@ import com.fasterxml.jackson.module.kotlin.readValue
 import com.xenomachina.argparser.ArgParser
 import com.xenomachina.argparser.default
 import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
 import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.collect
@@ -63,6 +64,7 @@ import java.io.File
 import java.io.FileReader
 import java.util.ServiceLoader
 import kotlin.math.max
+import kotlin.random.Random
 
 class ExperimentParameters(parser: ArgParser) {
     val traceDirectory by parser.storing("path to the trace directory")
@@ -100,11 +102,14 @@ class ExperimentParameters(parser: ArgParser) {
 /**
  * Obtain the [FaultInjector] to use for the experiments.
 */
-fun createFaultInjector(domain: Domain): FaultInjector {
+fun createFaultInjector(domain: Domain, random: Random): FaultInjector {
     // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+    // GRID'5000
     return CorrelatedFaultInjector(domain,
-        iatScale = -1.39, iatShape = 1.03,
-        sizeScale = 1.88, sizeShape = 1.25
+        iatScale = -1.39, iatShape = 1.03, // Hours
+        sizeScale = 1.88, sizeShape = 1.25,
+        dScale = 9.51, dShape = 3.21, // Minutes
+        random = random
     )
 }
@@ -202,18 +207,22 @@ fun main(args: Array<String>) {
             .launchIn(this)
     }
 
-    if (failures) {
-        println("ENABLE Failures")
-        root.newDomain(name = "failures").launch {
+    val failureDomain = if (failures) {
+        println("ENABLING failures")
+        val domain = root.newDomain(name = "failures")
+        domain.launch {
             chan.receive()
+            val random = Random(0)
             val injectors = mutableMapOf<String, FaultInjector>()
-
             for (node in bareMetalProvisioner.nodes()) {
                 val cluster = node.metadata[NODE_CLUSTER] as String
-                val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain) }
+                val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) }
                 injector.enqueue(node.metadata["driver"] as FailureDomain)
             }
         }
+        domain
+    } else {
+        null
     }
 
     val running = mutableSetOf<Server>()
@@ -237,11 +246,12 @@ fun main(args: Array<String>) {
                     monitor.onVmStateChanged(it.server)
 
                     // Detect whether the VM has finished running
-                    if (it.server.state == ServerState.ERROR || it.server.state == ServerState.SHUTOFF) {
+                    if (it.server.state == ServerState.SHUTOFF) {
                         running -= server
+                    }
 
-                        if (running.isEmpty() && (!reader.hasNext() || availableHypervisors == 0))
-                            finish.send(Unit)
+                    if (running.isEmpty() && !reader.hasNext()) {
+                        finish.send(Unit)
                     }
                 }
                 .collect()
@@ -250,6 +260,7 @@ fun main(args: Array<String>) {
     finish.receive()
 
     scheduler.terminate()
+    failureDomain?.cancel()
 
     println(simulationContext.clock.instant())
 }
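Note on the experiment wiring: fault injectors are created lazily, one per cluster, all sharing a single seeded Random so runs stay reproducible, and the failure domain is cancelled once the experiment finishes. A compact sketch of the per-cluster getOrPut pattern (stand-in Injector type):

    import kotlin.random.Random

    class Injector(val cluster: String, val random: Random)

    fun main() {
        val random = Random(0) // one seed for the whole experiment run
        val injectors = mutableMapOf<String, Injector>()
        val nodes = listOf("c0" to "node0", "c0" to "node1", "c1" to "node2")

        for ((cluster, node) in nodes) {
            val injector = injectors.getOrPut(cluster) { Injector(cluster, random) }
            println("enqueue $node on injector for ${injector.cluster}")
        }
        println("${injectors.size} injectors created") // 2: one per cluster
    }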
