summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gradle/libs.versions.toml2
-rw-r--r--opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt2
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt2
-rw-r--r--opendc-compute/opendc-compute-simulator/build.gradle.kts2
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt37
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt (renamed from opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FaultInjector.kt)15
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt65
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt (renamed from opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/UncorrelatedFaultInjector.kt)54
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt (renamed from opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FailureDomain.kt)35
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt (renamed from opendc-simulator/opendc-simulator-failures/build.gradle.kts)19
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt103
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt111
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt97
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt17
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt65
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt2
-rw-r--r--opendc-experiments/opendc-experiments-energy21/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt3
-rw-r--r--opendc-experiments/opendc-experiments-radice/build.gradle.kts1
-rw-r--r--opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/CorrelatedFaultInjector.kt129
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt20
-rw-r--r--settings.gradle.kts1
23 files changed, 436 insertions, 348 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 76846104..4e2fc777 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -2,6 +2,7 @@
classgraph = "4.8.115"
clikt = "3.2.0"
config = "1.4.1"
+commons-math3 = "3.6.1"
hadoop = "3.3.1"
jackson = "2.12.5"
junit-jupiter = "5.7.2"
@@ -71,3 +72,4 @@ kotlinx-benchmark-runtime-jvm = { module = "org.jetbrains.kotlinx:kotlinx-benchm
classgraph = { module = "io.github.classgraph:classgraph", version.ref = "classgraph" }
hadoop-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "hadoop" }
hadoop-mapreduce-client-core = { module = "org.apache.hadoop:hadoop-mapreduce-client-core", version.ref = "hadoop" }
+commons-math3 = { module = "org.apache.commons:commons-math3", version.ref = "commons-math3" }
diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
index baa1ba2f..577fbc73 100644
--- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
+++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
@@ -45,7 +45,7 @@ public interface ComputeClient : AutoCloseable {
*
* @param name The name of the flavor.
* @param cpuCount The amount of CPU cores for this flavor.
- * @param memorySize The size of the memory.
+ * @param memorySize The size of the memory in MB.
* @param labels The identifying labels of the image.
* @param meta The non-identifying meta-data of the image.
*/
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
index 5632a55e..fc092a3f 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
@@ -25,7 +25,7 @@ package org.opendc.compute.service.driver
/**
* Describes the static machine properties of the host.
*
- * @property vcpuCount The number of logical processing cores available for this host.
+ * @property cpuCount The number of logical processing cores available for this host.
* @property memorySize The amount of memory available for this host in MB.
*/
public data class HostModel(public val cpuCount: Int, public val memorySize: Long)
diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts
index c5a9e668..cad051e6 100644
--- a/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -33,7 +33,7 @@ dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcCompute.opendcComputeService)
api(projects.opendcSimulator.opendcSimulatorCompute)
- api(projects.opendcSimulator.opendcSimulatorFailures)
+ api(libs.commons.math3)
implementation(projects.opendcUtils)
implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 213d20ee..a1cc3390 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -43,7 +43,6 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.failures.FailureDomain
import org.opendc.simulator.resources.SimResourceInterpreter
import java.util.*
import kotlin.coroutines.CoroutineContext
@@ -66,11 +65,11 @@ public class SimHost(
powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
interferenceDomain: VmInterferenceDomain? = null
-) : Host, FailureDomain, AutoCloseable {
+) : Host, AutoCloseable {
/**
* The [CoroutineScope] of the host bounded by the lifecycle of the host.
*/
- override val scope: CoroutineScope = CoroutineScope(context + Job())
+ private val scope: CoroutineScope = CoroutineScope(context + Job())
/**
* The clock instance used by the host.
@@ -347,6 +346,22 @@ public class SimHost(
override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
+ public suspend fun fail() {
+ reportTime()
+ _state = HostState.DOWN
+ for (guest in guests.values) {
+ guest.fail()
+ }
+ }
+
+ public suspend fun recover() {
+ reportTime()
+ _state = HostState.UP
+ for (guest in guests.values) {
+ guest.start()
+ }
+ }
+
/**
* Convert flavor to machine model.
*/
@@ -369,22 +384,6 @@ public class SimHost(
listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
}
- override suspend fun fail() {
- reportTime()
- _state = HostState.DOWN
- for (guest in guests.values) {
- guest.fail()
- }
- }
-
- override suspend fun recover() {
- reportTime()
- _state = HostState.UP
- for (guest in guests.values) {
- guest.start()
- }
- }
-
/**
* A virtual machine instance that the driver manages.
*/
diff --git a/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FaultInjector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt
index a866260c..258ccc89 100644
--- a/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FaultInjector.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,14 +20,17 @@
* SOFTWARE.
*/
-package org.opendc.simulator.failures
+package org.opendc.compute.simulator.failure
+
+import org.opendc.compute.simulator.SimHost
+import java.time.Clock
/**
- * An interface for stochastically injecting faults into a running system.
+ * Interface responsible for applying the fault to a host.
*/
-public interface FaultInjector {
+public interface HostFault {
/**
- * Enqueue the specified [FailureDomain] into the queue as candidate for failure injection in the future.
+ * Apply the fault to the specified [victims].
*/
- public fun enqueue(domain: FailureDomain)
+ public suspend fun apply(clock: Clock, victims: List<SimHost>)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt
new file mode 100644
index 00000000..5eff439f
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import org.apache.commons.math3.distribution.RealDistribution
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.internal.HostFaultInjectorImpl
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * An interface for stochastically injecting faults into a set of hosts.
+ */
+public interface HostFaultInjector : AutoCloseable {
+ /**
+ * Start fault injection.
+ */
+ public fun start()
+
+ /**
+ * Stop fault injection into the system.
+ */
+ public override fun close()
+
+ public companion object {
+ /**
+ * Construct a new [HostFaultInjector].
+ *
+ * @param context The scope to run the fault injector in.
+ * @param clock The [Clock] to keep track of simulation time.
+ * @param hosts The hosts to inject the faults into.
+ * @param iat The inter-arrival time distribution of the failures (in hours).
+ * @param selector The [VictimSelector] to select the host victims.
+ * @param fault The type of [HostFault] to inject.
+ */
+ public operator fun invoke(
+ context: CoroutineContext,
+ clock: Clock,
+ hosts: Set<SimHost>,
+ iat: RealDistribution,
+ selector: VictimSelector,
+ fault: HostFault
+ ): HostFaultInjector = HostFaultInjectorImpl(context, clock, hosts, iat, selector, fault)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/UncorrelatedFaultInjector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt
index b3bd737e..fc7cebfc 100644
--- a/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/UncorrelatedFaultInjector.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,42 +20,36 @@
* SOFTWARE.
*/
-package org.opendc.simulator.failures
+package org.opendc.compute.simulator.failure
import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
+import org.apache.commons.math3.distribution.RealDistribution
+import org.opendc.compute.simulator.SimHost
import java.time.Clock
-import kotlin.math.ln1p
-import kotlin.math.pow
-import kotlin.random.Random
+import kotlin.math.roundToLong
/**
- * A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are
- * independent.
+ * A type of [HostFault] where the hosts are stopped and recover after some random amount of time.
*/
-public class UncorrelatedFaultInjector(
- private val clock: Clock,
- private val alpha: Double,
- private val beta: Double,
- private val random: Random = Random(0)
-) : FaultInjector {
- /**
- * Enqueue the specified [FailureDomain] to fail some time in the future.
- */
- override fun enqueue(domain: FailureDomain) {
- domain.scope.launch {
- val d = random.weibull(alpha, beta) * 1e3 // Make sure to convert delay to milliseconds
-
- // Handle long overflow
- if (clock.millis() + d <= 0) {
- return@launch
- }
-
- delay(d.toLong())
- domain.fail()
+public class StartStopHostFault(private val duration: RealDistribution) : HostFault {
+ override suspend fun apply(clock: Clock, victims: List<SimHost>) {
+ for (host in victims) {
+ host.fail()
+ }
+
+ val df = (duration.sample() * 1000).roundToLong() // seconds to milliseconds
+
+ // Handle long overflow
+ if (clock.millis() + df <= 0) {
+ return
+ }
+
+ delay(df)
+
+ for (host in victims) {
+ host.recover()
}
}
- // XXX We should extract this in some common package later on.
- private fun Random.weibull(alpha: Double, beta: Double) = (beta * (-ln1p(-nextDouble())).pow(1.0 / alpha))
+ override fun toString(): String = "StartStopHostFault[$duration]"
}
diff --git a/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FailureDomain.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
index dc3006e8..87903623 100644
--- a/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/FailureDomain.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,26 +20,25 @@
* SOFTWARE.
*/
-package org.opendc.simulator.failures
+package org.opendc.compute.simulator.failure
-import kotlinx.coroutines.CoroutineScope
+import org.apache.commons.math3.distribution.RealDistribution
+import org.opendc.compute.simulator.SimHost
+import kotlin.math.roundToInt
+import kotlin.random.Random
/**
- * A logical or physical component in a computing environment which may fail.
+ * A [VictimSelector] that stochastically selects a set of hosts to be failed.
*/
-public interface FailureDomain {
- /**
- * The lifecycle of the failure domain to which a [FaultInjector] will attach.
- */
- public val scope: CoroutineScope
+public class StochasticVictimSelector(
+ private val size: RealDistribution,
+ private val random: Random = Random(0)
+) : VictimSelector {
- /**
- * Fail the domain externally.
- */
- public suspend fun fail()
+ override fun select(hosts: Set<SimHost>): List<SimHost> {
+ val n = size.sample().roundToInt()
+ return hosts.shuffled(random).take(n)
+ }
- /**
- * Resume the failure domain.
- */
- public suspend fun recover()
+ override fun toString(): String = "StochasticVictimSelector[$size]"
}
diff --git a/opendc-simulator/opendc-simulator-failures/build.gradle.kts b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt
index 57cd0a35..b5610284 100644
--- a/opendc-simulator/opendc-simulator-failures/build.gradle.kts
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,13 +20,16 @@
* SOFTWARE.
*/
-description = "Failure models for OpenDC"
+package org.opendc.compute.simulator.failure
-plugins {
- `kotlin-library-conventions`
-}
+import org.opendc.compute.simulator.SimHost
-dependencies {
- api(platform(projects.opendcPlatform))
- api(libs.kotlinx.coroutines)
+/**
+ * Interface responsible for selecting the victim(s) for fault injection.
+ */
+public interface VictimSelector {
+ /**
+ * Select the hosts from [hosts] where a fault will be injected.
+ */
+ public fun select(hosts: Set<SimHost>): List<SimHost>
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
new file mode 100644
index 00000000..6919b7fd
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+import kotlinx.coroutines.*
+import org.apache.commons.math3.distribution.RealDistribution
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.failure.HostFault
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import org.opendc.compute.simulator.failure.VictimSelector
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.roundToLong
+
+/**
+ * Internal implementation of the [HostFaultInjector] interface.
+ *
+ * @param context The scope to run the fault injector in.
+ * @param clock The [Clock] to keep track of simulation time.
+ * @param hosts The set of hosts to inject faults into.
+ * @param iat The inter-arrival time distribution of the failures (in hours).
+ * @param selector The [VictimSelector] to select the host victims.
+ * @param fault The type of [HostFault] to inject.
+ */
+internal class HostFaultInjectorImpl(
+ private val context: CoroutineContext,
+ private val clock: Clock,
+ private val hosts: Set<SimHost>,
+ private val iat: RealDistribution,
+ private val selector: VictimSelector,
+ private val fault: HostFault
+) : HostFaultInjector {
+ /**
+ * The scope in which the injector runs.
+ */
+ private val scope = CoroutineScope(context + Job())
+
+ /**
+ * The [Job] that awaits the nearest fault in the system.
+ */
+ private var job: Job? = null
+
+ /**
+ * Start the fault injection into the system.
+ */
+ override fun start() {
+ if (job != null) {
+ return
+ }
+
+ job = scope.launch {
+ runInjector()
+ job = null
+ }
+ }
+
+ /**
+ * Run the injection process.
+ */
+ private suspend fun runInjector() {
+ while (true) {
+ // Make sure to convert delay from hours to milliseconds
+ val d = (iat.sample() * 3.6e6).roundToLong()
+
+ // Handle long overflow
+ if (clock.millis() + d <= 0) {
+ return
+ }
+
+ delay(d)
+
+ val victims = selector.select(hosts)
+ fault.apply(clock, victims)
+ }
+ }
+
+ /**
+ * Stop the fault injector.
+ */
+ public override fun close() {
+ scope.cancel()
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt
new file mode 100644
index 00000000..f240a25f
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import io.mockk.coVerify
+import io.mockk.mockk
+import kotlinx.coroutines.delay
+import org.apache.commons.math3.distribution.LogNormalDistribution
+import org.apache.commons.math3.random.Well19937c
+import org.junit.jupiter.api.Test
+import org.opendc.compute.simulator.SimHost
+import org.opendc.simulator.core.runBlockingSimulation
+import java.time.Clock
+import java.time.Duration
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.ln
+
+/**
+ * Test suite for [HostFaultInjector] class.
+ */
+internal class HostFaultInjectorTest {
+ /**
+ * Simple test case to test that nothing happens when the injector is not started.
+ */
+ @Test
+ fun testInjectorNotStarted() = runBlockingSimulation {
+ val host = mockk<SimHost>(relaxUnitFun = true)
+
+ val injector = createSimpleInjector(coroutineContext, clock, setOf(host))
+
+ coVerify(exactly = 0) { host.fail() }
+ coVerify(exactly = 0) { host.recover() }
+
+ injector.close()
+ }
+
+ /**
+ * Simple test case to test a start stop fault where the machine is stopped and started after some time.
+ */
+ @Test
+ fun testInjectorStopsMachine() = runBlockingSimulation {
+ val host = mockk<SimHost>(relaxUnitFun = true)
+
+ val injector = createSimpleInjector(coroutineContext, clock, setOf(host))
+
+ injector.start()
+
+ delay(Duration.ofDays(55).toMillis())
+
+ injector.close()
+
+ coVerify(exactly = 1) { host.fail() }
+ coVerify(exactly = 1) { host.recover() }
+ }
+
+ /**
+ * Simple test case to test a start stop fault where multiple machines are stopped.
+ */
+ @Test
+ fun testInjectorStopsMultipleMachines() = runBlockingSimulation {
+ val hosts = listOf<SimHost>(
+ mockk(relaxUnitFun = true),
+ mockk(relaxUnitFun = true)
+ )
+
+ val injector = createSimpleInjector(coroutineContext, clock, hosts.toSet())
+
+ injector.start()
+
+ delay(Duration.ofDays(55).toMillis())
+
+ injector.close()
+
+ coVerify(exactly = 1) { hosts[0].fail() }
+ coVerify(exactly = 1) { hosts[1].fail() }
+ coVerify(exactly = 1) { hosts[0].recover() }
+ coVerify(exactly = 1) { hosts[1].recover() }
+ }
+
+ /**
+ * Create a simple start stop fault injector.
+ */
+ private fun createSimpleInjector(context: CoroutineContext, clock: Clock, hosts: Set<SimHost>): HostFaultInjector {
+ val rng = Well19937c(0)
+ val iat = LogNormalDistribution(rng, ln(24 * 7.0), 1.03)
+ val selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25))
+ val fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
+
+ return HostFaultInjector(context, clock, hosts, iat, selector, fault)
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index b2330af0..036d0638 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -35,7 +35,6 @@ dependencies {
implementation(projects.opendcTrace.opendcTraceBitbrains)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
- implementation(projects.opendcSimulator.opendcSimulatorFailures)
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
implementation(projects.opendcTelemetry.opendcTelemetryCompute)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 0230409e..8227bca9 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -25,7 +25,8 @@ package org.opendc.experiments.capelin
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
+import org.apache.commons.math3.distribution.LogNormalDistribution
+import org.apache.commons.math3.random.Well19937c
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
@@ -39,6 +40,9 @@ import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
import org.opendc.compute.service.scheduler.weights.RamWeigher
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import org.opendc.compute.simulator.failure.StartStopHostFault
+import org.opendc.compute.simulator.failure.StochasticVictimSelector
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.trace.TraceReader
import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
@@ -46,67 +50,36 @@ import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.failures.CorrelatedFaultInjector
-import org.opendc.simulator.failures.FaultInjector
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Clock
-import kotlin.coroutines.resume
+import kotlin.coroutines.CoroutineContext
import kotlin.math.ln
import kotlin.math.max
import kotlin.random.Random
/**
- * Construct the failure domain for the experiments.
- */
-fun createFailureDomain(
- coroutineScope: CoroutineScope,
- clock: Clock,
- seed: Int,
- failureInterval: Double,
- service: ComputeService,
- chan: Channel<Unit>
-): CoroutineScope {
- val job = coroutineScope.launch {
- chan.receive()
- val random = Random(seed)
- val injectors = mutableMapOf<String, FaultInjector>()
- for (host in service.hosts) {
- val cluster = host.meta["cluster"] as String
- val injector =
- injectors.getOrPut(cluster) {
- createFaultInjector(
- this,
- clock,
- random,
- failureInterval
- )
- }
- injector.enqueue(host as SimHost)
- }
- }
- return CoroutineScope(coroutineScope.coroutineContext + job)
-}
-
-/**
* Obtain the [FaultInjector] to use for the experiments.
*/
fun createFaultInjector(
- coroutineScope: CoroutineScope,
+ context: CoroutineContext,
clock: Clock,
- random: Random,
+ hosts: Set<SimHost>,
+ seed: Int,
failureInterval: Double
-): FaultInjector {
+): HostFaultInjector {
+ val rng = Well19937c(seed)
+
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
// GRID'5000
- return CorrelatedFaultInjector(
- coroutineScope,
+ return HostFaultInjector(
+ context,
clock,
- iatScale = ln(failureInterval), iatShape = 1.03, // Hours
- sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
- dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
- random = random
+ hosts,
+ iat = LogNormalDistribution(rng, ln(failureInterval), 1.03),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
+ fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
)
}
@@ -162,14 +135,22 @@ suspend fun processTrace(
clock: Clock,
reader: TraceReader<SimWorkload>,
scheduler: ComputeService,
- chan: Channel<Unit>,
monitor: ComputeMonitor? = null,
) {
val client = scheduler.newClient()
+ val watcher = object : ServerWatcher {
+ override fun onStateChanged(server: Server, newState: ServerState) {
+ monitor?.onStateChange(clock.millis(), server, newState)
+ }
+ }
+
+ // Create new image for the virtual machine
val image = client.newImage("vm-image")
- var offset = Long.MIN_VALUE
+
try {
coroutineScope {
+ var offset = Long.MIN_VALUE
+
while (reader.hasNext()) {
val entry = reader.next()
@@ -180,9 +161,11 @@ suspend fun processTrace(
// Make sure the trace entries are ordered by submission time
assert(entry.start - offset >= 0) { "Invalid trace order" }
delay(max(0, (entry.start - offset) - clock.millis()))
+
launch {
- chan.send(Unit)
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, offset = -offset + 300001)
+ val workloadOffset = -offset + 300001
+ val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
+
val server = client.newServer(
entry.name,
image,
@@ -193,18 +176,14 @@ suspend fun processTrace(
),
meta = entry.meta + mapOf("workload" to workload)
)
+ server.watch(watcher)
- suspendCancellableCoroutine { cont ->
- server.watch(object : ServerWatcher {
- override fun onStateChanged(server: Server, newState: ServerState) {
- monitor?.onStateChange(clock.millis(), server, newState)
+ // Wait for the server reach its end time
+ val endTime = entry.meta["end-time"] as Long
+ delay(endTime + workloadOffset - clock.millis() + 1)
- if (newState == ServerState.TERMINATED) {
- cont.resume(Unit)
- }
- }
- })
- }
+ // Delete the server after reaching the end-time of the virtual machine
+ server.delete()
}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 4db04591..82794471 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -25,9 +25,8 @@ package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
+import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
import org.opendc.experiments.capelin.model.CompositeWorkload
@@ -103,7 +102,6 @@ abstract class Portfolio(name: String) : Experiment(name) {
val seeder = Random(repeat.toLong())
val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
- val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements)
val meterProvider = createMeterProvider(clock)
@@ -137,31 +135,30 @@ abstract class Portfolio(name: String) : Experiment(name) {
)
withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler ->
- val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
+ val faultInjector = if (operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
- createFailureDomain(
- this,
+ createFaultInjector(
+ coroutineContext,
clock,
+ scheduler.hosts.map { it as SimHost }.toSet(),
seeder.nextInt(),
operationalPhenomena.failureFrequency,
- scheduler,
- chan
)
} else {
null
}
withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
+ faultInjector?.start()
processTrace(
clock,
trace,
scheduler,
- chan,
monitor
)
}
- failureDomain?.cancel()
+ faultInjector?.close()
monitor.close()
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index aed9a4bb..44cf92a8 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -23,8 +23,6 @@
package org.opendc.experiments.capelin
import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@@ -34,6 +32,7 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
@@ -52,7 +51,7 @@ import java.io.File
import java.util.*
/**
- * An integration test suite for the SC20 experiments.
+ * An integration test suite for the Capelin experiments.
*/
class CapelinIntegrationTest {
/**
@@ -73,9 +72,6 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val failures = false
- val seed = 0
- val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
@@ -85,31 +81,14 @@ class CapelinIntegrationTest {
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val failureDomain = if (failures) {
- println("ENABLING failures")
- createFailureDomain(
- this,
- clock,
- seed,
- 24.0 * 7,
- scheduler,
- chan
- )
- } else {
- null
- }
-
withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
scheduler,
- chan,
monitor
)
}
-
- failureDomain?.cancel()
}
val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
@@ -141,12 +120,11 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
)
- val traceReader = createTestTraceReader(0.5, seed)
+ val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
val meterProvider = createMeterProvider(clock)
@@ -157,7 +135,6 @@ class CapelinIntegrationTest {
clock,
traceReader,
scheduler,
- chan,
monitor
)
}
@@ -174,9 +151,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
+ { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -187,7 +164,6 @@ class CapelinIntegrationTest {
@Test
fun testInterference() = runBlockingSimulation {
val seed = 1
- val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
@@ -209,7 +185,6 @@ class CapelinIntegrationTest {
clock,
traceReader,
scheduler,
- chan,
monitor
)
}
@@ -226,10 +201,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(13910814, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
+ { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -239,7 +214,6 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
@@ -250,27 +224,26 @@ class CapelinIntegrationTest {
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val failureDomain =
- createFailureDomain(
- this,
+ val faultInjector =
+ createFaultInjector(
+ coroutineContext,
clock,
+ scheduler.hosts.map { it as SimHost }.toSet(),
seed,
24.0 * 7,
- scheduler,
- chan
)
withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
+ faultInjector.start()
processTrace(
clock,
traceReader,
scheduler,
- chan,
monitor
)
}
- failureDomain.cancel()
+ faultInjector.close()
}
val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
@@ -284,9 +257,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(25412073109, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(23695061858, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(368502468, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } },
+ { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
index 53b3c2d7..5642003d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
@@ -1,3 +1,3 @@
ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
-A01;A01;8;3.2;64;1;64;8
+A01;A01;8;3.2;128;1;128;8
diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
index 40ac2967..cc58e5f1 100644
--- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
@@ -33,7 +33,6 @@ dependencies {
api(projects.opendcHarness.opendcHarnessApi)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
- implementation(projects.opendcSimulator.opendcSimulatorFailures)
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcExperiments.opendcExperimentsCapelin)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
index 02aaab3c..d9194969 100644
--- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
+++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
@@ -26,7 +26,6 @@ import com.typesafe.config.ConfigFactory
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import mu.KotlinLogging
import org.opendc.compute.service.ComputeService
@@ -81,7 +80,6 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
private val powerModel by anyOf(PowerModelType.LINEAR, PowerModelType.CUBIC, PowerModelType.INTERPOLATION)
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
- val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
weighers = listOf(),
@@ -98,7 +96,6 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
clock,
trace,
scheduler,
- chan,
monitor
)
}
diff --git a/opendc-experiments/opendc-experiments-radice/build.gradle.kts b/opendc-experiments/opendc-experiments-radice/build.gradle.kts
index c1515165..0c716183 100644
--- a/opendc-experiments/opendc-experiments-radice/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-radice/build.gradle.kts
@@ -34,7 +34,6 @@ dependencies {
implementation(projects.opendcFormat)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
- implementation(projects.opendcSimulator.opendcSimulatorFailures)
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
diff --git a/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/CorrelatedFaultInjector.kt b/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/CorrelatedFaultInjector.kt
deleted file mode 100644
index 0e15f338..00000000
--- a/opendc-simulator/opendc-simulator-failures/src/main/kotlin/org/opendc/simulator/failures/CorrelatedFaultInjector.kt
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.failures
-
-import kotlinx.coroutines.*
-import java.time.Clock
-import kotlin.math.exp
-import kotlin.math.max
-import kotlin.random.Random
-import kotlin.random.asJavaRandom
-
-/**
- * A [FaultInjector] that injects fault in the system which are correlated to each other. Failures do not occur in
- * isolation, but will trigger other faults.
- */
-public class CorrelatedFaultInjector(
- private val coroutineScope: CoroutineScope,
- private val clock: Clock,
- private val iatScale: Double,
- private val iatShape: Double,
- private val sizeScale: Double,
- private val sizeShape: Double,
- private val dScale: Double,
- private val dShape: Double,
- random: Random = Random(0)
-) : FaultInjector {
- /**
- * The active failure domains that have been registered.
- */
- private val active = mutableSetOf<FailureDomain>()
-
- /**
- * The [Job] that awaits the nearest fault in the system.
- */
- private var job: Job? = null
-
- /**
- * The [Random] instance to use.
- */
- private val random: java.util.Random = random.asJavaRandom()
-
- /**
- * Enqueue the specified [FailureDomain] to fail some time in the future.
- */
- override fun enqueue(domain: FailureDomain) {
- active += domain
-
- // Clean up the domain if it finishes
- domain.scope.coroutineContext[Job]!!.invokeOnCompletion {
- this@CorrelatedFaultInjector.coroutineScope.launch {
- active -= domain
-
- if (active.isEmpty()) {
- job?.cancel()
- job = null
- }
- }
- }
-
- if (job != null) {
- return
- }
-
- job = this.coroutineScope.launch {
- while (active.isNotEmpty()) {
- ensureActive()
-
- // Make sure to convert delay from hours to milliseconds
- val d = lognvariate(iatScale, iatShape) * 3.6e6
-
- // Handle long overflow
- if (clock.millis() + d <= 0) {
- return@launch
- }
-
- delay(d.toLong())
-
- val n = lognvariate(sizeScale, sizeShape).toInt()
- val targets = active.shuffled(random).take(n)
-
- for (failureDomain in targets) {
- active -= failureDomain
- failureDomain.fail()
- }
-
- val df = max(lognvariate(dScale, dShape) * 6e4, 15 * 6e4)
-
- // Handle long overflow
- if (clock.millis() + df <= 0) {
- return@launch
- }
-
- delay(df.toLong())
-
- for (failureDomain in targets) {
- failureDomain.recover()
-
- // Re-enqueue machine to be failed
- enqueue(failureDomain)
- }
- }
-
- job = null
- }
- }
-
- // XXX We should extract this in some common package later on.
- private fun lognvariate(scale: Double, shape: Double) = exp(scale + shape * random.nextGaussian())
-}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 5d481270..b565e90d 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -30,8 +30,8 @@ import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
+import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.env.MachineDef
@@ -188,8 +188,6 @@ class RunnerCli : CliktCommand(name = "runner") {
val seeder = Random(seed)
- val chan = Channel<Unit>(Channel.CONFLATED)
-
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
.setClock(clock.toOtelClock())
@@ -207,31 +205,31 @@ class RunnerCli : CliktCommand(name = "runner") {
val failureFrequency = if (operational.failuresEnabled) 24.0 * 7 else 0.0
withComputeService(clock, meterProvider, environment, allocationPolicy, interferenceModel) { scheduler ->
- val failureDomain = if (failureFrequency > 0) {
+ val faultInjector = if (failureFrequency > 0) {
logger.debug { "ENABLING failures" }
- createFailureDomain(
- this,
+ createFaultInjector(
+ coroutineContext,
clock,
+ scheduler.hosts.map { it as SimHost }.toSet(),
seeder.nextInt(),
failureFrequency,
- scheduler,
- chan
)
} else {
null
}
withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
+ faultInjector?.start()
+
processTrace(
clock,
trace,
scheduler,
- chan,
monitor
)
- }
- failureDomain?.cancel()
+ faultInjector?.close()
+ }
}
val monitorResults = collectServiceMetrics(clock.millis(), metricProducer)
diff --git a/settings.gradle.kts b/settings.gradle.kts
index cee8887b..427cdb52 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -42,7 +42,6 @@ include(":opendc-simulator:opendc-simulator-resources")
include(":opendc-simulator:opendc-simulator-power")
include(":opendc-simulator:opendc-simulator-network")
include(":opendc-simulator:opendc-simulator-compute")
-include(":opendc-simulator:opendc-simulator-failures")
include(":opendc-telemetry:opendc-telemetry-api")
include(":opendc-telemetry:opendc-telemetry-sdk")
include(":opendc-telemetry:opendc-telemetry-compute")