diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-17 17:37:41 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-25 10:48:58 +0100 |
| commit | 6b10881f123f5e6a8e7bce1045d02eba5e48c3a2 (patch) | |
| tree | d9f34ec80e249fff595304442abf4d2c6b36d7a9 | |
| parent | b3e8e3d196de8b8c1bb904bfb3c6641415cf72bb (diff) | |
[ci skip] feat: Add support for correlated failures
8 files changed, 151 insertions, 33 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 4b9a03a6..a8f3d781 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -45,6 +45,7 @@ import com.atlarge.opendc.core.power.PowerModel import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.launch @@ -82,22 +83,20 @@ public class SimpleBareMetalDriver( * The machine state. */ private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null) - set(value) { - if (field.state != value.state) { - domain.launch { - monitor.onUpdate(value, field.state) - } - } - if (field.server != null && value.server != null && field.server!!.state != value.server.state) { - domain.launch { - monitor.onUpdate(value.server, field.server!!.state) - } - } + private suspend fun setNode(value: Node) { + val field = node + if (field.state != value.state) { + monitor.onUpdate(value, field.state) + } - field = value + if (field.server != null && value.server != null && field.server.state != value.server.state) { + monitor.onUpdate(value.server, field.server.state) } + node = value + } + /** * The flavor that corresponds to this machine. */ @@ -137,7 +136,7 @@ public class SimpleBareMetalDriver( ) server.serviceRegistry[BareMetalDriver.Key] = this@SimpleBareMetalDriver - node = node.copy(state = NodeState.BOOT, server = server) + setNode(node.copy(state = NodeState.BOOT, server = server)) serverContext = BareMetalServerContext() return@withContext node } @@ -151,7 +150,7 @@ public class SimpleBareMetalDriver( serverContext!!.cancel(fail = false) serverContext = null - node = node.copy(state = NodeState.SHUTOFF, server = null) + setNode(node.copy(state = NodeState.SHUTOFF, server = null)) return@withContext node } @@ -161,7 +160,7 @@ public class SimpleBareMetalDriver( } override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { - node = node.copy(image = image) + setNode(node.copy(image = image)) return@withContext node } @@ -190,9 +189,9 @@ public class SimpleBareMetalDriver( */ suspend fun cancel(fail: Boolean) { if (fail) - job.cancel(ShutdownException(cause = Exception("Random failure"))) + domain.cancel(ShutdownException(cause = Exception("Random failure"))) else - job.cancel(ShutdownException()) + domain.cancel(ShutdownException()) job.join() } @@ -200,7 +199,7 @@ public class SimpleBareMetalDriver( assert(!finalized) { "Machine is already finalized" } val server = server.copy(state = ServerState.ACTIVE) - node = node.copy(state = NodeState.ACTIVE, server = server) + setNode(node.copy(state = NodeState.ACTIVE, server = server)) } override suspend fun exit(cause: Throwable?) { @@ -217,7 +216,7 @@ public class SimpleBareMetalDriver( else NodeState.ERROR val server = server.copy(state = serverState) - node = node.copy(state = nodeState, server = server) + setNode(node.copy(state = nodeState, server = server)) } private var flush: Job? = null diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 0f38eca1..a50292a7 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -14,7 +14,6 @@ import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnSer import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import kotlinx.coroutines.launch -import kotlinx.coroutines.yield class SimpleVirtProvisioningService( public override val allocationPolicy: AllocationPolicy, @@ -56,13 +55,6 @@ class SimpleVirtProvisioningService( server.flavor.memorySize ) hypervisors[server] = hvView - yield() - server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor { - override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) { - hvView.numberOfActiveServers = numberOfActiveServers - hvView.availableMemory = availableMemory - } - }) } } } @@ -100,10 +92,19 @@ class SimpleVirtProvisioningService( } override suspend fun onUpdate(server: Server, previousState: ServerState) { + println("${server.uid} ${server.state} ${hypervisors[server]}") when (server.state) { ServerState.ACTIVE -> { val hv = hypervisors[server] ?: return availableHypervisors += hv + + server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor { + override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) { + hv.numberOfActiveServers = numberOfActiveServers + hv.availableMemory = availableMemory + } + }) + requestCycle() } ServerState.SHUTOFF, ServerState.ERROR -> { diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index a837130d..ef19427e 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -63,7 +63,7 @@ internal class SimpleProvisioningServiceTest { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList()) + val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", cpus, emptyList()) val provisioner = SimpleProvisioningService(dom) provisioner.create(driver) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt new file mode 100644 index 00000000..41412195 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt @@ -0,0 +1,111 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.failure + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.launch +import kotlin.math.exp +import kotlin.random.Random +import kotlin.random.asJavaRandom + +/** + * A [FaultInjector] that injects fault in the system which are correlated to each other. Failures do not occur in + * isolation, but will trigger other faults. + */ +public class CorrelatedFaultInjector( + private val domain: Domain, + private val iatScale: Double, + private val iatShape: Double, + private val sizeScale: Double, + private val sizeShape: Double, + random: Random = Random +) : FaultInjector { + /** + * The active failure domains that have been registered. + */ + private val active = mutableSetOf<FailureDomain>() + + /** + * The [Job] that awaits the nearest fault in the system. + */ + private var job: Job? = null + + /** + * The [Random] instance to use. + */ + private val random: java.util.Random = random.asJavaRandom() + + /** + * Enqueue the specified [FailureDomain] to fail some time in the future. + */ + override fun enqueue(domain: FailureDomain) { + active += domain + + // Clean up the domain if it finishes + domain.scope.coroutineContext[Job]!!.invokeOnCompletion { + this@CorrelatedFaultInjector.domain.launch { + println("CANCELLED") + active -= domain + + if (active.isEmpty()) { + job?.cancel() + job = null + } + } + } + + if (job != null) { + return + } + + job = this.domain.launch { + while (true) { + ensureActive() + + val d = lognvariate(iatScale, iatShape) * 1e3 // Make sure to convert delay to milliseconds + + // Handle long overflow + if (simulationContext.clock.millis() + d <= 0) { + return@launch + } + + delay(d.toLong()) + + val n = lognvariate(sizeScale, sizeShape).toInt() + + for (failureDomain in active.shuffled(random).take(n)) { + failureDomain.fail() + } + } + } + } + + // XXX We should extract this in some common package later on. + private fun lognvariate(scale: Double, shape: Double) = exp(scale + shape * random.nextGaussian()) +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt index 56706824..3883eb11 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt @@ -42,7 +42,7 @@ public class UncorrelatedFaultInjector(private val alpha: Double, private val be override fun enqueue(domain: FailureDomain) { domain.scope.launch { val d = random.weibull(alpha, beta) * 1e3 // Make sure to convert delay to milliseconds - + // Handle long overflow if (simulationContext.clock.millis() + d <= 0) { return@launch diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt index d9a85231..a036a705 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt @@ -48,5 +48,5 @@ public interface ServiceRegistry { /** * Register the specified [ServiceKey] in this registry. */ - public operator fun <T : Any> set(key: ServiceKey<T>, service: T) + public operator fun <T : Any> set(key: ServiceKey<T>, service: T): ServiceRegistry } diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt index 91147839..e3fa171d 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt @@ -43,4 +43,6 @@ public class ServiceRegistryImpl : ServiceRegistry { @Suppress("UNCHECKED_CAST") return services[key] as T } + + override fun toString(): String = services.toString() } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 69174f0f..09b6592e 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy +import com.atlarge.opendc.core.failure.CorrelatedFaultInjector import com.atlarge.opendc.core.failure.FailureDomain -import com.atlarge.opendc.core.failure.UncorrelatedFaultInjector import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader @@ -39,6 +39,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default +import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -115,10 +116,14 @@ fun main(args: Array<String>) { monitor ) - root.launch { + val faultInjectorDomain = root.newDomain(name = "failures") + faultInjectorDomain.launch { chan.receive() // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - val faultInjector = UncorrelatedFaultInjector(alpha = 9.66772, beta = 12.23796) + val faultInjector = CorrelatedFaultInjector(faultInjectorDomain, + iatScale = -1.39, iatShape = 1.03, + sizeScale = 1.88, sizeShape = 1.25 + ) for (node in bareMetalProvisioner.nodes()) { faultInjector.enqueue(node.metadata["driver"] as FailureDomain) } |
