summaryrefslogtreecommitdiff
path: root/opendc/opendc-experiments-sc20/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-05 22:35:24 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-06 11:42:20 +0200
commit48f6a6f2d42851bc2eeed5b6ef41145740c70286 (patch)
treec42f2a5b08923e91b0afad4bc686b298c165a1ea /opendc/opendc-experiments-sc20/src/main
parent2b9b1e9e030dccacf9aa549fc49b2e5e382750bf (diff)
test: Add initial integration test for SC20 experiments
Diffstat (limited to 'opendc/opendc-experiments-sc20/src/main')
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt)90
1 files changed, 45 insertions, 45 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
index 6f1e9aae..fc4b9058 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
@@ -58,7 +58,6 @@ import com.xenomachina.argparser.default
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
@@ -220,56 +219,57 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc2
/**
* Process the trace.
*/
-suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, vmPlacements: Map<String, String>, monitor: Sc20Monitor) {
+suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: Sc20Monitor, vmPlacements: Map<String, String> = emptyMap()) {
+ val domain = simulationContext.domain
+
try {
- coroutineScope {
- var submitted = 0L
- val finished = Channel<Unit>(Channel.CONFLATED)
- val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
-
- while (reader.hasNext()) {
- val (time, workload) = reader.next()
-
- if (vmPlacements.isNotEmpty()) {
- val vmId = workload.name.replace("VM Workload ", "")
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null) {
- println("Could not find placement data in VM placement file for VM $vmId")
- continue
- }
- val machineInCluster = hypervisors.ceiling(clusterName)?.let { it.contains(clusterName) } ?: false
- if (machineInCluster) {
- println("Ignored VM")
- continue
- }
+ var submitted = 0L
+ val finished = Channel<Unit>(Channel.CONFLATED)
+ val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
+
+ while (reader.hasNext()) {
+ val (time, workload) = reader.next()
+
+ if (vmPlacements.isNotEmpty()) {
+ val vmId = workload.name.replace("VM Workload ", "")
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null) {
+ println("Could not find placement data in VM placement file for VM $vmId")
+ continue
}
-
- submitted++
- delay(max(0, time - simulationContext.clock.millis()))
- launch {
- chan.send(Unit)
- val server = scheduler.deploy(
- workload.image.name, workload.image,
- Flavor(workload.image.maxCores, workload.image.requiredMemory)
- )
- // Monitor server events
- server.events
- .onEach {
- if (it is ServerEvent.StateChanged) {
- monitor.onVmStateChanged(it.server)
- }
-
- finished.send(Unit)
- }
- .collect()
+ val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false
+ if (machineInCluster) {
+ println("Ignored VM")
+ continue
}
}
- while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) {
- finished.receive()
+ submitted++
+ delay(max(0, time - simulationContext.clock.millis()))
+ domain.launch {
+ chan.send(Unit)
+ val server = scheduler.deploy(
+ workload.image.name, workload.image,
+ Flavor(workload.image.maxCores, workload.image.requiredMemory)
+ )
+ // Monitor server events
+ server.events
+ .onEach {
+ if (it is ServerEvent.StateChanged) {
+ monitor.onVmStateChanged(it.server)
+ }
+
+ delay(1)
+ finished.send(Unit)
+ }
+ .collect()
}
}
+
+ while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) {
+ finished.receive()
+ }
} finally {
reader.close()
}
@@ -347,7 +347,7 @@ fun main(args: Array<String>) {
}
attachMonitor(scheduler, monitor)
- processTrace(traceReader, scheduler, chan, vmPlacements, monitor)
+ processTrace(traceReader, scheduler, chan, monitor, vmPlacements)
println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")