diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-05 22:35:24 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-06 11:42:20 +0200 |
| commit | 48f6a6f2d42851bc2eeed5b6ef41145740c70286 (patch) | |
| tree | c42f2a5b08923e91b0afad4bc686b298c165a1ea /opendc/opendc-experiments-sc20/src/main | |
| parent | 2b9b1e9e030dccacf9aa549fc49b2e5e382750bf (diff) | |
test: Add initial integration test for SC20 experiments
Diffstat (limited to 'opendc/opendc-experiments-sc20/src/main')
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt) | 90 |
1 files changed, 45 insertions, 45 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt index 6f1e9aae..fc4b9058 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt @@ -58,7 +58,6 @@ import com.xenomachina.argparser.default import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn @@ -220,56 +219,57 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc2 /** * Process the trace. */ -suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, vmPlacements: Map<String, String>, monitor: Sc20Monitor) { +suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: Sc20Monitor, vmPlacements: Map<String, String> = emptyMap()) { + val domain = simulationContext.domain + try { - coroutineScope { - var submitted = 0L - val finished = Channel<Unit>(Channel.CONFLATED) - val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) - - while (reader.hasNext()) { - val (time, workload) = reader.next() - - if (vmPlacements.isNotEmpty()) { - val vmId = workload.name.replace("VM Workload ", "") - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null) { - println("Could not find placement data in VM placement file for VM $vmId") - continue - } - val machineInCluster = hypervisors.ceiling(clusterName)?.let { it.contains(clusterName) } ?: false - if (machineInCluster) { - println("Ignored VM") - continue - } + var submitted = 0L + val finished = Channel<Unit>(Channel.CONFLATED) + val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) + + while (reader.hasNext()) { + val (time, workload) = reader.next() + + if (vmPlacements.isNotEmpty()) { + val vmId = workload.name.replace("VM Workload ", "") + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null) { + println("Could not find placement data in VM placement file for VM $vmId") + continue } - - submitted++ - delay(max(0, time - simulationContext.clock.millis())) - launch { - chan.send(Unit) - val server = scheduler.deploy( - workload.image.name, workload.image, - Flavor(workload.image.maxCores, workload.image.requiredMemory) - ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - monitor.onVmStateChanged(it.server) - } - - finished.send(Unit) - } - .collect() + val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false + if (machineInCluster) { + println("Ignored VM") + continue } } - while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) { - finished.receive() + submitted++ + delay(max(0, time - simulationContext.clock.millis())) + domain.launch { + chan.send(Unit) + val server = scheduler.deploy( + workload.image.name, workload.image, + Flavor(workload.image.maxCores, workload.image.requiredMemory) + ) + // Monitor server events + server.events + .onEach { + if (it is ServerEvent.StateChanged) { + monitor.onVmStateChanged(it.server) + } + + delay(1) + finished.send(Unit) + } + .collect() } } + + while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) { + finished.receive() + } } finally { reader.close() } @@ -347,7 +347,7 @@ fun main(args: Array<String>) { } attachMonitor(scheduler, monitor) - processTrace(traceReader, scheduler, chan, vmPlacements, monitor) + processTrace(traceReader, scheduler, chan, monitor, vmPlacements) println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") |
