summaryrefslogtreecommitdiff
path: root/simulator
diff options
context:
space:
mode:
Diffstat (limited to 'simulator')
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt12
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt92
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt13
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt19
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt9
6 files changed, 75 insertions, 72 deletions
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 62808b4d..18255922 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -47,7 +47,7 @@ import kotlin.math.max
* @param context The [CoroutineContext] to use.
* @param clock The clock instance to keep track of time.
*/
-public class ComputeServiceImpl(
+internal class ComputeServiceImpl(
private val context: CoroutineContext,
private val clock: Clock,
private val tracer: EventTracer,
@@ -104,11 +104,11 @@ public class ComputeServiceImpl(
*/
private val servers = mutableMapOf<UUID, InternalServer>()
- public var submittedVms: Int = 0
- public var queuedVms: Int = 0
- public var runningVms: Int = 0
- public var finishedVms: Int = 0
- public var unscheduledVms: Int = 0
+ private var submittedVms: Int = 0
+ private var queuedVms: Int = 0
+ private var runningVms: Int = 0
+ private var finishedVms: Int = 0
+ private var unscheduledVms: Int = 0
private var maxCores = 0
private var maxMemory = 0L
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 3c4b4410..2e4191cc 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -207,6 +207,8 @@ public class SimHost(
_state = HostState.DOWN
}
+ override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
+
/**
* Convert flavor to machine model.
*/
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 44436019..1fdd45ac 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -22,15 +22,10 @@
package org.opendc.experiments.capelin
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.flow.takeWhile
-import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
@@ -39,7 +34,6 @@ import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostEvent
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
-import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.AllocationPolicy
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -54,6 +48,7 @@ import org.opendc.simulator.failures.FaultInjector
import org.opendc.trace.core.EventTracer
import java.io.File
import java.time.Clock
+import kotlin.coroutines.resume
import kotlin.math.ln
import kotlin.math.max
import kotlin.random.Random
@@ -142,7 +137,7 @@ public fun createComputeService(
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy,
eventTracer: EventTracer
-): ComputeServiceImpl {
+): ComputeService {
val hosts = environmentReader
.use { it.read() }
.map { def ->
@@ -159,7 +154,7 @@ public fun createComputeService(
}
val scheduler =
- ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl
+ ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy)
for (host in hosts) {
scheduler.addHost(host)
@@ -177,7 +172,8 @@ public fun attachMonitor(
clock: Clock,
scheduler: ComputeService,
monitor: ExperimentMonitor
-) {
+): MonitorResults {
+ val results = MonitorResults()
// Monitor host events
for (host in scheduler.hosts) {
monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
@@ -213,18 +209,33 @@ public fun attachMonitor(
scheduler.events
.onEach { event ->
when (event) {
- is ComputeServiceEvent.MetricsAvailable ->
+ is ComputeServiceEvent.MetricsAvailable -> {
+ results.submittedVms = event.totalVmCount
+ results.queuedVms = event.waitingVmCount
+ results.runningVms = event.activeVmCount
+ results.finishedVms = event.inactiveVmCount
+ results.unscheduledVms = event.failedVmCount
monitor.reportProvisionerMetrics(clock.millis(), event)
+ }
}
}
.launchIn(coroutineScope)
+
+ return results
+}
+
+public class MonitorResults {
+ public var submittedVms: Int = 0
+ public var queuedVms: Int = 0
+ public var runningVms: Int = 0
+ public var finishedVms: Int = 0
+ public var unscheduledVms: Int = 0
}
/**
* Process the trace.
*/
public suspend fun processTrace(
- coroutineScope: CoroutineScope,
clock: Clock,
reader: TraceReader<SimWorkload>,
scheduler: ComputeService,
@@ -234,43 +245,40 @@ public suspend fun processTrace(
val client = scheduler.newClient()
val image = client.newImage("vm-image")
try {
- var submitted = 0
+ coroutineScope {
+ while (reader.hasNext()) {
+ val entry = reader.next()
- while (reader.hasNext()) {
- val entry = reader.next()
-
- submitted++
- delay(max(0, entry.start - clock.millis()))
- coroutineScope.launch {
- chan.send(Unit)
- val server = client.newServer(
- entry.name,
- image,
- client.newFlavor(
+ delay(max(0, entry.start - clock.millis()))
+ launch {
+ chan.send(Unit)
+ val server = client.newServer(
entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
- ),
- meta = entry.meta
- )
+ image,
+ client.newFlavor(
+ entry.name,
+ entry.meta["cores"] as Int,
+ entry.meta["required-memory"] as Long
+ ),
+ meta = entry.meta
+ )
+
+ suspendCancellableCoroutine { cont ->
+ server.watch(object : ServerWatcher {
+ override fun onStateChanged(server: Server, newState: ServerState) {
+ monitor.reportVmStateChange(clock.millis(), server, newState)
- server.watch(object : ServerWatcher {
- override fun onStateChanged(server: Server, newState: ServerState) {
- monitor.reportVmStateChange(clock.millis(), server, newState)
+ if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) {
+ cont.resume(Unit)
+ }
+ }
+ })
}
- })
+ }
}
}
- scheduler.events
- .takeWhile {
- when (it) {
- is ComputeServiceEvent.MetricsAvailable ->
- it.inactiveVmCount + it.failedVmCount != submitted
- }
- }
- .collect()
- delay(1)
+ yield()
} finally {
reader.close()
client.close()
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 66f07d97..5a7eadad 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -169,9 +169,8 @@ public abstract class Portfolio(name: String) : Experiment(name) {
null
}
- attachMonitor(this, clock, scheduler, monitor)
+ val monitorResults = attachMonitor(this, clock, scheduler, monitor)
processTrace(
- this,
clock,
trace,
scheduler,
@@ -179,11 +178,11 @@ public abstract class Portfolio(name: String) : Experiment(name) {
monitor
)
- logger.debug("SUBMIT=${scheduler.submittedVms}")
- logger.debug("FAIL=${scheduler.unscheduledVms}")
- logger.debug("QUEUED=${scheduler.queuedVms}")
- logger.debug("RUNNING=${scheduler.runningVms}")
- logger.debug("FINISHED=${scheduler.finishedVms}")
+ logger.debug("SUBMIT=${monitorResults.submittedVms}")
+ logger.debug("FAIL=${monitorResults.unscheduledVms}")
+ logger.debug("QUEUED=${monitorResults.queuedVms}")
+ logger.debug("RUNNING=${monitorResults.runningVms}")
+ logger.debug("FINISHED=${monitorResults.finishedVms}")
failureDomain?.cancel()
scheduler.close()
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index a812490a..c16f7003 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -33,8 +33,8 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -94,7 +94,8 @@ class CapelinIntegrationTest {
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- lateinit var scheduler: ComputeServiceImpl
+ lateinit var scheduler: ComputeService
+ lateinit var monitorResults: MonitorResults
val tracer = EventTracer(clock)
testScope.launch {
@@ -120,9 +121,8 @@ class CapelinIntegrationTest {
null
}
- attachMonitor(this, clock, scheduler, monitor)
+ monitorResults = attachMonitor(this, clock, scheduler, monitor)
processTrace(
- this,
clock,
traceReader,
scheduler,
@@ -130,7 +130,7 @@ class CapelinIntegrationTest {
monitor
)
- println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+ println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}")
failureDomain?.cancel()
scheduler.close()
@@ -141,8 +141,8 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
- { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
+ { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
+ { assertEquals(50, monitorResults.finishedVms, "All VMs should finish after a run") },
{ assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
{ assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
{ assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
@@ -167,9 +167,8 @@ class CapelinIntegrationTest {
allocationPolicy,
tracer
)
- attachMonitor(this, clock, scheduler, monitor)
+ val monitorResults = attachMonitor(this, clock, scheduler, monitor)
processTrace(
- this,
clock,
traceReader,
scheduler,
@@ -179,7 +178,7 @@ class CapelinIntegrationTest {
yield()
- println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+ println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}")
scheduler.close()
monitor.close()
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 560319ee..9c92bbf8 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -268,9 +268,8 @@ public class RunnerCli : CliktCommand(name = "runner") {
null
}
- attachMonitor(this, clock, scheduler, monitor)
+ val monitorResults = attachMonitor(this, clock, scheduler, monitor)
processTrace(
- this,
clock,
trace,
scheduler,
@@ -278,11 +277,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
monitor
)
- logger.debug("SUBMIT=${scheduler.submittedVms}")
- logger.debug("FAIL=${scheduler.unscheduledVms}")
- logger.debug("QUEUED=${scheduler.queuedVms}")
- logger.debug("RUNNING=${scheduler.runningVms}")
- logger.debug("FINISHED=${scheduler.finishedVms}")
+ logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}" }
failureDomain?.cancel()
scheduler.close()