diff options
Diffstat (limited to 'opendc-web/opendc-web-runner')
| -rw-r--r-- | opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt | 72 | ||||
| -rw-r--r-- | opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt | 40 |
2 files changed, 48 insertions, 64 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index b565e90d..b9d5a3f5 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -28,10 +28,8 @@ import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.long import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import mu.KotlinLogging -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.env.MachineDef @@ -39,6 +37,8 @@ import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator +import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -46,8 +46,9 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import org.opendc.web.client.ApiClient import org.opendc.web.client.AuthConfiguration @@ -55,9 +56,8 @@ import org.opendc.web.client.model.Scenario import org.opendc.web.client.model.Topology import java.io.File import java.net.URI +import java.time.Duration import java.util.* -import kotlin.random.Random -import kotlin.random.asJavaRandom import org.opendc.web.client.model.Portfolio as ClientPortfolio private val logger = KotlinLogging.logger {} @@ -158,7 +158,7 @@ class RunnerCli : CliktCommand(name = "runner") { val results = (0 until targets.repeatsPerScenario).map { repeat -> logger.info { "Starting repeat $repeat" } withTimeout(runTimeout * 1000) { - val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong()).asJavaRandom()) } + val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) } runRepeat(scenario, repeat, environment, traceReader, interferenceModel) } } @@ -182,63 +182,55 @@ class RunnerCli : CliktCommand(name = "runner") { try { runBlockingSimulation { - val seed = repeat val workloadName = scenario.trace.traceId val workloadFraction = scenario.trace.loadSamplingFraction - val seeder = Random(seed) + val seeder = Random(repeat.toLong()) val meterProvider: MeterProvider = SdkMeterProvider .builder() .setClock(clock.toOtelClock()) .build() - val metricProducer = meterProvider as MetricProducer val operational = scenario.operationalPhenomena - val allocationPolicy = createComputeScheduler(operational.schedulerName, seeder) + val computeScheduler = createComputeScheduler(operational.schedulerName, seeder) val trace = ParquetTraceReader( listOf(traceReader), Workload(workloadName, workloadFraction), - seed + repeat ) - val failureFrequency = if (operational.failuresEnabled) 24.0 * 7 else 0.0 - - withComputeService(clock, meterProvider, environment, allocationPolicy, interferenceModel) { scheduler -> - val faultInjector = if (failureFrequency > 0) { - logger.debug { "ENABLING failures" } - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seeder.nextInt(), - failureFrequency, - ) - } else { + val failureModel = + if (operational.failuresEnabled) + grid5000(Duration.ofDays(7), repeat) + else null - } - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector?.start() + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environment.read(), + failureModel, + interferenceModel.takeIf { operational.performanceInterferenceEnabled } + ) - processTrace( - clock, - trace, - scheduler, - monitor - ) + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) - faultInjector?.close() - } + try { + simulator.run(trace) + } finally { + simulator.close() + metricReader.close() } - val monitorResults = collectServiceMetrics(clock.millis(), metricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) logger.debug { "Finish " + - "SUBMIT=${monitorResults.instanceCount} " + - "FAIL=${monitorResults.failedInstanceCount} " + - "QUEUE=${monitorResults.queuedInstanceCount} " + - "RUNNING=${monitorResults.runningInstanceCount}" + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" } } } catch (cause: Throwable) { diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt index c8e58dde..4b813310 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt @@ -22,27 +22,19 @@ package org.opendc.web.runner -import mu.KotlinLogging -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.compute.table.ServiceData import kotlin.math.max +import kotlin.math.roundToLong /** * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. */ -public class WebComputeMonitor : ComputeMonitor { - private val logger = KotlinLogging.logger {} - - override fun onStateChange(time: Long, host: Host, newState: HostState) { - logger.debug { "Host ${host.uid} changed state $newState [$time]" } - } - +class WebComputeMonitor : ComputeMonitor { override fun record(data: HostData) { - val duration = 5 * 60 * 1000L - val slices = duration / SLICE_LENGTH + val duration = data.uptime + val slices = data.downtime / SLICE_LENGTH hostAggregateMetrics = AggregateHostMetrics( hostAggregateMetrics.totalWork + data.totalWork, @@ -50,14 +42,14 @@ public class WebComputeMonitor : ComputeMonitor { hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork, hostAggregateMetrics.totalInterferedWork + data.overcommittedWork, hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600, - hostAggregateMetrics.totalFailureSlices + if (data.host.state != HostState.UP) slices else 0, - hostAggregateMetrics.totalFailureVmSlices + if (data.host.state != HostState.UP) data.instanceCount * slices else 0 + hostAggregateMetrics.totalFailureSlices + slices, + hostAggregateMetrics.totalFailureVmSlices + data.instanceCount * slices ) - hostMetrics.compute(data.host) { _, prev -> + hostMetrics.compute(data.host.id) { _, prev -> HostMetrics( - (data.cpuUsage.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0), - (data.cpuDemand.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0), + data.cpuUsage + (prev?.cpuUsage ?: 0.0), + data.cpuDemand + (prev?.cpuDemand ?: 0.0), data.instanceCount + (prev?.instanceCount ?: 0), 1 + (prev?.count ?: 0) ) @@ -65,7 +57,7 @@ public class WebComputeMonitor : ComputeMonitor { } private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf() + private val hostMetrics: MutableMap<String, HostMetrics> = mutableMapOf() private val SLICE_LENGTH: Long = 5 * 60 * 1000 data class AggregateHostMetrics( @@ -74,8 +66,8 @@ public class WebComputeMonitor : ComputeMonitor { val totalOvercommittedWork: Double = 0.0, val totalInterferedWork: Double = 0.0, val totalPowerDraw: Double = 0.0, - val totalFailureSlices: Long = 0, - val totalFailureVmSlices: Long = 0, + val totalFailureSlices: Double = 0.0, + val totalFailureVmSlices: Double = 0.0, ) data class HostMetrics( @@ -97,7 +89,7 @@ public class WebComputeMonitor : ComputeMonitor { ) } - public data class AggregateServiceMetrics( + data class AggregateServiceMetrics( val vmTotalCount: Int = 0, val vmWaitingCount: Int = 0, val vmActiveCount: Int = 0, @@ -105,7 +97,7 @@ public class WebComputeMonitor : ComputeMonitor { val vmFailedCount: Int = 0 ) - public fun getResult(): Result { + fun getResult(): Result { return Result( hostAggregateMetrics.totalWork, hostAggregateMetrics.totalGrantedWork, @@ -116,8 +108,8 @@ public class WebComputeMonitor : ComputeMonitor { hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, hostAggregateMetrics.totalPowerDraw, - hostAggregateMetrics.totalFailureSlices, - hostAggregateMetrics.totalFailureVmSlices, + hostAggregateMetrics.totalFailureSlices.roundToLong(), + hostAggregateMetrics.totalFailureVmSlices.roundToLong(), serviceMetrics.vmTotalCount, serviceMetrics.vmWaitingCount, serviceMetrics.vmInactiveCount, |
