summaryrefslogtreecommitdiff
path: root/simulator/opendc-experiments/opendc-experiments-capelin/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src/main')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt72
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt171
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt3
4 files changed, 183 insertions, 67 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 4f48bba7..40f50235 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -22,24 +22,19 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.sdk.common.CompletableResultCode
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostEvent
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.AllocationPolicy
import org.opendc.compute.simulator.SimHost
+import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
@@ -138,7 +133,7 @@ public fun createTraceReader(
*/
public suspend fun withComputeService(
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy,
block: suspend CoroutineScope.(ComputeService) -> Unit
@@ -153,13 +148,15 @@ public suspend fun withComputeService(
def.meta,
coroutineContext,
clock,
+ meterProvider.get("opendc-compute-simulator"),
SimFairShareHypervisorProvider(),
def.powerModel
)
}
+ val schedulerMeter = meterProvider.get("opendc-compute")
val scheduler =
- ComputeService(coroutineContext, clock, meter, allocationPolicy)
+ ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy)
for (host in hosts) {
scheduler.addHost(host)
@@ -194,62 +191,13 @@ public suspend fun withMonitor(
monitor.reportHostStateChange(clock.millis(), host, newState)
}
})
-
- monitorJobs += host.events
- .onEach { event ->
- when (event) {
- is HostEvent.SliceFinished -> monitor.reportHostSlice(
- clock.millis(),
- event.requestedBurst,
- event.grantedBurst,
- event.overcommissionedBurst,
- event.interferedBurst,
- event.cpuUsage,
- event.cpuDemand,
- event.numberOfDeployedImages,
- event.driver
- )
- }
- }
- .launchIn(this)
-
- monitorJobs += (host as SimHost).machine.powerDraw
- .onEach { monitor.reportPowerConsumption(host, it) }
- .launchIn(this)
}
val reader = CoroutineMetricReader(
- this, listOf(metricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
-
- val submittedVms = metricsByName["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val queuedVms = metricsByName["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val unscheduledVms = metricsByName["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val runningVms = metricsByName["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val finishedVms = metricsByName["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val hosts = metricsByName["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val availableHosts = metricsByName["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
-
- monitor.reportProvisionerMetrics(
- clock.millis(),
- hosts,
- availableHosts,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- return CompletableResultCode.ofSuccess()
- }
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
- },
- exportInterval = 5 * 60 * 1000
+ this,
+ listOf(metricProducer),
+ ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }),
+ exportInterval = 5 * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */
)
try {
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 2921daba..5fa77161 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -126,8 +126,6 @@ public abstract class Portfolio(name: String) : Experiment(name) {
.setClock(clock.toOtelClock())
.build()
- val meter = meterProvider.get("opendc-compute")
-
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
workload.workloads.map { it.name }
@@ -153,7 +151,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
4096
)
- withComputeService(clock, meter, environment, allocationPolicy) { scheduler ->
+ withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
new file mode 100644
index 00000000..799de60f
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.monitor
+
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import org.opendc.compute.service.driver.Host
+import java.time.Clock
+
+/**
+ * A [MetricExporter] that exports the metrics to the [ExperimentMonitor].
+ */
+public class ExperimentMetricExporter(
+ private val monitor: ExperimentMonitor,
+ private val clock: Clock,
+ private val hosts: Map<String, Host>
+) : MetricExporter {
+ override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+ val metricsByName = metrics.associateBy { it.name }
+ reportHostMetrics(metricsByName)
+ reportProvisionerMetrics(metricsByName)
+ return CompletableResultCode.ofSuccess()
+ }
+
+ private fun reportHostMetrics(metrics: Map<String, MetricData>) {
+ val hostMetrics = mutableMapOf<String, HostMetrics>()
+ hosts.mapValuesTo(hostMetrics) { HostMetrics() }
+
+ mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v ->
+ m.cpuDemand = v
+ }
+
+ mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v ->
+ m.cpuUsage = v
+ }
+
+ mapDoubleSummary(metrics["power.usage"], hostMetrics) { m, v ->
+ m.powerDraw = v
+ }
+
+ mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v ->
+ m.requestedBurst = v.toLong()
+ }
+
+ mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v ->
+ m.grantedBurst = v.toLong()
+ }
+
+ mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v ->
+ m.overcommissionedBurst = v.toLong()
+ }
+
+ mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v ->
+ m.interferedBurst = v.toLong()
+ }
+
+ mapLongSum(metrics["guests.active"], hostMetrics) { m, v ->
+ m.numberOfDeployedImages = v.toInt()
+ }
+
+ for ((id, hostMetric) in hostMetrics) {
+ val host = hosts.getValue(id)
+ monitor.reportHostSlice(
+ clock.millis(),
+ hostMetric.requestedBurst,
+ hostMetric.grantedBurst,
+ hostMetric.overcommissionedBurst,
+ hostMetric.interferedBurst,
+ hostMetric.cpuUsage,
+ hostMetric.cpuDemand,
+ hostMetric.numberOfDeployedImages,
+ host
+ )
+
+ monitor.reportPowerConsumption(host, hostMetric.powerDraw)
+ }
+ }
+
+ private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ val points = data?.doubleSummaryData?.points ?: emptyList()
+ for (point in points) {
+ val uid = point.labels["host"]
+ val hostMetric = hostMetrics[uid]
+
+ if (hostMetric != null) {
+ block(hostMetric, point.sum)
+ }
+ }
+ }
+
+ private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ val points = data?.doubleSumData?.points ?: emptyList()
+ for (point in points) {
+ val uid = point.labels["host"]
+ val hostMetric = hostMetrics[uid]
+
+ if (hostMetric != null) {
+ block(hostMetric, point.value)
+ }
+ }
+ }
+
+ private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
+ val points = data?.longSumData?.points ?: emptyList()
+ for (point in points) {
+ val uid = point.labels["host"]
+ val hostMetric = hostMetrics[uid]
+
+ if (hostMetric != null) {
+ block(hostMetric, point.value)
+ }
+ }
+ }
+
+ private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) {
+ val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+
+ monitor.reportProvisionerMetrics(
+ clock.millis(),
+ hosts,
+ availableHosts,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ }
+
+ private class HostMetrics {
+ var requestedBurst: Long = 0
+ var grantedBurst: Long = 0
+ var overcommissionedBurst: Long = 0
+ var interferedBurst: Long = 0
+ var cpuUsage: Double = 0.0
+ var cpuDemand: Double = 0.0
+ var numberOfDeployedImages: Int = 0
+ var powerDraw: Double = 0.0
+ }
+
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index a57c8d78..5e75c890 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -26,12 +26,11 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
-import java.io.Closeable
/**
* A monitor watches the events of an experiment.
*/
-public interface ExperimentMonitor : Closeable {
+public interface ExperimentMonitor : AutoCloseable {
/**
* This method is invoked when the state of a VM changes.
*/