summaryrefslogtreecommitdiff
path: root/opendc/opendc-experiments-sc20
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-30 17:19:59 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-30 17:19:59 +0200
commitbbba1d507f27b19a74e5c87e0e9e7250fb796957 (patch)
tree568d54bd76a189a1a46f5b392d26db39cd6b87c3 /opendc/opendc-experiments-sc20
parent620f194c53d950a37f78577f4aacfd7c0c06bb9a (diff)
parent1699ae13574dddb680dc8176386817fa05816820 (diff)
Merge branch '2.x-monitor-state-changes' into '2.x'
Monitor state changes in the Sc20Monitor See merge request opendc/opendc-simulator!47
Diffstat (limited to 'opendc/opendc-experiments-sc20')
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt44
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt51
2 files changed, 84 insertions, 11 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 36da7703..120c4f81 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -4,6 +4,7 @@ import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
import kotlinx.coroutines.flow.first
import java.io.BufferedWriter
import java.io.Closeable
@@ -13,32 +14,59 @@ class Sc20Monitor(
destination: String
) : Closeable {
private val outputFile = BufferedWriter(FileWriter(destination))
- private var failed: Int = 0
+ private var failedInSlice: Int = 0
+ private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
init {
- outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n")
+ outputFile.write("time,duration,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n")
}
- suspend fun stateChanged(server: Server) {
- println("[${simulationContext.clock.millis()}] ${server.uid} ${server.state}")
+ suspend fun onVmStateChanged(server: Server) {
if (server.state == ServerState.ERROR) {
- failed++
+ failedInSlice++
}
}
+ suspend fun serverStateChanged(driver: VirtDriver, server: Server) {
+ if ((server.state == ServerState.SHUTOFF || server.state == ServerState.ERROR) &&
+ lastServerStates.containsKey(server)
+ ) {
+ val duration = simulationContext.clock.millis() - lastServerStates[server]!!.second
+ onSliceFinish(
+ simulationContext.clock.millis(),
+ 0,
+ 0,
+ 0,
+ server,
+ duration
+ )
+ }
+
+ println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}")
+
+ lastServerStates[server] = Pair(server.state, simulationContext.clock.millis())
+ }
+
suspend fun onSliceFinish(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
numberOfDeployedImages: Int,
- hostServer: Server
+ hostServer: Server,
+ duration: Long = 5 * 60 * 1000L
) {
+ lastServerStates.remove(hostServer)
+
// Assume for now that the host is not virtualized and measure the current power draw
val driver = hostServer.services[BareMetalDriver.Key]
val usage = driver.usage.first()
val powerDraw = driver.powerDraw.first()
-
- outputFile.write("$time,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw,$failed")
+ val failed = if (failedInSlice > 0) {
+ failedInSlice.also { failedInSlice = 0 }
+ } else {
+ 0
+ }
+ outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw,$failed")
outputFile.newLine()
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 639c3aef..ed971df5 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -28,10 +28,13 @@ import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
+import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.NODE_CLUSTER
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.HypervisorEvent
+import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
@@ -141,12 +144,35 @@ fun main(args: Array<String>) {
// Wait for the hypervisors to be spawned
delay(10)
+ val hypervisors = scheduler.drivers()
+ var availableHypervisors = hypervisors.size
+
// Monitor hypervisor events
- for (hypervisor in scheduler.drivers()) {
+ for (hypervisor in hypervisors) {
+ // TODO Do not expose VirtDriver directly but use Hypervisor class.
+ monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server)
+ hypervisor.server.events
+ .onEach { event ->
+ when (event) {
+ is ServerEvent.StateChanged -> {
+ monitor.serverStateChanged(hypervisor, event.server)
+
+ if (event.server.state == ServerState.ERROR)
+ availableHypervisors -= 1
+ }
+ }
+ }
+ .launchIn(this)
hypervisor.events
.onEach { event ->
when (event) {
- is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, event.numberOfDeployedImages, event.hostServer)
+ is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(
+ simulationContext.clock.millis(),
+ event.requestedBurst,
+ event.grantedBurst,
+ event.numberOfDeployedImages,
+ event.hostServer
+ )
else -> println(event)
}
}
@@ -164,6 +190,9 @@ fun main(args: Array<String>) {
}
}
+ val running = mutableSetOf<Server>()
+ val finish = Channel<Unit>(Channel.RENDEZVOUS)
+
val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList())
while (reader.hasNext()) {
val (time, workload) = reader.next()
@@ -174,11 +203,27 @@ fun main(args: Array<String>) {
workload.image.name, workload.image,
Flavor(workload.image.cores, workload.image.requiredMemory)
)
+ running += server
// Monitor server events
- server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect()
+ server.events
+ .onEach {
+ if (it is ServerEvent.StateChanged)
+ monitor.onVmStateChanged(it.server)
+
+ // Detect whether the VM has finished running
+ if (it.server.state == ServerState.ERROR || it.server.state == ServerState.SHUTOFF) {
+ running -= server
+
+ if (running.isEmpty() && (!reader.hasNext() || availableHypervisors == 0))
+ finish.send(Unit)
+ }
+ }
+ .collect()
}
}
+ finish.receive()
+ scheduler.terminate()
println(simulationContext.clock.instant())
}