-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt | 10
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt | 5
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt | 10
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt | 2
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt | 9
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt | 12
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 44
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 51
8 files changed, 128 insertions, 15 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 4a40dc9f..9396699a 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -142,7 +142,7 @@ public class SimpleBareMetalDriver(
override suspend fun stop(): Node = withContext(domain.coroutineContext) {
val node = nodeState.value
- if (node.state == NodeState.SHUTOFF) {
+ if (node.state == NodeState.SHUTOFF || node.state == NodeState.ERROR) {
return@withContext node
}
@@ -298,8 +298,12 @@ public class SimpleBareMetalDriver(
get() = domain
override suspend fun fail() {
- serverContext?.cancel(fail = true)
- domain.cancel()
+ try {
+ serverContext?.cancel(fail = true)
+ domain.cancel()
+ } catch (_: CancellationException) {
+ // Ignore if the machine has already failed.
+ }
}
override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})"
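With the guards above, a stop or failure request on a machine that has already shut off or failed is a no-op instead of a second cancellation. A minimal usage sketch (the helper name is hypothetical, not part of the commit):

    import com.atlarge.opendc.compute.metal.driver.BareMetalDriver

    // Hedged sketch: stop() can now be issued against a node that is already
    // SHUTOFF or ERROR; the driver simply returns the node in its current state
    // instead of attempting another shutdown.
    suspend fun ensureStopped(driver: BareMetalDriver) {
        val node = driver.stop()
        println("node ${node.uid} is now in state ${node.state}")
    }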
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt
index 105505f2..a54d8df4 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt
@@ -55,6 +55,11 @@ public interface ProvisioningService {
public suspend fun deploy(node: Node, image: Image): Node
/**
+ * Stop the specified [Node].
+ */
+ public suspend fun stop(node: Node): Node
+
+ /**
* The service key of this service.
*/
companion object Key : AbstractServiceKey<ProvisioningService>(UUID.randomUUID(), "provisioner")
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
index a7e143aa..f6b236ae 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
@@ -28,6 +28,7 @@ import com.atlarge.odcsim.Domain
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
+import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.withContext
/**
@@ -57,4 +58,13 @@ public class SimpleProvisioningService(val domain: Domain) : ProvisioningService
val newNode = driver.reboot()
return@withContext newNode
}
+
+ override suspend fun stop(node: Node): Node = withContext(domain.coroutineContext) {
+ val driver = nodes[node]!!
+ try {
+ driver.stop()
+ } catch (_: CancellationException) {
+ node
+ }
+ }
}
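The new stop() call makes it possible to release every leased machine at the end of a run, which is what SimpleVirtProvisioningService.terminate() below builds on. A hedged usage sketch (the helper name is hypothetical):

    import com.atlarge.opendc.compute.metal.service.ProvisioningService

    // Hedged sketch: iterate over the provisioned nodes and stop each one;
    // a node that already failed is returned unchanged by stop().
    suspend fun releaseAll(provisioner: ProvisioningService) {
        for (node in provisioner.nodes()) {
            val stopped = provisioner.stop(node)
            println("released node ${stopped.uid} (state: ${stopped.state})")
        }
    }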
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 76368080..3086f4e6 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -67,7 +67,7 @@ class SimpleVirtDriver(
/**
* The [Server] on which this hypervisor runs.
*/
- private val server: Server
+ val server: Server
get() = hostContext.server
/**
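Making `server` public lets callers observe the hypervisor's host machine, as TestExperiment does further below. A minimal sketch (the helper name is hypothetical; the cast mirrors the experiment code):

    import com.atlarge.opendc.compute.core.ServerEvent
    import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
    import com.atlarge.opendc.compute.virt.driver.VirtDriver
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.flow.launchIn
    import kotlinx.coroutines.flow.onEach

    // Hedged sketch: subscribe to the host server's state changes of a hypervisor.
    fun watchHost(driver: VirtDriver, scope: CoroutineScope) {
        val host = (driver as SimpleVirtDriver).server
        host.events
            .onEach { event ->
                if (event is ServerEvent.StateChanged) {
                    println("host ${host.uid} changed state to ${event.server.state}")
                }
            }
            .launchIn(scope)
    }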
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 156521db..b8966275 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -1,6 +1,7 @@
package com.atlarge.opendc.compute.virt.service
import com.atlarge.odcsim.SimulationContext
+import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
@@ -18,6 +19,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
@@ -52,6 +54,8 @@ class SimpleVirtProvisioningService(
*/
private val activeImages: MutableSet<ImageView> = mutableSetOf()
+ override val hypervisorEvents: Flow<HypervisorEvent> = EventFlow()
+
init {
launch {
val provisionedNodes = provisioningService.nodes()
@@ -84,6 +88,11 @@ class SimpleVirtProvisioningService(
}
}
+ override suspend fun terminate() {
+ val provisionedNodes = provisioningService.nodes()
+ provisionedNodes.forEach { node -> provisioningService.stop(node) }
+ }
+
private var call: Job? = null
private fun requestCycle() {
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index a3ade2fb..fb200f88 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -3,8 +3,10 @@ package com.atlarge.opendc.compute.virt.service
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
+import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
+import kotlinx.coroutines.flow.Flow
/**
* A service for VM provisioning on a cloud.
@@ -13,6 +15,11 @@ interface VirtProvisioningService {
val allocationPolicy: AllocationPolicy
/**
+ * The events emitted by the hypervisors.
+ */
+ public val hypervisorEvents: Flow<HypervisorEvent>
+
+ /**
* Obtain the active hypervisors for this provisioner.
*/
public suspend fun drivers(): Set<VirtDriver>
@@ -25,4 +32,9 @@ interface VirtProvisioningService {
* @param flavor The flavor of the machine instance to run this [image] on.
*/
public suspend fun deploy(name: String, image: Image, flavor: Flavor): Server
+
+ /**
+ * Terminate the provisioning service, releasing all leased bare-metal machines.
+ */
+ public suspend fun terminate()
}
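The extended interface exposes a hypervisor event stream and a shutdown hook. A hedged sketch of how a consumer might wire both together (the helper name is hypothetical; event types come from com.atlarge.opendc.compute.virt.HypervisorEvent):

    import com.atlarge.opendc.compute.virt.service.VirtProvisioningService
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.flow.launchIn
    import kotlinx.coroutines.flow.onEach

    // Hedged sketch: observe hypervisor events, then tear the provisioner down.
    suspend fun observeAndShutDown(scheduler: VirtProvisioningService, scope: CoroutineScope) {
        scheduler.hypervisorEvents
            .onEach { event -> println("hypervisor event: $event") }
            .launchIn(scope)

        // ... deploy images with scheduler.deploy(...) and await their completion ...

        scheduler.terminate() // releases all leased bare-metal machines
    }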
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 36da7703..120c4f81 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -4,6 +4,7 @@ import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
import kotlinx.coroutines.flow.first
import java.io.BufferedWriter
import java.io.Closeable
@@ -13,32 +14,59 @@ class Sc20Monitor(
destination: String
) : Closeable {
private val outputFile = BufferedWriter(FileWriter(destination))
- private var failed: Int = 0
+ private var failedInSlice: Int = 0
+ private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
init {
- outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n")
+ outputFile.write("time,duration,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n")
}
- suspend fun stateChanged(server: Server) {
- println("[${simulationContext.clock.millis()}] ${server.uid} ${server.state}")
+ suspend fun onVmStateChanged(server: Server) {
if (server.state == ServerState.ERROR) {
- failed++
+ failedInSlice++
}
}
+ suspend fun serverStateChanged(driver: VirtDriver, server: Server) {
+ if ((server.state == ServerState.SHUTOFF || server.state == ServerState.ERROR) &&
+ lastServerStates.containsKey(server)
+ ) {
+ val duration = simulationContext.clock.millis() - lastServerStates[server]!!.second
+ onSliceFinish(
+ simulationContext.clock.millis(),
+ 0,
+ 0,
+ 0,
+ server,
+ duration
+ )
+ }
+
+ println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}")
+
+ lastServerStates[server] = Pair(server.state, simulationContext.clock.millis())
+ }
+
suspend fun onSliceFinish(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
numberOfDeployedImages: Int,
- hostServer: Server
+ hostServer: Server,
+ duration: Long = 5 * 60 * 1000L
) {
+ lastServerStates.remove(hostServer)
+
// Assume for now that the host is not virtualized and measure the current power draw
val driver = hostServer.services[BareMetalDriver.Key]
val usage = driver.usage.first()
val powerDraw = driver.powerDraw.first()
-
- outputFile.write("$time,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw,$failed")
+ val failed = if (failedInSlice > 0) {
+ failedInSlice.also { failedInSlice = 0 }
+ } else {
+ 0
+ }
+ outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw,$failed")
outputFile.newLine()
}
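The monitor's CSV output now carries a per-slice duration column (defaulting to the 5-minute slice length) and resets the failed-VM counter after each slice. An illustrative record under the new header (values are made up for illustration):

    time,duration,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms
    300000,300000,1200000,1150000,4,4c2c53b2-...,0.83,245.2,1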
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 639c3aef..ed971df5 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -28,10 +28,13 @@ import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
+import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.NODE_CLUSTER
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.HypervisorEvent
+import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
@@ -141,12 +144,35 @@ fun main(args: Array<String>) {
// Wait for the hypervisors to be spawned
delay(10)
+ val hypervisors = scheduler.drivers()
+ var availableHypervisors = hypervisors.size
+
// Monitor hypervisor events
- for (hypervisor in scheduler.drivers()) {
+ for (hypervisor in hypervisors) {
+ // TODO Do not expose VirtDriver directly but use Hypervisor class.
+ monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server)
+ hypervisor.server.events
+ .onEach { event ->
+ when (event) {
+ is ServerEvent.StateChanged -> {
+ monitor.serverStateChanged(hypervisor, event.server)
+
+ if (event.server.state == ServerState.ERROR)
+ availableHypervisors -= 1
+ }
+ }
+ }
+ .launchIn(this)
hypervisor.events
.onEach { event ->
when (event) {
- is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, event.numberOfDeployedImages, event.hostServer)
+ is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(
+ simulationContext.clock.millis(),
+ event.requestedBurst,
+ event.grantedBurst,
+ event.numberOfDeployedImages,
+ event.hostServer
+ )
else -> println(event)
}
}
@@ -164,6 +190,9 @@ fun main(args: Array<String>) {
}
}
+ val running = mutableSetOf<Server>()
+ val finish = Channel<Unit>(Channel.RENDEZVOUS)
+
val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList())
while (reader.hasNext()) {
val (time, workload) = reader.next()
@@ -174,11 +203,27 @@ fun main(args: Array<String>) {
workload.image.name, workload.image,
Flavor(workload.image.cores, workload.image.requiredMemory)
)
+ running += server
// Monitor server events
- server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect()
+ server.events
+ .onEach {
+ if (it is ServerEvent.StateChanged)
+ monitor.onVmStateChanged(it.server)
+
+ // Detect whether the VM has finished running
+ if (it.server.state == ServerState.ERROR || it.server.state == ServerState.SHUTOFF) {
+ running -= server
+
+ if (running.isEmpty() && (!reader.hasNext() || availableHypervisors == 0))
+ finish.send(Unit)
+ }
+ }
+ .collect()
}
}
+ finish.receive()
+ scheduler.terminate()
println(simulationContext.clock.instant())
}
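The experiment now waits for all submitted VMs (or the loss of all hypervisors) before terminating the provisioner, using a rendezvous channel as the completion signal. A self-contained sketch of that pattern, stripped of the simulator types (all names here are illustrative):

    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking

    // Standalone sketch: each workload collector removes itself from `running`,
    // and the last one to finish sends on a rendezvous channel so the main
    // coroutine can proceed to tear everything down.
    fun main() = runBlocking {
        val running = mutableSetOf<Int>()
        val finish = Channel<Unit>(Channel.RENDEZVOUS)

        repeat(3) { id ->
            running += id
            launch {
                delay((id + 1) * 100L)          // stand-in for a VM workload
                running -= id
                if (running.isEmpty()) finish.send(Unit)
            }
        }

        finish.receive()                        // blocks until the last VM finishes
        println("all workloads finished; terminating the provisioner")
    }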