summaryrefslogtreecommitdiff
path: root/opendc-web/opendc-web-runner/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-web/opendc-web-runner/src')
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt72
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt40
2 files changed, 48 insertions, 64 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index b565e90d..b9d5a3f5 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -28,10 +28,8 @@ import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.long
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import mu.KotlinLogging
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.env.MachineDef
@@ -39,6 +37,8 @@ import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
+import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -46,8 +46,9 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.compute.withMonitor
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.web.client.ApiClient
import org.opendc.web.client.AuthConfiguration
@@ -55,9 +56,8 @@ import org.opendc.web.client.model.Scenario
import org.opendc.web.client.model.Topology
import java.io.File
import java.net.URI
+import java.time.Duration
import java.util.*
-import kotlin.random.Random
-import kotlin.random.asJavaRandom
import org.opendc.web.client.model.Portfolio as ClientPortfolio
private val logger = KotlinLogging.logger {}
@@ -158,7 +158,7 @@ class RunnerCli : CliktCommand(name = "runner") {
val results = (0 until targets.repeatsPerScenario).map { repeat ->
logger.info { "Starting repeat $repeat" }
withTimeout(runTimeout * 1000) {
- val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong()).asJavaRandom()) }
+ val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) }
runRepeat(scenario, repeat, environment, traceReader, interferenceModel)
}
}
@@ -182,63 +182,55 @@ class RunnerCli : CliktCommand(name = "runner") {
try {
runBlockingSimulation {
- val seed = repeat
val workloadName = scenario.trace.traceId
val workloadFraction = scenario.trace.loadSamplingFraction
- val seeder = Random(seed)
+ val seeder = Random(repeat.toLong())
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
.setClock(clock.toOtelClock())
.build()
- val metricProducer = meterProvider as MetricProducer
val operational = scenario.operationalPhenomena
- val allocationPolicy = createComputeScheduler(operational.schedulerName, seeder)
+ val computeScheduler = createComputeScheduler(operational.schedulerName, seeder)
val trace = ParquetTraceReader(
listOf(traceReader),
Workload(workloadName, workloadFraction),
- seed
+ repeat
)
- val failureFrequency = if (operational.failuresEnabled) 24.0 * 7 else 0.0
-
- withComputeService(clock, meterProvider, environment, allocationPolicy, interferenceModel) { scheduler ->
- val faultInjector = if (failureFrequency > 0) {
- logger.debug { "ENABLING failures" }
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seeder.nextInt(),
- failureFrequency,
- )
- } else {
+ val failureModel =
+ if (operational.failuresEnabled)
+ grid5000(Duration.ofDays(7), repeat)
+ else
null
- }
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector?.start()
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environment.read(),
+ failureModel,
+ interferenceModel.takeIf { operational.performanceInterferenceEnabled }
+ )
- processTrace(
- clock,
- trace,
- scheduler,
- monitor
- )
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
- faultInjector?.close()
- }
+ try {
+ simulator.run(trace)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val monitorResults = collectServiceMetrics(clock.millis(), metricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
logger.debug {
"Finish " +
- "SUBMIT=${monitorResults.instanceCount} " +
- "FAIL=${monitorResults.failedInstanceCount} " +
- "QUEUE=${monitorResults.queuedInstanceCount} " +
- "RUNNING=${monitorResults.runningInstanceCount}"
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
}
}
} catch (cause: Throwable) {
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
index c8e58dde..4b813310 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
@@ -22,27 +22,19 @@
package org.opendc.web.runner
-import mu.KotlinLogging
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostData
import org.opendc.telemetry.compute.table.ServiceData
import kotlin.math.max
+import kotlin.math.roundToLong
/**
* A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
*/
-public class WebComputeMonitor : ComputeMonitor {
- private val logger = KotlinLogging.logger {}
-
- override fun onStateChange(time: Long, host: Host, newState: HostState) {
- logger.debug { "Host ${host.uid} changed state $newState [$time]" }
- }
-
+class WebComputeMonitor : ComputeMonitor {
override fun record(data: HostData) {
- val duration = 5 * 60 * 1000L
- val slices = duration / SLICE_LENGTH
+ val duration = data.uptime
+ val slices = data.downtime / SLICE_LENGTH
hostAggregateMetrics = AggregateHostMetrics(
hostAggregateMetrics.totalWork + data.totalWork,
@@ -50,14 +42,14 @@ public class WebComputeMonitor : ComputeMonitor {
hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork,
hostAggregateMetrics.totalInterferedWork + data.overcommittedWork,
hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600,
- hostAggregateMetrics.totalFailureSlices + if (data.host.state != HostState.UP) slices else 0,
- hostAggregateMetrics.totalFailureVmSlices + if (data.host.state != HostState.UP) data.instanceCount * slices else 0
+ hostAggregateMetrics.totalFailureSlices + slices,
+ hostAggregateMetrics.totalFailureVmSlices + data.instanceCount * slices
)
- hostMetrics.compute(data.host) { _, prev ->
+ hostMetrics.compute(data.host.id) { _, prev ->
HostMetrics(
- (data.cpuUsage.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
- (data.cpuDemand.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
+ data.cpuUsage + (prev?.cpuUsage ?: 0.0),
+ data.cpuDemand + (prev?.cpuDemand ?: 0.0),
data.instanceCount + (prev?.instanceCount ?: 0),
1 + (prev?.count ?: 0)
)
@@ -65,7 +57,7 @@ public class WebComputeMonitor : ComputeMonitor {
}
private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
- private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf()
+ private val hostMetrics: MutableMap<String, HostMetrics> = mutableMapOf()
private val SLICE_LENGTH: Long = 5 * 60 * 1000
data class AggregateHostMetrics(
@@ -74,8 +66,8 @@ public class WebComputeMonitor : ComputeMonitor {
val totalOvercommittedWork: Double = 0.0,
val totalInterferedWork: Double = 0.0,
val totalPowerDraw: Double = 0.0,
- val totalFailureSlices: Long = 0,
- val totalFailureVmSlices: Long = 0,
+ val totalFailureSlices: Double = 0.0,
+ val totalFailureVmSlices: Double = 0.0,
)
data class HostMetrics(
@@ -97,7 +89,7 @@ public class WebComputeMonitor : ComputeMonitor {
)
}
- public data class AggregateServiceMetrics(
+ data class AggregateServiceMetrics(
val vmTotalCount: Int = 0,
val vmWaitingCount: Int = 0,
val vmActiveCount: Int = 0,
@@ -105,7 +97,7 @@ public class WebComputeMonitor : ComputeMonitor {
val vmFailedCount: Int = 0
)
- public fun getResult(): Result {
+ fun getResult(): Result {
return Result(
hostAggregateMetrics.totalWork,
hostAggregateMetrics.totalGrantedWork,
@@ -116,8 +108,8 @@ public class WebComputeMonitor : ComputeMonitor {
hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(),
hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
hostAggregateMetrics.totalPowerDraw,
- hostAggregateMetrics.totalFailureSlices,
- hostAggregateMetrics.totalFailureVmSlices,
+ hostAggregateMetrics.totalFailureSlices.roundToLong(),
+ hostAggregateMetrics.totalFailureVmSlices.roundToLong(),
serviceMetrics.vmTotalCount,
serviceMetrics.vmWaitingCount,
serviceMetrics.vmInactiveCount,