 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt         | 32
 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt                       |  2
 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt               | 23
 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt | 37
 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt                  | 29
 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt                            |  5
 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt              | 17
 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt           | 33
 8 files changed, 123 insertions(+), 55 deletions(-)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 9396699a..37ae9eb5 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -49,7 +49,6 @@ import com.atlarge.opendc.core.services.ServiceRegistry
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
@@ -182,6 +181,10 @@ public class SimpleBareMetalDriver(
private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext {
private var finalized: Boolean = false
+ // A state in which the machine is still available, but does not run any of the work requested by the
+ // image
+ var unavailable = false
+
override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
override val server: Server
@@ -266,7 +269,9 @@ public class SimpleBareMetalDriver(
}
}
- usageState.value = totalUsage / cpus.size
+ if (!unavailable) {
+ usageState.value = totalUsage / cpus.size
+ }
try {
delay(duration)
@@ -276,7 +281,7 @@ public class SimpleBareMetalDriver(
}
val end = simulationContext.clock.millis()
- // Flush the load if the do not receive a new run call for the same timestamp
+ // Flush the load if we do not receive a new run call for the same timestamp
flush = domain.launch(job) {
delay(1)
usageState.value = 0.0
@@ -285,6 +290,10 @@ public class SimpleBareMetalDriver(
flush = null
}
+ if (unavailable) {
+ return
+ }
+
// Write back the remaining burst time
for (i in 0 until min(cpus.size, burst.size)) {
val usage = min(limit[i], cpus[i].frequency)
@@ -298,12 +307,17 @@ public class SimpleBareMetalDriver(
get() = domain
override suspend fun fail() {
- try {
- serverContext?.cancel(fail = true)
- domain.cancel()
- } catch (_: CancellationException) {
- // Ignore if the machine has already failed.
- }
+ serverContext?.unavailable = true
+
+ val server = nodeState.value.server?.copy(state = ServerState.ERROR)
+ setNode(nodeState.value.copy(state = NodeState.ERROR, server = server))
+ }
+
+ override suspend fun recover() {
+ serverContext?.unavailable = false
+
+ val server = nodeState.value.server?.copy(state = ServerState.ACTIVE)
+ setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server))
}
override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})"
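The change above replaces the old cancellation-based fail() with a soft failure model: the driver keeps its simulation loop alive but marks itself unavailable, so it stops publishing usage and no longer writes back burst time until recover() is called. A minimal sketch of that flag-based pattern, using hypothetical names rather than the driver's real types:

// Hypothetical, self-contained illustration of the unavailable-flag failure model.
class ToyMachine {
    // While set, the machine stays alive but performs none of the requested work.
    private var unavailable = false
    var reportedUsage = 0.0
        private set

    fun tick(requestedUsage: Double) {
        // Usage is only published while the machine is healthy.
        if (!unavailable) reportedUsage = requestedUsage
    }

    fun fail() {             // analogous to fail(): mark ERROR and drop the load
        unavailable = true
        reportedUsage = 0.0
    }

    fun recover() {          // analogous to recover(): mark ACTIVE again
        unavailable = false
    }
}

fun main() {
    val m = ToyMachine()
    m.tick(0.75)
    m.fail()
    m.tick(0.75)             // ignored while failed
    println(m.reportedUsage) // 0.0
    m.recover()
    m.tick(0.75)
    println(m.reportedUsage) // 0.75
}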
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index c21b002d..bd395f0d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -50,7 +50,7 @@ object HypervisorImage : Image {
try {
suspendCancellableCoroutine<Unit> {}
} finally {
- driver.eventFlow.close()
+ driver.cancel()
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index b4626def..c21a9fc0 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -65,8 +65,8 @@ import kotlin.math.min
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
class SimpleVirtDriver(
private val hostContext: ServerContext,
- private val coroutineScope: CoroutineScope
-) : VirtDriver {
+ scope: CoroutineScope
+) : VirtDriver, CoroutineScope by scope {
/**
* The [Server] on which this hypervisor runs.
*/
@@ -98,7 +98,7 @@ class SimpleVirtDriver(
it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
performanceModel?.computeIntersectingItems(imagesRunning)
}
- }.launchIn(coroutineScope)
+ }.launchIn(this)
}
override suspend fun spawn(
@@ -123,6 +123,10 @@ class SimpleVirtDriver(
return server
}
+ internal fun cancel() {
+ eventFlow.close()
+ }
+
/**
* A flag to indicate the driver is stopped.
*/
@@ -141,7 +145,7 @@ class SimpleVirtDriver(
/**
* Schedule the vCPUs on the physical CPUs.
*/
- private suspend fun reschedule() {
+ private fun reschedule() {
flush()
// Do not schedule a call if there is no work to schedule or the driver stopped.
@@ -149,7 +153,7 @@ class SimpleVirtDriver(
return
}
- val call = coroutineScope.launch {
+ val call = launch {
val start = simulationContext.clock.millis()
val vms = activeVms.toSet()
@@ -219,7 +223,7 @@ class SimpleVirtDriver(
val fraction = req.allocatedUsage / totalUsage
// Derive the burst that was allocated to this vCPU
- val allocatedBurst = ceil(duration * req.allocatedUsage).toLong()
+ val allocatedBurst = ceil(totalBurst * fraction).toLong()
// Compute the burst time that the VM was actually granted
val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong()
@@ -244,6 +248,9 @@ class SimpleVirtDriver(
server
)
)
+
+ // Make sure we reschedule the remaining amount of work (if we did not obtain the entire request)
+ reschedule()
}
this.call = call
}
@@ -286,7 +293,7 @@ class SimpleVirtDriver(
var chan = Channel<Unit>(Channel.RENDEZVOUS)
private var initialized: Boolean = false
- internal val job: Job = coroutineScope.launch {
+ internal val job: Job = launch {
delay(1) // TODO Introduce boot time
init()
try {
@@ -331,8 +338,8 @@ class SimpleVirtDriver(
server = server.copy(state = serverState)
availableMemory += server.flavor.memorySize
vms.remove(this)
- events.close()
eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory))
+ events.close()
}
override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
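The scheduling path above now derives each vCPU's share of the physical burst from its allocated usage rather than from the wall-clock duration. A worked example with made-up numbers, illustrating the formula only and not the scheduler's real inputs:

import kotlin.math.ceil

fun main() {
    // Hypothetical figures for a single scheduling round.
    val totalBurst = 1_000L        // burst available on the physical CPUs
    val totalUsage = 2_400.0       // summed requested usage of all vCPUs
    val totalRemainder = 100.0     // burst the host could not serve this round
    val performanceScore = 0.9     // penalty from the interference model

    val allocatedUsage = 600.0     // usage granted to one particular vCPU
    val fraction = allocatedUsage / totalUsage                  // 0.25

    // Burst allocated to this vCPU, proportional to its share of the usage.
    val allocatedBurst = ceil(totalBurst * fraction).toLong()   // 250

    // Burst the VM actually received after subtracting its share of the
    // remainder and applying the interference penalty.
    val grantedBurst =
        (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong()

    println("allocated=$allocatedBurst granted=$grantedBurst")  // allocated=250 granted=202
}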
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index c7347783..85bdc438 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -122,7 +122,7 @@ class SimpleVirtProvisioningService(
val requiredMemory = (imageInstance.image as VmImage).requiredMemory
val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) ?: break
try {
- log.info("Spawning ${imageInstance.image} on ${selectedHv.server}")
+ log.info("Spawning ${imageInstance.image} on ${selectedHv.server} ${availableHypervisors.size}")
incomingImages -= imageInstance
// Speculatively update the hypervisor view information to prevent other images in the queue from
@@ -139,6 +139,19 @@ class SimpleVirtProvisioningService(
imageInstance.server = server
imageInstance.continuation.resume(server)
activeImages += imageInstance
+
+ server.events
+ .onEach { event ->
+ when (event) {
+ is ServerEvent.StateChanged -> {
+ if (event.server.state == ServerState.SHUTOFF) {
+ activeImages -= imageInstance
+ selectedHv.provisionedCores -= server.flavor.cpuCount
+ }
+ }
+ }
+ }
+ .launchIn(this)
} catch (e: InsufficientMemoryOnServerException) {
println("Unable to deploy image due to insufficient memory")
@@ -152,18 +165,22 @@ class SimpleVirtProvisioningService(
private fun stateChanged(server: Server) {
when (server.state) {
ServerState.ACTIVE -> {
- val hv = HypervisorView(
- server.uid,
- server,
- 0,
- server.flavor.memorySize,
- 0
- )
- hypervisors[server] = hv
+ if (server in hypervisors) {
+ // Corner case for when the hypervisor already exists
+ availableHypervisors += hypervisors.getValue(server)
+ } else {
+ val hv = HypervisorView(
+ server.uid,
+ server,
+ 0,
+ server.flavor.memorySize,
+ 0
+ )
+ hypervisors[server] = hv
+ }
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val hv = hypervisors[server] ?: return
- hv.provisionedCores -= server.flavor.cpuCount
availableHypervisors -= hv
requestCycle()
}
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
index c5189764..2904fbec 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
@@ -44,6 +44,8 @@ public class CorrelatedFaultInjector(
private val iatShape: Double,
private val sizeScale: Double,
private val sizeShape: Double,
+ private val dScale: Double,
+ private val dShape: Double,
random: Random = Random(0)
) : FaultInjector {
/**
@@ -84,11 +86,11 @@ public class CorrelatedFaultInjector(
}
job = this.domain.launch {
- while (true) {
+ while (active.isNotEmpty()) {
ensureActive()
// Make sure to convert delay from hours to milliseconds
- val d = lognvariate(iatScale, iatShape) * 3600 * 1e6
+ val d = lognvariate(iatScale, iatShape) * 3.6e6
// Handle long overflow
if (simulationContext.clock.millis() + d <= 0) {
@@ -98,10 +100,31 @@ public class CorrelatedFaultInjector(
delay(d.toLong())
val n = lognvariate(sizeScale, sizeShape).toInt()
- for (failureDomain in active.shuffled(random).take(n)) {
+ val targets = active.shuffled(random).take(n)
+
+ for (failureDomain in targets) {
+ active -= failureDomain
failureDomain.fail()
}
+
+ val df = lognvariate(dScale, dShape) * 6e4
+
+ // Handle long overflow
+ if (simulationContext.clock.millis() + df <= 0) {
+ return@launch
+ }
+
+ delay(df.toLong())
+
+ for (failureDomain in targets) {
+ failureDomain.recover()
+
+ // Re-enqueue machine to be failed
+ enqueue(failureDomain)
+ }
}
+
+ job = null
}
}
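The injector now alternates between failing and recovering a correlated group: it samples an inter-arrival time in hours (converted to milliseconds with * 3.6e6), a group size, and a repair duration in minutes (* 6e4), fails the sampled machines, waits, then recovers and re-enqueues them. A sketch of one such round, assuming lognvariate(scale, shape) draws exp(N(scale, shape^2)) in the style of Python's random.lognormvariate; that parameterization is an assumption, not taken from the OpenDC source:

import java.util.Random
import kotlin.math.exp

// Assumed parameterization: exp of a normal variate with the given mean and stddev.
fun lognvariate(scale: Double, shape: Double, random: Random): Double =
    exp(scale + shape * random.nextGaussian())

fun main() {
    val random = Random(0)

    // Inter-arrival time is sampled in hours; 1 h = 3_600_000 ms = 3.6e6 ms.
    val iatMs = lognvariate(-1.39, 1.03, random) * 3.6e6

    // Number of machines that fail together in this burst.
    val groupSize = lognvariate(1.88, 1.25, random).toInt().coerceAtLeast(1)

    // Repair time is sampled in minutes; 1 min = 60_000 ms = 6e4 ms.
    val downtimeMs = lognvariate(9.51, 3.21, random) * 6e4

    println("wait ${iatMs.toLong()} ms, fail $groupSize machine(s), recover after ${downtimeMs.toLong()} ms")
}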
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt
index 91ca9b83..d56df3c9 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt
@@ -39,4 +39,9 @@ public interface FailureDomain {
* Fail the domain externally.
*/
public suspend fun fail()
+
+ /**
+ * Resume the failure domain.
+ */
+ public suspend fun recover()
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 120c4f81..212b1bfb 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -14,18 +14,13 @@ class Sc20Monitor(
destination: String
) : Closeable {
private val outputFile = BufferedWriter(FileWriter(destination))
- private var failedInSlice: Int = 0
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
init {
- outputFile.write("time,duration,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n")
+ outputFile.write("time,duration,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostState,hostUsage,powerDraw,failedVms\n")
}
- suspend fun onVmStateChanged(server: Server) {
- if (server.state == ServerState.ERROR) {
- failedInSlice++
- }
- }
+ suspend fun onVmStateChanged(server: Server) {}
suspend fun serverStateChanged(driver: VirtDriver, server: Server) {
if ((server.state == ServerState.SHUTOFF || server.state == ServerState.ERROR) &&
@@ -61,12 +56,8 @@ class Sc20Monitor(
val driver = hostServer.services[BareMetalDriver.Key]
val usage = driver.usage.first()
val powerDraw = driver.powerDraw.first()
- val failed = if (failedInSlice > 0) {
- failedInSlice.also { failedInSlice = 0 }
- } else {
- 0
- }
- outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw,$failed")
+
+ outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},${hostServer.state},$usage,$powerDraw")
outputFile.newLine()
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index cc403e6e..ca7e31ea 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -52,6 +52,7 @@ import com.fasterxml.jackson.module.kotlin.readValue
import com.xenomachina.argparser.ArgParser
import com.xenomachina.argparser.default
import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
@@ -63,6 +64,7 @@ import java.io.File
import java.io.FileReader
import java.util.ServiceLoader
import kotlin.math.max
+import kotlin.random.Random
class ExperimentParameters(parser: ArgParser) {
val traceDirectory by parser.storing("path to the trace directory")
@@ -100,11 +102,14 @@ class ExperimentParameters(parser: ArgParser) {
/**
* Obtain the [FaultInjector] to use for the experiments.
*/
-fun createFaultInjector(domain: Domain): FaultInjector {
+fun createFaultInjector(domain: Domain, random: Random): FaultInjector {
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
return CorrelatedFaultInjector(domain,
- iatScale = -1.39, iatShape = 1.03,
- sizeScale = 1.88, sizeShape = 1.25
+ iatScale = -1.39, iatShape = 1.03, // Hours
+ sizeScale = 1.88, sizeShape = 1.25,
+ dScale = 9.51, dShape = 3.21, // Minutes
+ random = random
)
}
@@ -202,18 +207,22 @@ fun main(args: Array<String>) {
.launchIn(this)
}
- if (failures) {
- println("ENABLE Failures")
- root.newDomain(name = "failures").launch {
+ val failureDomain = if (failures) {
+ println("ENABLING failures")
+ val domain = root.newDomain(name = "failures")
+ domain.launch {
chan.receive()
+ val random = Random(0)
val injectors = mutableMapOf<String, FaultInjector>()
-
for (node in bareMetalProvisioner.nodes()) {
val cluster = node.metadata[NODE_CLUSTER] as String
- val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain) }
+ val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) }
injector.enqueue(node.metadata["driver"] as FailureDomain)
}
}
+ domain
+ } else {
+ null
}
val running = mutableSetOf<Server>()
@@ -237,11 +246,12 @@ fun main(args: Array<String>) {
monitor.onVmStateChanged(it.server)
// Detect whether the VM has finished running
- if (it.server.state == ServerState.ERROR || it.server.state == ServerState.SHUTOFF) {
+ if (it.server.state == ServerState.SHUTOFF) {
running -= server
+ }
- if (running.isEmpty() && (!reader.hasNext() || availableHypervisors == 0))
- finish.send(Unit)
+ if (running.isEmpty() && !reader.hasNext()) {
+ finish.send(Unit)
}
}
.collect()
@@ -250,6 +260,7 @@ fun main(args: Array<String>) {
finish.receive()
scheduler.terminate()
+ failureDomain?.cancel()
println(simulationContext.clock.instant())
}