Diffstat (limited to 'simulator/opendc-runner-web')
-rw-r--r--  simulator/opendc-runner-web/build.gradle.kts                                               |   2
-rw-r--r--  simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt                  | 172
-rw-r--r--  simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt  |  22
3 files changed, 98 insertions(+), 98 deletions(-)
diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts
index d07fe7a6..fcc78a83 100644
--- a/simulator/opendc-runner-web/build.gradle.kts
+++ b/simulator/opendc-runner-web/build.gradle.kts
@@ -48,4 +48,6 @@ dependencies {
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}")
runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}")
+
+ implementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
}
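
The new dependency brings in the toOtelClock() bridge imported in Main.kt below, which lets the OpenTelemetry SDK timestamp metrics with the simulator's virtual clock instead of wall-clock time. A minimal sketch of such a bridge (not the actual opendc-telemetry-sdk source; it assumes the adapter maps millisecond virtual time onto OTel's nanosecond epoch):

    import io.opentelemetry.sdk.common.Clock as OtelClock

    // Hypothetical adapter: expose a java.time.Clock as an OpenTelemetry SDK clock.
    fun java.time.Clock.toOtelClock(): OtelClock = object : OtelClock {
        // OpenTelemetry expects epoch timestamps in nanoseconds.
        override fun now(): Long = millis() * 1_000_000
        // Monotonic reading; simulated time only moves forward.
        override fun nanoTime(): Long = millis() * 1_000_000
    }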
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index b9aeecb8..5b717ff7 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -34,9 +34,12 @@ import com.mongodb.client.MongoClients
import com.mongodb.client.MongoCollection
import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Filters
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
import mu.KotlinLogging
import org.bson.Document
import org.bson.types.ObjectId
@@ -45,19 +48,14 @@ import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
import org.opendc.compute.service.scheduler.RandomAllocationPolicy
-import org.opendc.compute.simulator.allocation.*
-import org.opendc.experiments.capelin.attachMonitor
-import org.opendc.experiments.capelin.createComputeService
-import org.opendc.experiments.capelin.createFailureDomain
+import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.processTrace
import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
-import org.opendc.trace.core.EventTracer
+import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
-import kotlin.coroutines.coroutineContext
import kotlin.random.Random
private val logger = KotlinLogging.logger {}
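
The import swap above reflects the core refactor in this file: the manually managed TestCoroutineScope is replaced by kotlinx-coroutines-test's runBlockingTest, which runs the body on virtual time, auto-advances the dispatcher until all coroutines are idle, and rethrows uncaught exceptions instead of requiring an explicit uncaughtExceptions check. The basic shape:

    import kotlinx.coroutines.test.runBlockingTest

    fun simulate() = runBlockingTest {
        // Delays inside this block run on virtual time and are auto-advanced;
        // the call returns only once all child coroutines have completed.
    }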
@@ -208,93 +206,85 @@ public class RunnerCli : CliktCommand(name = "runner") {
traceReader: Sc20RawParquetTraceReader,
performanceInterferenceReader: Sc20PerformanceInterferenceReader?
): WebExperimentMonitor.Result {
- val seed = repeat
- val traceDocument = scenario.get("trace", Document::class.java)
- val workloadName = traceDocument.getString("traceId")
- val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
-
- val seeder = Random(seed)
- val testScope = TestCoroutineScope(Job(parent = coroutineContext[Job]))
- val clock = DelayControllerClockAdapter(testScope)
-
- val chan = Channel<Unit>(Channel.CONFLATED)
-
- val operational = scenario.get("operational", Document::class.java)
- val allocationPolicy =
- when (val policyName = operational.getString("schedulerName")) {
- "mem" -> AvailableMemoryAllocationPolicy()
- "mem-inv" -> AvailableMemoryAllocationPolicy(true)
- "core-mem" -> AvailableCoreMemoryAllocationPolicy()
- "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
- "active-servers" -> NumberOfActiveServersAllocationPolicy()
- "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
- "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
- "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
- "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
- else -> throw IllegalArgumentException("Unknown policy $policyName")
- }
-
- val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
- val trace = Sc20ParquetTraceReader(
- listOf(traceReader),
- performanceInterferenceModel,
- Workload(workloadName, workloadFraction),
- seed
- )
- val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
- val environment = TopologyParser(topologies, topologyId)
val monitor = WebExperimentMonitor()
- val tracer = EventTracer(clock)
-
- testScope.launch {
- val scheduler = createComputeService(
- this,
- clock,
- environment,
- allocationPolicy,
- tracer
- )
-
- val failureDomain = if (operational.getBoolean("failuresEnabled")) {
- logger.debug("ENABLING failures")
- createFailureDomain(
- testScope,
- clock,
- seeder.nextInt(),
- operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7,
- scheduler,
- chan
+
+ try {
+ runBlockingTest {
+ val seed = repeat
+ val traceDocument = scenario.get("trace", Document::class.java)
+ val workloadName = traceDocument.getString("traceId")
+ val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
+
+ val seeder = Random(seed)
+ val clock = DelayControllerClockAdapter(this)
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+ val metricProducer = meterProvider as MetricProducer
+
+ val operational = scenario.get("operational", Document::class.java)
+ val allocationPolicy =
+ when (val policyName = operational.getString("schedulerName")) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+ else -> throw IllegalArgumentException("Unknown policy $policyName")
+ }
+
+ val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
+ val trace = Sc20ParquetTraceReader(
+ listOf(traceReader),
+ performanceInterferenceModel,
+ Workload(workloadName, workloadFraction),
+ seed
+                )
- } else {
- null
- }
+ val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
+ val environment = TopologyParser(topologies, topologyId)
+ val failureFrequency = operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7
+
+ withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
+ val failureDomain = if (failureFrequency > 0) {
+ logger.debug { "ENABLING failures" }
+ createFailureDomain(
+ this,
+ clock,
+ seeder.nextInt(),
+ failureFrequency,
+ scheduler,
+ chan
+ )
+ } else {
+ null
+ }
- attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- this,
- clock,
- trace,
- scheduler,
- chan,
- monitor
- )
-
- logger.debug("SUBMIT=${scheduler.submittedVms}")
- logger.debug("FAIL=${scheduler.unscheduledVms}")
- logger.debug("QUEUED=${scheduler.queuedVms}")
- logger.debug("RUNNING=${scheduler.runningVms}")
- logger.debug("FINISHED=${scheduler.finishedVms}")
-
- failureDomain?.cancel()
- scheduler.close()
- }
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ trace,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
- try {
- testScope.advanceUntilIdle()
- testScope.uncaughtExceptions.forEach { it.printStackTrace() }
- } finally {
- monitor.close()
- testScope.cancel()
+ failureDomain?.cancel()
+ }
+
+ val monitorResults = collectMetrics(metricProducer)
+ logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+ }
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Experiment failed" }
}
return monitor.getResult()
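
The collectMetrics helper used above reads the scheduler counters out of the OpenTelemetry pipeline rather than off the scheduler object itself. A hedged sketch of what such a helper could look like (the metric names and result type are assumptions, not the actual Capelin code, and it assumes an OTel SDK version where MetricData exposes longSumData):

    import io.opentelemetry.sdk.metrics.export.MetricProducer

    data class SchedulerMetrics(
        val submittedVms: Long,
        val queuedVms: Long,
        val runningVms: Long,
        val unscheduledVms: Long
    )

    fun collectMetrics(producer: MetricProducer): SchedulerMetrics {
        // collectAllMetrics() snapshots every instrument registered with the
        // SdkMeterProvider that was passed into withComputeService.
        val metrics = producer.collectAllMetrics().associateBy { it.name }
        fun sum(name: String): Long =
            metrics[name]?.longSumData?.points?.sumOf { point -> point.value } ?: 0
        return SchedulerMetrics(
            submittedVms = sum("servers.submitted"),
            queuedVms = sum("servers.waiting"),
            runningVms = sum("servers.active"),
            unscheduledVms = sum("servers.failed")
        )
    }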
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
index a8ac6c10..fcd43ea7 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -25,7 +25,6 @@ package org.opendc.runner.web
import mu.KotlinLogging
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -205,13 +204,22 @@ public class WebExperimentMonitor : ExperimentMonitor {
private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
- override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
+ override fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {
provisionerMetrics = AggregateProvisionerMetrics(
- max(event.totalVmCount, provisionerMetrics.vmTotalCount),
- max(event.waitingVmCount, provisionerMetrics.vmWaitingCount),
- max(event.activeVmCount, provisionerMetrics.vmActiveCount),
- max(event.inactiveVmCount, provisionerMetrics.vmInactiveCount),
- max(event.failedVmCount, provisionerMetrics.vmFailedCount),
+ max(totalVmCount, provisionerMetrics.vmTotalCount),
+ max(waitingVmCount, provisionerMetrics.vmWaitingCount),
+ max(activeVmCount, provisionerMetrics.vmActiveCount),
+ max(inactiveVmCount, provisionerMetrics.vmInactiveCount),
+ max(failedVmCount, provisionerMetrics.vmFailedCount),
)
}
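
The flattened signature also isolates the peak-tracking aggregation above: every report folds the newest sample into a running maximum, so the stored aggregate reflects the highest VM counts seen over the whole run. The pattern in isolation (field order follows the constructor calls above):

    import kotlin.math.max

    data class AggregateProvisionerMetrics(
        val vmTotalCount: Int = 0,
        val vmWaitingCount: Int = 0,
        val vmActiveCount: Int = 0,
        val vmInactiveCount: Int = 0,
        val vmFailedCount: Int = 0
    )

    // Fold one sample into the running peak, component-wise.
    fun AggregateProvisionerMetrics.peak(sample: AggregateProvisionerMetrics) =
        AggregateProvisionerMetrics(
            max(vmTotalCount, sample.vmTotalCount),
            max(vmWaitingCount, sample.vmWaitingCount),
            max(vmActiveCount, sample.vmActiveCount),
            max(vmInactiveCount, sample.vmInactiveCount),
            max(vmFailedCount, sample.vmFailedCount)
        )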