summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Fabian Mastenbroek <mail.fabianm@gmail.com> 2020-03-17 17:37:41 +0100
committer Fabian Mastenbroek <mail.fabianm@gmail.com> 2020-03-25 10:48:58 +0100
commit 6b10881f123f5e6a8e7bce1045d02eba5e48c3a2 (patch)
tree d9f34ec80e249fff595304442abf4d2c6b36d7a9
parent b3e8e3d196de8b8c1bb904bfb3c6641415cf72bb (diff)
[ci skip] feat: Add support for correlated failures
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt37
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt17
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt2
-rw-r--r--opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt111
-rw-r--r--opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt2
-rw-r--r--opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt2
-rw-r--r--opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt2
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt11
8 files changed, 151 insertions, 33 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 4b9a03a6..a8f3d781 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -45,6 +45,7 @@ import com.atlarge.opendc.core.power.PowerModel
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
+import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
@@ -82,22 +83,20 @@ public class SimpleBareMetalDriver(
* The machine state.
*/
private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null)
- set(value) {
- if (field.state != value.state) {
- domain.launch {
- monitor.onUpdate(value, field.state)
- }
- }
- if (field.server != null && value.server != null && field.server!!.state != value.server.state) {
- domain.launch {
- monitor.onUpdate(value.server, field.server!!.state)
- }
- }
+ private suspend fun setNode(value: Node) {
+ val field = node
+ if (field.state != value.state) {
+ monitor.onUpdate(value, field.state)
+ }
- field = value
+ if (field.server != null && value.server != null && field.server.state != value.server.state) {
+ monitor.onUpdate(value.server, field.server.state)
}
+ node = value
+ }
+
/**
* The flavor that corresponds to this machine.
*/
@@ -137,7 +136,7 @@ public class SimpleBareMetalDriver(
)
server.serviceRegistry[BareMetalDriver.Key] = this@SimpleBareMetalDriver
- node = node.copy(state = NodeState.BOOT, server = server)
+ setNode(node.copy(state = NodeState.BOOT, server = server))
serverContext = BareMetalServerContext()
return@withContext node
}
@@ -151,7 +150,7 @@ public class SimpleBareMetalDriver(
serverContext!!.cancel(fail = false)
serverContext = null
- node = node.copy(state = NodeState.SHUTOFF, server = null)
+ setNode(node.copy(state = NodeState.SHUTOFF, server = null))
return@withContext node
}
@@ -161,7 +160,7 @@ public class SimpleBareMetalDriver(
}
override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
- node = node.copy(image = image)
+ setNode(node.copy(image = image))
return@withContext node
}
@@ -190,9 +189,9 @@ public class SimpleBareMetalDriver(
*/
suspend fun cancel(fail: Boolean) {
if (fail)
- job.cancel(ShutdownException(cause = Exception("Random failure")))
+ domain.cancel(ShutdownException(cause = Exception("Random failure")))
else
- job.cancel(ShutdownException())
+ domain.cancel(ShutdownException())
job.join()
}
@@ -200,7 +199,7 @@ public class SimpleBareMetalDriver(
assert(!finalized) { "Machine is already finalized" }
val server = server.copy(state = ServerState.ACTIVE)
- node = node.copy(state = NodeState.ACTIVE, server = server)
+ setNode(node.copy(state = NodeState.ACTIVE, server = server))
}
override suspend fun exit(cause: Throwable?) {
@@ -217,7 +216,7 @@ public class SimpleBareMetalDriver(
else
NodeState.ERROR
val server = server.copy(state = serverState)
- node = node.copy(state = nodeState, server = server)
+ setNode(node.copy(state = nodeState, server = server))
}
private var flush: Job? = null
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 0f38eca1..a50292a7 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -14,7 +14,6 @@ import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnSer
import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
class SimpleVirtProvisioningService(
public override val allocationPolicy: AllocationPolicy,
@@ -56,13 +55,6 @@ class SimpleVirtProvisioningService(
server.flavor.memorySize
)
hypervisors[server] = hvView
- yield()
- server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor {
- override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) {
- hvView.numberOfActiveServers = numberOfActiveServers
- hvView.availableMemory = availableMemory
- }
- })
}
}
}
@@ -100,10 +92,19 @@ class SimpleVirtProvisioningService(
}
override suspend fun onUpdate(server: Server, previousState: ServerState) {
+ println("${server.uid} ${server.state} ${hypervisors[server]}")
when (server.state) {
ServerState.ACTIVE -> {
val hv = hypervisors[server] ?: return
availableHypervisors += hv
+
+ server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor {
+ override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) {
+ hv.numberOfActiveServers = numberOfActiveServers
+ hv.availableMemory = availableMemory
+ }
+ })
+
requestCycle()
}
ServerState.SHUTOFF, ServerState.ERROR -> {
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index a837130d..ef19427e 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -63,7 +63,7 @@ internal class SimpleProvisioningServiceTest {
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList())
+ val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", cpus, emptyList())
val provisioner = SimpleProvisioningService(dom)
provisioner.create(driver)
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
new file mode 100644
index 00000000..41412195
--- /dev/null
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
@@ -0,0 +1,111 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.core.failure
+
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.ensureActive
+import kotlinx.coroutines.launch
+import kotlin.math.exp
+import kotlin.random.Random
+import kotlin.random.asJavaRandom
+
+/**
+ * A [FaultInjector] that injects correlated faults into the system: faults do not occur in isolation, but arrive in
+ * bursts that hit several [FailureDomain]s at once.
+ *
+ * Both the time between two bursts and the number of domains hit by a burst are sampled as
+ * `exp(scale + shape * gaussian)`, i.e. from a log-normal distribution.
+ *
+ * @param domain The simulation domain in which the injector schedules the bursts and its own bookkeeping.
+ * @param iatScale The scale of the log-normal distribution of the inter-arrival time between bursts (in seconds).
+ * @param iatShape The shape of the log-normal distribution of the inter-arrival time between bursts.
+ * @param sizeScale The scale of the log-normal distribution of the number of domains failing per burst.
+ * @param sizeShape The shape of the log-normal distribution of the number of domains failing per burst.
+ * @param random The source of randomness used for sampling and for selecting the victims of a burst.
+ */
+public class CorrelatedFaultInjector(
+    private val domain: Domain,
+    private val iatScale: Double,
+    private val iatShape: Double,
+    private val sizeScale: Double,
+    private val sizeShape: Double,
+    random: Random = Random
+) : FaultInjector {
+    /**
+     * The active failure domains that have been registered.
+     */
+    private val active = mutableSetOf<FailureDomain>()
+
+    /**
+     * The [Job] that awaits the nearest fault in the system, or `null` if no such job has been started.
+     */
+    private var job: Job? = null
+
+    /**
+     * The [Random] instance to use.
+     */
+    private val random: java.util.Random = random.asJavaRandom()
+
+    /**
+     * Enqueue the specified [FailureDomain] to fail some time in the future.
+     *
+     * The domain is tracked until the [Job] of its scope completes. The first call (and any call made after the
+     * injection loop has stopped) starts the loop that periodically fails a random subset of the tracked domains.
+     *
+     * @param domain The failure domain to track.
+     * @throws IllegalArgumentException if the scope of [domain] has no [Job].
+     */
+    override fun enqueue(domain: FailureDomain) {
+        // Validate before touching any state, so a rejected domain is never left behind in [active].
+        val domainJob = requireNotNull(domain.scope.coroutineContext[Job]) {
+            "The scope of a failure domain must contain a Job"
+        }
+
+        // Register the clean-up handler only when the domain is new: enqueueing the same domain twice must not
+        // pile up completion handlers.
+        if (active.add(domain)) {
+            domainJob.invokeOnCompletion {
+                // Mutate the injector's state from within its own simulation domain.
+                this@CorrelatedFaultInjector.domain.launch {
+                    active -= domain
+
+                    // Nothing left to fail: stop waiting for the next burst.
+                    if (active.isEmpty()) {
+                        job?.cancel()
+                        job = null
+                    }
+                }
+            }
+        }
+
+        // A loop that has already finished (e.g. because the clock ran out) must not block a restart.
+        if (job?.isActive == true) {
+            return
+        }
+
+        job = this.domain.launch {
+            while (true) {
+                ensureActive()
+
+                val d = lognvariate(iatScale, iatShape) * 1e3 // Make sure to convert delay to milliseconds
+
+                // Stop once the next burst lies beyond what the clock can represent. The comparison is done on
+                // doubles on purpose: `Long + Double` yields a Double and never wraps around, so testing the sum
+                // for a negative value cannot detect the overflow.
+                val now = simulationContext.clock.millis()
+                if (now + d >= Long.MAX_VALUE.toDouble()) {
+                    return@launch
+                }
+
+                delay(d.toLong())
+
+                // The burst size is truncated towards zero, so a burst may hit no domain at all.
+                val n = lognvariate(sizeScale, sizeShape).toInt()
+
+                // `shuffled` returns a copy, so [active] may change while the victims are being failed.
+                for (failureDomain in active.shuffled(random).take(n)) {
+                    failureDomain.fail()
+                }
+            }
+        }
+    }
+
+    // XXX We should extract this in some common package later on.
+    /**
+     * Draw a sample `exp(scale + shape * z)`, where `z` is a standard Gaussian sample taken from [random].
+     */
+    private fun lognvariate(scale: Double, shape: Double) = exp(scale + shape * random.nextGaussian())
+}
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
index 56706824..3883eb11 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
@@ -42,7 +42,7 @@ public class UncorrelatedFaultInjector(private val alpha: Double, private val be
override fun enqueue(domain: FailureDomain) {
domain.scope.launch {
val d = random.weibull(alpha, beta) * 1e3 // Make sure to convert delay to milliseconds
-
+
// Handle long overflow
if (simulationContext.clock.millis() + d <= 0) {
return@launch
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt
index d9a85231..a036a705 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt
@@ -48,5 +48,5 @@ public interface ServiceRegistry {
/**
* Register the specified [ServiceKey] in this registry.
*/
- public operator fun <T : Any> set(key: ServiceKey<T>, service: T)
+ public operator fun <T : Any> set(key: ServiceKey<T>, service: T): ServiceRegistry
}
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt
index 91147839..e3fa171d 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt
@@ -43,4 +43,6 @@ public class ServiceRegistryImpl : ServiceRegistry {
@Suppress("UNCHECKED_CAST")
return services[key] as T
}
+
+ override fun toString(): String = services.toString()
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 69174f0f..09b6592e 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
+import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
import com.atlarge.opendc.core.failure.FailureDomain
-import com.atlarge.opendc.core.failure.UncorrelatedFaultInjector
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader
@@ -39,6 +39,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.xenomachina.argparser.ArgParser
import com.xenomachina.argparser.default
+import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -115,10 +116,14 @@ fun main(args: Array<String>) {
monitor
)
- root.launch {
+ val faultInjectorDomain = root.newDomain(name = "failures")
+ faultInjectorDomain.launch {
chan.receive()
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- val faultInjector = UncorrelatedFaultInjector(alpha = 9.66772, beta = 12.23796)
+ val faultInjector = CorrelatedFaultInjector(faultInjectorDomain,
+ iatScale = -1.39, iatShape = 1.03,
+ sizeScale = 1.88, sizeShape = 1.25
+ )
for (node in bareMetalProvisioner.nodes()) {
faultInjector.enqueue(node.metadata["driver"] as FailureDomain)
}