diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-21 22:04:31 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-25 10:51:27 +0100 |
| commit | 76bfeb44c5a02be143c152c52bc1029cff360744 (patch) | |
| tree | be467a0be698df2ebb4dd9fd3c5410d1e53ffa46 /opendc/opendc-experiments-sc20 | |
| parent | bc64182612ad06f15bff5b48637ed7d241e293b2 (diff) | |
refactor: Migrate to Flow for event listeners
Diffstat (limited to 'opendc/opendc-experiments-sc20')
2 files changed, 12 insertions, 9 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 0f4d0c1b..e18bbe30 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -2,9 +2,7 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.driver.BareMetalDriver -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import kotlinx.coroutines.flow.first import java.io.BufferedWriter import java.io.Closeable @@ -12,7 +10,7 @@ import java.io.FileWriter class Sc20Monitor( destination: String -) : HypervisorMonitor, ServerMonitor, Closeable { +) : Closeable { private val outputFile = BufferedWriter(FileWriter(destination)) private var failed: Int = 0 @@ -20,14 +18,14 @@ class Sc20Monitor( outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n") } - override fun stateChanged(server: Server, previousState: ServerState) { + fun stateChanged(server: Server) { println("${server.uid} ${server.state}") if (server.state == ServerState.ERROR) { failed++ } } - override suspend fun onSliceFinish( + suspend fun onSliceFinish( time: Long, requestedBurst: Long, grantedBurst: Long, diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 4273c39e..96033ea7 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -27,6 +27,7 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy @@ -41,6 +42,8 @@ import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File @@ -107,11 +110,10 @@ fun main(args: Array<String>) { println(simulationContext.clock.instant()) val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] - val scheduler = SimpleVirtProvisioningService( AvailableMemoryAllocationPolicy(), simulationContext, - bareMetalProvisioner, + bareMetalProvisioner ) val faultInjectorDomain = root.newDomain(name = "failures") @@ -131,8 +133,11 @@ fun main(args: Array<String>) { while (reader.hasNext()) { val (time, workload) = reader.next() delay(max(0, time - simulationContext.clock.millis())) - chan.send(Unit) - scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory)) + launch { + chan.send(Unit) + val server = scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory)) + server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect() + } } println(simulationContext.clock.instant()) |
