summaryrefslogtreecommitdiff
path: root/opendc/opendc-experiments-sc20
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-21 22:04:31 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-25 10:51:27 +0100
commit76bfeb44c5a02be143c152c52bc1029cff360744 (patch)
treebe467a0be698df2ebb4dd9fd3c5410d1e53ffa46 /opendc/opendc-experiments-sc20
parentbc64182612ad06f15bff5b48637ed7d241e293b2 (diff)
refactor: Migrate to Flow for event listeners
Diffstat (limited to 'opendc/opendc-experiments-sc20')
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt8
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt13
2 files changed, 12 insertions, 9 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 0f4d0c1b..e18bbe30 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -2,9 +2,7 @@ package com.atlarge.opendc.experiments.sc20
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import kotlinx.coroutines.flow.first
import java.io.BufferedWriter
import java.io.Closeable
@@ -12,7 +10,7 @@ import java.io.FileWriter
class Sc20Monitor(
destination: String
-) : HypervisorMonitor, ServerMonitor, Closeable {
+) : Closeable {
private val outputFile = BufferedWriter(FileWriter(destination))
private var failed: Int = 0
@@ -20,14 +18,14 @@ class Sc20Monitor(
outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n")
}
- override fun stateChanged(server: Server, previousState: ServerState) {
+ fun stateChanged(server: Server) {
println("${server.uid} ${server.state}")
if (server.state == ServerState.ERROR) {
failed++
}
}
- override suspend fun onSliceFinish(
+ suspend fun onSliceFinish(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 4273c39e..96033ea7 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -27,6 +27,7 @@ package com.atlarge.opendc.experiments.sc20
import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
@@ -41,6 +42,8 @@ import com.xenomachina.argparser.ArgParser
import com.xenomachina.argparser.default
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
@@ -107,11 +110,10 @@ fun main(args: Array<String>) {
println(simulationContext.clock.instant())
val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key]
-
val scheduler = SimpleVirtProvisioningService(
AvailableMemoryAllocationPolicy(),
simulationContext,
- bareMetalProvisioner,
+ bareMetalProvisioner
)
val faultInjectorDomain = root.newDomain(name = "failures")
@@ -131,8 +133,11 @@ fun main(args: Array<String>) {
while (reader.hasNext()) {
val (time, workload) = reader.next()
delay(max(0, time - simulationContext.clock.millis()))
- chan.send(Unit)
- scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory))
+ launch {
+ chan.send(Unit)
+ val server = scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory))
+ server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect()
+ }
}
println(simulationContext.clock.instant())