diff options
Diffstat (limited to 'simulator')
6 files changed, 75 insertions, 72 deletions
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 62808b4d..18255922 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -47,7 +47,7 @@ import kotlin.math.max * @param context The [CoroutineContext] to use. * @param clock The clock instance to keep track of time. */ -public class ComputeServiceImpl( +internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, private val tracer: EventTracer, @@ -104,11 +104,11 @@ public class ComputeServiceImpl( */ private val servers = mutableMapOf<UUID, InternalServer>() - public var submittedVms: Int = 0 - public var queuedVms: Int = 0 - public var runningVms: Int = 0 - public var finishedVms: Int = 0 - public var unscheduledVms: Int = 0 + private var submittedVms: Int = 0 + private var queuedVms: Int = 0 + private var runningVms: Int = 0 + private var finishedVms: Int = 0 + private var unscheduledVms: Int = 0 private var maxCores = 0 private var maxMemory = 0L diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 3c4b4410..2e4191cc 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -207,6 +207,8 @@ public class SimHost( _state = HostState.DOWN } + override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" + /** * Convert flavor to machine model. */ diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 44436019..1fdd45ac 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -22,15 +22,10 @@ package org.opendc.experiments.capelin -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.takeWhile -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService @@ -39,7 +34,6 @@ import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostEvent import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState -import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -54,6 +48,7 @@ import org.opendc.simulator.failures.FaultInjector import org.opendc.trace.core.EventTracer import java.io.File import java.time.Clock +import kotlin.coroutines.resume import kotlin.math.ln import kotlin.math.max import kotlin.random.Random @@ -142,7 +137,7 @@ public fun createComputeService( environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy, eventTracer: EventTracer -): ComputeServiceImpl { +): ComputeService { val hosts = environmentReader .use { it.read() } .map { def -> @@ -159,7 +154,7 @@ public fun createComputeService( } val scheduler = - ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl + ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) for (host in hosts) { scheduler.addHost(host) @@ -177,7 +172,8 @@ public fun attachMonitor( clock: Clock, scheduler: ComputeService, monitor: ExperimentMonitor -) { +): MonitorResults { + val results = MonitorResults() // Monitor host events for (host in scheduler.hosts) { monitor.reportHostStateChange(clock.millis(), host, HostState.UP) @@ -213,18 +209,33 @@ public fun attachMonitor( scheduler.events .onEach { event -> when (event) { - is ComputeServiceEvent.MetricsAvailable -> + is ComputeServiceEvent.MetricsAvailable -> { + results.submittedVms = event.totalVmCount + results.queuedVms = event.waitingVmCount + results.runningVms = event.activeVmCount + results.finishedVms = event.inactiveVmCount + results.unscheduledVms = event.failedVmCount monitor.reportProvisionerMetrics(clock.millis(), event) + } } } .launchIn(coroutineScope) + + return results +} + +public class MonitorResults { + public var submittedVms: Int = 0 + public var queuedVms: Int = 0 + public var runningVms: Int = 0 + public var finishedVms: Int = 0 + public var unscheduledVms: Int = 0 } /** * Process the trace. */ public suspend fun processTrace( - coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<SimWorkload>, scheduler: ComputeService, @@ -234,43 +245,40 @@ public suspend fun processTrace( val client = scheduler.newClient() val image = client.newImage("vm-image") try { - var submitted = 0 + coroutineScope { + while (reader.hasNext()) { + val entry = reader.next() - while (reader.hasNext()) { - val entry = reader.next() - - submitted++ - delay(max(0, entry.start - clock.millis())) - coroutineScope.launch { - chan.send(Unit) - val server = client.newServer( - entry.name, - image, - client.newFlavor( + delay(max(0, entry.start - clock.millis())) + launch { + chan.send(Unit) + val server = client.newServer( entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long - ), - meta = entry.meta - ) + image, + client.newFlavor( + entry.name, + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta + ) + + suspendCancellableCoroutine { cont -> + server.watch(object : ServerWatcher { + override fun onStateChanged(server: Server, newState: ServerState) { + monitor.reportVmStateChange(clock.millis(), server, newState) - server.watch(object : ServerWatcher { - override fun onStateChanged(server: Server, newState: ServerState) { - monitor.reportVmStateChange(clock.millis(), server, newState) + if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) { + cont.resume(Unit) + } + } + }) } - }) + } } } - scheduler.events - .takeWhile { - when (it) { - is ComputeServiceEvent.MetricsAvailable -> - it.inactiveVmCount + it.failedVmCount != submitted - } - } - .collect() - delay(1) + yield() } finally { reader.close() client.close() diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 66f07d97..5a7eadad 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -169,9 +169,8 @@ public abstract class Portfolio(name: String) : Experiment(name) { null } - attachMonitor(this, clock, scheduler, monitor) + val monitorResults = attachMonitor(this, clock, scheduler, monitor) processTrace( - this, clock, trace, scheduler, @@ -179,11 +178,11 @@ public abstract class Portfolio(name: String) : Experiment(name) { monitor ) - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") + logger.debug("SUBMIT=${monitorResults.submittedVms}") + logger.debug("FAIL=${monitorResults.unscheduledVms}") + logger.debug("QUEUED=${monitorResults.queuedVms}") + logger.debug("RUNNING=${monitorResults.runningVms}") + logger.debug("FINISHED=${monitorResults.finishedVms}") failureDomain?.cancel() scheduler.close() diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index a812490a..c16f7003 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -33,8 +33,8 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -94,7 +94,8 @@ class CapelinIntegrationTest { val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: ComputeServiceImpl + lateinit var scheduler: ComputeService + lateinit var monitorResults: MonitorResults val tracer = EventTracer(clock) testScope.launch { @@ -120,9 +121,8 @@ class CapelinIntegrationTest { null } - attachMonitor(this, clock, scheduler, monitor) + monitorResults = attachMonitor(this, clock, scheduler, monitor) processTrace( - this, clock, traceReader, scheduler, @@ -130,7 +130,7 @@ class CapelinIntegrationTest { monitor ) - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}") failureDomain?.cancel() scheduler.close() @@ -141,8 +141,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, + { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, + { assertEquals(50, monitorResults.finishedVms, "All VMs should finish after a run") }, { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, { assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, @@ -167,9 +167,8 @@ class CapelinIntegrationTest { allocationPolicy, tracer ) - attachMonitor(this, clock, scheduler, monitor) + val monitorResults = attachMonitor(this, clock, scheduler, monitor) processTrace( - this, clock, traceReader, scheduler, @@ -179,7 +178,7 @@ class CapelinIntegrationTest { yield() - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}") scheduler.close() monitor.close() diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 560319ee..9c92bbf8 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -268,9 +268,8 @@ public class RunnerCli : CliktCommand(name = "runner") { null } - attachMonitor(this, clock, scheduler, monitor) + val monitorResults = attachMonitor(this, clock, scheduler, monitor) processTrace( - this, clock, trace, scheduler, @@ -278,11 +277,7 @@ public class RunnerCli : CliktCommand(name = "runner") { monitor ) - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") + logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}" } failureDomain?.cancel() scheduler.close() |
