summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-25 20:33:52 +0200
committerGitHub <noreply@github.com>2021-08-25 20:33:52 +0200
commit4f333808d823abadd603ef2221092d82dc0f02b4 (patch)
tree8e9b054a770b3a048b5cfb44e3f6bb4dff57315e
parentac48fa12f36180de31154a7c828b4dc281dac94b (diff)
parente5b79b18dab4f2874f3c5730b7e599dc74573c8d (diff)
merge: Upgrade to OpenTelemetry 1.5
This pull request updates to OpenTelemetry version 1.5.0. * Update dependency to OpenTelemetry 1.5 * Fix breaking changes in metrics API * Eliminate unnecessary double to long conversions * Simplify metric extraction for monitor
-rw-r--r--gradle/libs.versions.toml7
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt14
-rw-r--r--opendc-compute/opendc-compute-simulator/build.gradle.kts1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt89
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt19
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt51
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt134
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt19
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt24
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt60
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt16
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt44
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt40
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt24
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt12
20 files changed, 311 insertions, 278 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index a15fa45d..4f2ec58c 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -3,8 +3,9 @@ junit-jupiter = "5.7.2"
junit-platform = "1.7.2"
slf4j = "1.7.32"
log4j = "2.14.1"
-opentelemetry-main = "1.4.1"
-opentelemetry-metrics = "1.4.1-alpha"
+opentelemetry-main = "1.5.0"
+opentelemetry-metrics = "1.5.0-alpha"
+opentelemetry-semconv = "1.5.0-alpha"
hadoop = "3.3.0"
ktor = "1.6.2"
jackson = "2.12.4"
@@ -23,6 +24,8 @@ opentelemetry-api-main = { module = "io.opentelemetry:opentelemetry-api", versio
opentelemetry-sdk-main = { module = "io.opentelemetry:opentelemetry-sdk", version.ref = "opentelemetry-main" }
opentelemetry-api-metrics = { module = "io.opentelemetry:opentelemetry-api-metrics", version.ref = "opentelemetry-metrics" }
opentelemetry-sdk-metrics = { module = "io.opentelemetry:opentelemetry-sdk-metrics", version.ref = "opentelemetry-metrics" }
+opentelemetry-semconv = { module = "io.opentelemetry:opentelemetry-semconv", version.ref = "opentelemetry-semconv" }
+
# Testing
junit-jupiter-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit-jupiter" }
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index e7807177..d7a7e8f8 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -106,7 +106,7 @@ internal class ComputeServiceImpl(
/**
* The number of servers that have been submitted to the service for provisioning.
*/
- private val _submittedServers = meter.longCounterBuilder("servers.submitted")
+ private val _submittedServers = meter.counterBuilder("servers.submitted")
.setDescription("Number of start requests")
.setUnit("1")
.build()
@@ -114,7 +114,7 @@ internal class ComputeServiceImpl(
/**
* The number of servers that failed to be scheduled.
*/
- private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled")
+ private val _unscheduledServers = meter.counterBuilder("servers.unscheduled")
.setDescription("Number of unscheduled servers")
.setUnit("1")
.build()
@@ -122,7 +122,7 @@ internal class ComputeServiceImpl(
/**
* The number of servers that are waiting to be provisioned.
*/
- private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting")
+ private val _waitingServers = meter.upDownCounterBuilder("servers.waiting")
.setDescription("Number of servers waiting to be provisioned")
.setUnit("1")
.build()
@@ -130,7 +130,7 @@ internal class ComputeServiceImpl(
/**
* The number of servers that are waiting to be provisioned.
*/
- private val _runningServers = meter.longUpDownCounterBuilder("servers.active")
+ private val _runningServers = meter.upDownCounterBuilder("servers.active")
.setDescription("Number of servers currently running")
.setUnit("1")
.build()
@@ -138,7 +138,7 @@ internal class ComputeServiceImpl(
/**
* The number of servers that have finished running.
*/
- private val _finishedServers = meter.longCounterBuilder("servers.finished")
+ private val _finishedServers = meter.counterBuilder("servers.finished")
.setDescription("Number of servers that finished running")
.setUnit("1")
.build()
@@ -146,7 +146,7 @@ internal class ComputeServiceImpl(
/**
* The number of hosts registered at the compute service.
*/
- private val _hostCount = meter.longUpDownCounterBuilder("hosts.total")
+ private val _hostCount = meter.upDownCounterBuilder("hosts.total")
.setDescription("Number of hosts")
.setUnit("1")
.build()
@@ -154,7 +154,7 @@ internal class ComputeServiceImpl(
/**
* The number of available hosts registered at the compute service.
*/
- private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available")
+ private val _availableHostCount = meter.upDownCounterBuilder("hosts.available")
.setDescription("Number of available hosts")
.setUnit("1")
.build()
diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts
index b31a2114..c5a9e668 100644
--- a/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -35,6 +35,7 @@ dependencies {
api(projects.opendcSimulator.opendcSimulatorCompute)
api(projects.opendcSimulator.opendcSimulatorFailures)
implementation(projects.opendcUtils)
+ implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index be771f6d..dcc525cb 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -22,8 +22,9 @@
package org.opendc.compute.simulator
+import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.api.metrics.common.Labels
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.Flavor
@@ -100,22 +101,20 @@ public class SimHost(
listener = object : SimHypervisor.Listener {
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
+ requestedWork: Double,
+ grantedWork: Double,
+ overcommittedWork: Double,
+ interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double
) {
-
- _batch.put(_cpuWork, requestedWork.toDouble())
- _batch.put(_cpuWorkGranted, grantedWork.toDouble())
- _batch.put(_cpuWorkOvercommit, overcommittedWork.toDouble())
- _batch.put(_cpuWorkInterference, interferedWork.toDouble())
- _batch.put(_cpuUsage, cpuUsage)
- _batch.put(_cpuDemand, cpuDemand)
- _batch.put(_cpuPower, machine.psu.powerDraw)
- _batch.record()
+ _totalWork.add(requestedWork)
+ _grantedWork.add(grantedWork)
+ _overcommittedWork.add(overcommittedWork)
+ _interferedWork.add(interferedWork)
+ _cpuDemand.record(cpuDemand)
+ _cpuUsage.record(cpuUsage)
+ _powerUsage.record(machine.psu.powerDraw)
}
}
)
@@ -135,86 +134,92 @@ public class SimHost(
field = value
}
- override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum())
+ override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size })
/**
- * The number of guests on the host.
+ * The total number of guests.
*/
- private val _guests = meter.longUpDownCounterBuilder("guests.total")
+ private val _guests = meter.upDownCounterBuilder("guests.total")
.setDescription("Number of guests")
.setUnit("1")
.build()
- .bind(Labels.of("host", uid.toString()))
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
* The number of active guests on the host.
*/
- private val _activeGuests = meter.longUpDownCounterBuilder("guests.active")
+ private val _activeGuests = meter.upDownCounterBuilder("guests.active")
.setDescription("Number of active guests")
.setUnit("1")
.build()
- .bind(Labels.of("host", uid.toString()))
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
- * The CPU usage on the host.
+ * The CPU demand of the host.
*/
- private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage")
- .setDescription("The amount of CPU resources used by the host")
+ private val _cpuDemand = meter.histogramBuilder("cpu.demand")
+ .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
.setUnit("MHz")
.build()
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
- * The CPU demand on the host.
+ * The CPU usage of the host.
*/
- private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand")
- .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
+ private val _cpuUsage = meter.histogramBuilder("cpu.usage")
+ .setDescription("The amount of CPU resources used by the host")
.setUnit("MHz")
.build()
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
- * The requested work for the CPU.
+ * The power usage of the host.
*/
- private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage")
+ private val _powerUsage = meter.histogramBuilder("power.usage")
.setDescription("The amount of power used by the CPU")
.setUnit("W")
.build()
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
- * The requested work for the CPU.
+ * The total amount of work supplied to the CPU.
*/
- private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total")
+ private val _totalWork = meter.counterBuilder("cpu.work.total")
.setDescription("The amount of work supplied to the CPU")
.setUnit("1")
+ .ofDoubles()
.build()
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
- * The work actually performed by the CPU.
+ * The work performed by the CPU.
*/
- private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted")
+ private val _grantedWork = meter.counterBuilder("cpu.work.granted")
.setDescription("The amount of work performed by the CPU")
.setUnit("1")
+ .ofDoubles()
.build()
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
- * The work that could not be performed by the CPU due to overcommitting resource.
+ * The amount not performed by the CPU due to overcommitment.
*/
- private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit")
+ private val _overcommittedWork = meter.counterBuilder("cpu.work.overcommit")
.setDescription("The amount of work not performed by the CPU due to overcommitment")
.setUnit("1")
+ .ofDoubles()
.build()
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
/**
- * The work that could not be performed by the CPU due to interference.
+ * The amount of work not performed by the CPU due to interference.
*/
- private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference")
+ private val _interferedWork = meter.counterBuilder("cpu.work.interference")
.setDescription("The amount of work not performed by the CPU due to interference")
.setUnit("1")
+ .ofDoubles()
.build()
-
- /**
- * The batch recorder used to record multiple metrics atomically.
- */
- private val _batch = meter.newBatchRecorder("host", uid.toString())
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
init {
// Launch hypervisor onto machine
@@ -273,8 +278,8 @@ public class SimHost(
override suspend fun delete(server: Server) {
val guest = guests.remove(server) ?: return
- guest.terminate()
_guests.add(-1)
+ guest.terminate()
}
override fun addListener(listener: HostListener) {
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 93a2248a..1ba3a9a1 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -133,17 +133,14 @@ internal class SimHostTest {
object : MetricExporter {
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
val metricsByName = metrics.associateBy { it.name }
- val totalWork = metricsByName["cpu.work.total"]
- if (totalWork != null) {
- requestedWork += totalWork.doubleSummaryData.points.first().sum.toLong()
+ metricsByName["cpu.work.total"]?.let {
+ requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
}
- val grantedWorkCycle = metricsByName["cpu.work.granted"]
- if (grantedWorkCycle != null) {
- grantedWork += grantedWorkCycle.doubleSummaryData.points.first().sum.toLong()
+ metricsByName["cpu.work.granted"]?.let {
+ grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
}
- val overcommittedWorkCycle = metricsByName["cpu.work.overcommit"]
- if (overcommittedWorkCycle != null) {
- overcommittedWork += overcommittedWorkCycle.doubleSummaryData.points.first().sum.toLong()
+ metricsByName["cpu.work.overcommit"]?.let {
+ overcommittedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
}
return CompletableResultCode.ofSuccess()
}
@@ -236,10 +233,10 @@ internal class SimHostTest {
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
val metricsByName = metrics.associateBy { it.name }
metricsByName["cpu.work.total"]?.let {
- requestedWork += it.doubleSummaryData.points.first().sum.toLong()
+ requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
}
metricsByName["cpu.work.granted"]?.let {
- grantedWork += it.doubleSummaryData.points.first().sum.toLong()
+ grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
}
return CompletableResultCode.ofSuccess()
}
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 324cae3e..53643aba 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -37,6 +37,7 @@ dependencies {
implementation(projects.opendcSimulator.opendcSimulatorFailures)
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
implementation(libs.config)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index d7df4454..7f428b2a 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -24,11 +24,8 @@ package org.opendc.experiments.capelin
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory
-import io.opentelemetry.sdk.metrics.common.InstrumentType
+import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricProducer
-import io.opentelemetry.sdk.metrics.view.InstrumentSelector
-import io.opentelemetry.sdk.metrics.view.View
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
@@ -212,24 +209,41 @@ class ComputeMetrics {
var runningVms: Int = 0
var unscheduledVms: Int = 0
var finishedVms: Int = 0
+ var hosts: Int = 0
+ var availableHosts = 0
}
/**
* Collect the metrics of the compute service.
*/
fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
- val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
+ return extractComputeMetrics(metricProducer.collectAllMetrics())
+}
+
+/**
+ * Extract an [ComputeMetrics] object from the specified list of metric data.
+ */
+internal fun extractComputeMetrics(metrics: Collection<MetricData>): ComputeMetrics {
val res = ComputeMetrics()
- try {
- // Hack to extract metrics from OpenTelemetry SDK
- res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- } catch (cause: Throwable) {
- logger.warn(cause) { "Failed to collect metrics" }
+ for (metric in metrics) {
+ val points = metric.longSumData.points
+
+ if (points.isEmpty()) {
+ continue
+ }
+
+ val value = points.first().value.toInt()
+ when (metric.name) {
+ "servers.submitted" -> res.submittedVms = value
+ "servers.waiting" -> res.queuedVms = value
+ "servers.unscheduled" -> res.unscheduledVms = value
+ "servers.active" -> res.runningVms = value
+ "servers.finished" -> res.finishedVms = value
+ "hosts.total" -> res.hosts = value
+ "hosts.available" -> res.availableHosts = value
+ }
}
+
return res
}
@@ -298,18 +312,9 @@ suspend fun processTrace(
* Create a [MeterProvider] instance for the experiment.
*/
fun createMeterProvider(clock: Clock): MeterProvider {
- val powerSelector = InstrumentSelector.builder()
- .setInstrumentNameRegex("power\\.usage")
- .setInstrumentType(InstrumentType.VALUE_RECORDER)
- .build()
- val powerView = View.builder()
- .setAggregatorFactory(AggregatorFactory.lastValue())
- .build()
-
return SdkMeterProvider
.builder()
.setClock(clock.toOtelClock())
- .registerView(powerSelector, powerView)
.build()
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
index 7fb2f83c..e9c817de 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -22,11 +22,12 @@
package org.opendc.experiments.capelin.monitor
-import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.opendc.compute.service.driver.Host
+import org.opendc.experiments.capelin.extractComputeMetrics
import java.time.Clock
/**
@@ -37,72 +38,61 @@ public class ExperimentMetricExporter(
private val clock: Clock,
private val hosts: Map<String, Host>
) : MetricExporter {
- private val hostKey = AttributeKey.stringKey("host")
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
- reportHostMetrics(metricsByName)
- reportProvisionerMetrics(metricsByName)
- return CompletableResultCode.ofSuccess()
+ return try {
+ reportHostMetrics(metrics)
+ reportProvisionerMetrics(metrics)
+ CompletableResultCode.ofSuccess()
+ } catch (e: Throwable) {
+ CompletableResultCode.ofFailure()
+ }
}
- private fun reportHostMetrics(metrics: Map<String, MetricData>) {
+ private var lastHostMetrics: Map<String, HostMetrics> = emptyMap()
+ private val hostMetricsSingleton = HostMetrics()
+
+ private fun reportHostMetrics(metrics: Collection<MetricData>) {
val hostMetrics = mutableMapOf<String, HostMetrics>()
hosts.mapValuesTo(hostMetrics) { HostMetrics() }
- mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v ->
- m.cpuDemand = v
- }
-
- mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v ->
- m.cpuUsage = v
- }
-
- mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v ->
- m.powerDraw = v
- }
-
- mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v ->
- m.requestedBurst = v.toLong()
- }
-
- mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v ->
- m.grantedBurst = v.toLong()
- }
-
- mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v ->
- m.overcommissionedBurst = v.toLong()
- }
-
- mapDoubleSummary(metrics["cpu.work.interference"], hostMetrics) { m, v ->
- m.interferedBurst = v.toLong()
- }
-
- mapLongSum(metrics["guests.active"], hostMetrics) { m, v ->
- m.numberOfDeployedImages = v.toInt()
+ for (metric in metrics) {
+ when (metric.name) {
+ "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v }
+ "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v }
+ "power.usage" -> mapDoubleGauge(metric, hostMetrics) { m, v -> m.powerDraw = v }
+ "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v }
+ "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v }
+ "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v }
+ "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v }
+ "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() }
+ }
}
for ((id, hostMetric) in hostMetrics) {
+ val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton)
val host = hosts.getValue(id)
- monitor.reportHostSlice(
+ monitor.reportHostData(
clock.millis(),
- hostMetric.requestedBurst,
- hostMetric.grantedBurst,
- hostMetric.overcommissionedBurst,
- hostMetric.interferedBurst,
+ hostMetric.totalWork - lastHostMetric.totalWork,
+ hostMetric.grantedWork - lastHostMetric.grantedWork,
+ hostMetric.overcommittedWork - lastHostMetric.overcommittedWork,
+ hostMetric.interferedWork - lastHostMetric.interferedWork,
hostMetric.cpuUsage,
hostMetric.cpuDemand,
hostMetric.powerDraw,
- hostMetric.numberOfDeployedImages,
+ hostMetric.instanceCount,
host
)
}
+
+ lastHostMetrics = hostMetrics
}
- private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
- val points = data?.doubleSummaryData?.points ?: emptyList()
+ private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ val points = data.doubleSummaryData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[hostKey]
+ val uid = point.attributes[ResourceAttributes.HOST_ID]
val hostMetric = hostMetrics[uid]
if (hostMetric != null) {
@@ -116,7 +106,7 @@ public class ExperimentMetricExporter(
private fun mapDoubleGauge(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
val points = data?.doubleGaugeData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[hostKey]
+ val uid = point.attributes[ResourceAttributes.HOST_ID]
val hostMetric = hostMetrics[uid]
if (hostMetric != null) {
@@ -128,7 +118,19 @@ public class ExperimentMetricExporter(
private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
val points = data?.longSumData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[hostKey]
+ val uid = point.attributes[ResourceAttributes.HOST_ID]
+ val hostMetric = hostMetrics[uid]
+
+ if (hostMetric != null) {
+ block(hostMetric, point.value)
+ }
+ }
+ }
+
+ private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ val points = data?.doubleSumData?.points ?: emptyList()
+ for (point in points) {
+ val uid = point.attributes[ResourceAttributes.HOST_ID]
val hostMetric = hostMetrics[uid]
if (hostMetric != null) {
@@ -137,35 +139,29 @@ public class ExperimentMetricExporter(
}
}
- private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) {
- val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ private fun reportProvisionerMetrics(metrics: Collection<MetricData>) {
+ val res = extractComputeMetrics(metrics)
- monitor.reportProvisionerMetrics(
+ monitor.reportServiceData(
clock.millis(),
- hosts,
- availableHosts,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
+ res.hosts,
+ res.availableHosts,
+ res.submittedVms,
+ res.runningVms,
+ res.finishedVms,
+ res.queuedVms,
+ res.unscheduledVms
)
}
private class HostMetrics {
- var requestedBurst: Long = 0
- var grantedBurst: Long = 0
- var overcommissionedBurst: Long = 0
- var interferedBurst: Long = 0
+ var totalWork: Double = 0.0
+ var grantedWork: Double = 0.0
+ var overcommittedWork: Double = 0.0
+ var interferedWork: Double = 0.0
var cpuUsage: Double = 0.0
var cpuDemand: Double = 0.0
- var numberOfDeployedImages: Int = 0
+ var instanceCount: Int = 0
var powerDraw: Double = 0.0
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 68631dee..9a4aec35 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -44,24 +44,23 @@ public interface ExperimentMonitor : AutoCloseable {
/**
* This method is invoked for a host for each slice that is finishes.
*/
- public fun reportHostSlice(
+ public fun reportHostData(
time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
+ totalWork: Double,
+ grantedWork: Double,
+ overcommittedWork: Double,
+ interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double,
powerDraw: Double,
- numberOfDeployedImages: Int,
+ instanceCount: Int,
host: Host
- ) {
- }
+ ) {}
/**
- * This method is invoked for a provisioner event.
+ * This method is invoked for reporting service data.
*/
- public fun reportProvisionerMetrics(
+ public fun reportServiceData(
time: Long,
totalHostCount: Int,
availableHostCount: Int,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index bfdf5f3e..83351c41 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -57,16 +57,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
logger.debug { "Host ${host.uid} changed state $newState [$time]" }
}
- override fun reportHostSlice(
+ override fun reportHostData(
time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
+ totalWork: Double,
+ grantedWork: Double,
+ overcommittedWork: Double,
+ interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double,
powerDraw: Double,
- numberOfDeployedImages: Int,
+ instanceCount: Int,
host: Host
) {
hostWriter.write(
@@ -74,11 +74,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
time,
5 * 60 * 1000L,
host,
- numberOfDeployedImages,
- requestedBurst,
- grantedBurst,
- overcommissionedBurst,
- interferedBurst,
+ instanceCount,
+ totalWork.toLong(),
+ grantedWork.toLong(),
+ overcommittedWork.toLong(),
+ interferedWork.toLong(),
cpuUsage,
cpuDemand,
powerDraw,
@@ -87,7 +87,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
)
}
- override fun reportProvisionerMetrics(
+ override fun reportServiceData(
time: Long,
totalHostCount: Int,
availableHostCount: Int,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index a3300b71..8008c944 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -120,10 +120,10 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(219751355711, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(206351165081, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(1148906334, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
- { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
+ { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } },
+ { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } },
+ { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
+ { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } }
)
}
@@ -160,10 +160,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
+ { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } },
+ { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -204,10 +204,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
- { assertEquals(13885404, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
+ { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } },
+ { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(13910814, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -256,10 +256,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(25336984869, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(23668547517, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(368151656, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
+ { assertEquals(25412073109, monitor.totalWork) { "Total requested work incorrect" } },
+ { assertEquals(23695061858, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(368502468, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -283,27 +283,27 @@ class CapelinIntegrationTest {
}
class TestExperimentReporter : ExperimentMonitor {
- var totalRequestedBurst = 0L
- var totalGrantedBurst = 0L
- var totalOvercommissionedBurst = 0L
- var totalInterferedBurst = 0L
+ var totalWork = 0L
+ var totalGrantedWork = 0L
+ var totalOvercommittedWork = 0L
+ var totalInterferedWork = 0L
- override fun reportHostSlice(
+ override fun reportHostData(
time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
+ totalWork: Double,
+ grantedWork: Double,
+ overcommittedWork: Double,
+ interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double,
powerDraw: Double,
- numberOfDeployedImages: Int,
+ instanceCount: Int,
host: Host,
) {
- totalRequestedBurst += requestedBurst
- totalGrantedBurst += grantedBurst
- totalOvercommissionedBurst += overcommissionedBurst
- totalInterferedBurst += interferedBurst
+ this.totalWork += totalWork.toLong()
+ totalGrantedWork += grantedWork.toLong()
+ totalOvercommittedWork += overcommittedWork.toLong()
+ totalInterferedWork += interferedWork.toLong()
}
override fun close() {}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index d8f92155..0873aac9 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -22,8 +22,9 @@
package org.opendc.experiments.tf20.core
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.api.metrics.common.Labels
import kotlinx.coroutines.*
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.SimMachine
@@ -68,22 +69,27 @@ public class SimTFDevice(
)
/**
+ * The identifier of a device.
+ */
+ private val deviceId = AttributeKey.stringKey("device.id")
+
+ /**
* The usage of the device.
*/
- private val _usage = meter.doubleValueRecorderBuilder("device.usage")
+ private val _usage = meter.histogramBuilder("device.usage")
.setDescription("The amount of device resources used")
.setUnit("MHz")
.build()
- .bind(Labels.of("device", uid.toString()))
+ .bind(Attributes.of(deviceId, uid.toString()))
/**
* The power draw of the device.
*/
- private val _power = meter.doubleValueRecorderBuilder("device.power")
+ private val _power = meter.histogramBuilder("device.power")
.setDescription("The power draw of the device")
.setUnit("W")
.build()
- .bind(Labels.of("device", uid.toString()))
+ .bind(Attributes.of(deviceId, uid.toString()))
/**
* The workload that will be run by the device.
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
index 7c7621b8..a1cb1dbf 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
@@ -22,11 +22,12 @@
package org.opendc.faas.service
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.BoundLongCounter
+import io.opentelemetry.api.metrics.BoundLongHistogram
import io.opentelemetry.api.metrics.BoundLongUpDownCounter
-import io.opentelemetry.api.metrics.BoundLongValueRecorder
import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.api.metrics.common.Labels
import org.opendc.faas.service.deployer.FunctionInstance
import java.util.*
@@ -42,76 +43,83 @@ public class FunctionObject(
meta: Map<String, Any>
) : AutoCloseable {
/**
+ * The function identifier attached to the metrics.
+ */
+ private val functionId = AttributeKey.stringKey("function")
+
+ /**
* The total amount of function invocations received by the function.
*/
- public val invocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.total")
+ public val invocations: BoundLongCounter = meter.counterBuilder("function.invocations.total")
.setDescription("Number of function invocations")
.setUnit("1")
.build()
- .bind(Labels.of("function", uid.toString()))
+ .bind(Attributes.of(functionId, uid.toString()))
/**
* The amount of function invocations that could be handled directly.
*/
- public val timelyInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.warm")
+ public val timelyInvocations: BoundLongCounter = meter.counterBuilder("function.invocations.warm")
.setDescription("Number of function invocations handled directly")
.setUnit("1")
.build()
- .bind(Labels.of("function", uid.toString()))
+ .bind(Attributes.of(functionId, uid.toString()))
/**
* The amount of function invocations that were delayed due to function deployment.
*/
- public val delayedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.cold")
+ public val delayedInvocations: BoundLongCounter = meter.counterBuilder("function.invocations.cold")
.setDescription("Number of function invocations that are delayed")
.setUnit("1")
.build()
- .bind(Labels.of("function", uid.toString()))
+ .bind(Attributes.of(functionId, uid.toString()))
/**
* The amount of function invocations that failed.
*/
- public val failedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.failed")
+ public val failedInvocations: BoundLongCounter = meter.counterBuilder("function.invocations.failed")
.setDescription("Number of function invocations that failed")
.setUnit("1")
.build()
- .bind(Labels.of("function", uid.toString()))
+ .bind(Attributes.of(functionId, uid.toString()))
/**
* The amount of instances for this function.
*/
- public val activeInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.active")
+ public val activeInstances: BoundLongUpDownCounter = meter.upDownCounterBuilder("function.instances.active")
.setDescription("Number of active function instances")
.setUnit("1")
.build()
- .bind(Labels.of("function", uid.toString()))
+ .bind(Attributes.of(functionId, uid.toString()))
/**
* The amount of idle instances for this function.
*/
- public val idleInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.idle")
+ public val idleInstances: BoundLongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle")
.setDescription("Number of idle function instances")
.setUnit("1")
.build()
- .bind(Labels.of("function", uid.toString()))
+ .bind(Attributes.of(functionId, uid.toString()))
/**
* The time that the function waited.
*/
- public val waitTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.wait")
+ public val waitTime: BoundLongHistogram = meter.histogramBuilder("function.time.wait")
+ .ofLongs()
.setDescription("Time the function has to wait before being started")
.setUnit("ms")
.build()
- .bind(Labels.of("function", uid.toString()))
+ .bind(Attributes.of(functionId, uid.toString()))
/**
* The time that the function was running.
*/
- public val activeTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.active")
+ public val activeTime: BoundLongHistogram = meter.histogramBuilder("function.time.active")
+ .ofLongs()
.setDescription("Time the function was running")
.setUnit("ms")
.build()
- .bind(Labels.of("function", uid.toString()))
+ .bind(Attributes.of(functionId, uid.toString()))
/**
* The instances associated with this function.
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index b169436f..ccf9a5d9 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -93,7 +93,7 @@ internal class FaaSServiceImpl(
/**
* The total amount of function invocations received by the service.
*/
- private val _invocations = meter.longCounterBuilder("service.invocations.total")
+ private val _invocations = meter.counterBuilder("service.invocations.total")
.setDescription("Number of function invocations")
.setUnit("1")
.build()
@@ -101,7 +101,7 @@ internal class FaaSServiceImpl(
/**
* The amount of function invocations that could be handled directly.
*/
- private val _timelyInvocations = meter.longCounterBuilder("service.invocations.warm")
+ private val _timelyInvocations = meter.counterBuilder("service.invocations.warm")
.setDescription("Number of function invocations handled directly")
.setUnit("1")
.build()
@@ -109,7 +109,7 @@ internal class FaaSServiceImpl(
/**
* The amount of function invocations that were delayed due to function deployment.
*/
- private val _delayedInvocations = meter.longCounterBuilder("service.invocations.cold")
+ private val _delayedInvocations = meter.counterBuilder("service.invocations.cold")
.setDescription("Number of function invocations that are delayed")
.setUnit("1")
.build()
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index d287312f..6002270a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -60,6 +60,12 @@ public abstract class SimAbstractHypervisor(
get() = _vms
/**
+ * The resource counters associated with the hypervisor.
+ */
+ public override val counters: SimResourceCounters
+ get() = switch.counters
+
+ /**
* The scaling governors attached to the physical CPUs backing this hypervisor.
*/
private val governors = mutableListOf<ScalingGovernor.Logic>()
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
index c31b1f6b..3b44292d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
@@ -77,10 +77,10 @@ public class SimFairShareHypervisor(
if (timestamp > lastReport) {
listener.onSliceFinish(
this@SimFairShareHypervisor,
- (counters.demand - lastDemand).toLong(),
- (counters.actual - lastActual).toLong(),
- (counters.overcommit - lastOvercommit).toLong(),
- (counters.interference - lastInterference).toLong(),
+ counters.demand - lastDemand,
+ counters.actual - lastActual,
+ counters.overcommit - lastOvercommit,
+ counters.interference - lastInterference,
lastCpuUsage,
lastCpuDemand
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
index e398ab36..af28c346 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
@@ -25,6 +25,7 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.SimResourceCounters
/**
* A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload
@@ -37,6 +38,11 @@ public interface SimHypervisor : SimWorkload {
public val vms: Set<SimMachine>
/**
+ * The resource counters associated with the hypervisor.
+ */
+ public val counters: SimResourceCounters
+
+ /**
* Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment.
*/
public fun canFit(model: MachineModel): Boolean
@@ -58,10 +64,10 @@ public interface SimHypervisor : SimWorkload {
*/
public fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
+ requestedWork: Double,
+ grantedWork: Double,
+ overcommittedWork: Double,
+ interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
index afc4c949..918271d1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
@@ -68,16 +68,16 @@ internal class SimHypervisorTest {
@Test
fun testOvercommittedSingle() = runBlockingSimulation {
val listener = object : SimHypervisor.Listener {
- var totalRequestedWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
+ var totalRequestedWork = 0.0
+ var totalGrantedWork = 0.0
+ var totalOvercommittedWork = 0.0
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
+ requestedWork: Double,
+ grantedWork: Double,
+ overcommittedWork: Double,
+ interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double
) {
@@ -117,9 +117,9 @@ internal class SimHypervisorTest {
machine.close()
assertAll(
- { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1113300.0, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1023300.0, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(90000.0, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
{ assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), res) { "VM usage is correct" } },
{ assertEquals(1200000, clock.millis()) { "Current time is correct" } }
)
@@ -131,16 +131,16 @@ internal class SimHypervisorTest {
@Test
fun testOvercommittedDual() = runBlockingSimulation {
val listener = object : SimHypervisor.Listener {
- var totalRequestedWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
+ var totalRequestedWork = 0.0
+ var totalGrantedWork = 0.0
+ var totalOvercommittedWork = 0.0
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
+ requestedWork: Double,
+ grantedWork: Double,
+ overcommittedWork: Double,
+ interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double
) {
@@ -196,9 +196,9 @@ internal class SimHypervisorTest {
yield()
assertAll(
- { assertEquals(2073600, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1053600, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(2073600.0, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1053600.0, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(1020000.0, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt
index d4445810..82e2a334 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt
@@ -43,16 +43,16 @@ public class WebExperimentMonitor : ExperimentMonitor {
logger.debug { "Host ${host.uid} changed state $newState [$time]" }
}
- override fun reportHostSlice(
+ override fun reportHostData(
time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
+ totalWork: Double,
+ grantedWork: Double,
+ overcommittedWork: Double,
+ interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double,
powerDraw: Double,
- numberOfDeployedImages: Int,
+ instanceCount: Int,
host: Host,
) {
processHostEvent(
@@ -60,11 +60,11 @@ public class WebExperimentMonitor : ExperimentMonitor {
time,
5 * 60 * 1000L,
host,
- numberOfDeployedImages,
- requestedBurst,
- grantedBurst,
- overcommissionedBurst,
- interferedBurst,
+ instanceCount,
+ totalWork.toLong(),
+ grantedWork.toLong(),
+ overcommittedWork.toLong(),
+ interferedWork.toLong(),
cpuUsage,
cpuDemand,
powerDraw,
@@ -120,7 +120,7 @@ public class WebExperimentMonitor : ExperimentMonitor {
private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
- override fun reportProvisionerMetrics(
+ override fun reportServiceData(
time: Long,
totalHostCount: Int,
availableHostCount: Int,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index 32191b8f..5329143d 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -155,7 +155,7 @@ public class WorkflowServiceImpl(
/**
* The number of jobs that have been submitted to the service.
*/
- private val submittedJobs = meter.longCounterBuilder("jobs.submitted")
+ private val submittedJobs = meter.counterBuilder("jobs.submitted")
.setDescription("Number of submitted jobs")
.setUnit("1")
.build()
@@ -163,7 +163,7 @@ public class WorkflowServiceImpl(
/**
* The number of jobs that are running.
*/
- private val runningJobs = meter.longUpDownCounterBuilder("jobs.active")
+ private val runningJobs = meter.upDownCounterBuilder("jobs.active")
.setDescription("Number of jobs running")
.setUnit("1")
.build()
@@ -171,7 +171,7 @@ public class WorkflowServiceImpl(
/**
* The number of jobs that have finished running.
*/
- private val finishedJobs = meter.longCounterBuilder("jobs.finished")
+ private val finishedJobs = meter.counterBuilder("jobs.finished")
.setDescription("Number of jobs that finished running")
.setUnit("1")
.build()
@@ -179,7 +179,7 @@ public class WorkflowServiceImpl(
/**
* The number of tasks that have been submitted to the service.
*/
- private val submittedTasks = meter.longCounterBuilder("tasks.submitted")
+ private val submittedTasks = meter.counterBuilder("tasks.submitted")
.setDescription("Number of submitted tasks")
.setUnit("1")
.build()
@@ -187,7 +187,7 @@ public class WorkflowServiceImpl(
/**
* The number of jobs that are running.
*/
- private val runningTasks = meter.longUpDownCounterBuilder("tasks.active")
+ private val runningTasks = meter.upDownCounterBuilder("tasks.active")
.setDescription("Number of tasks running")
.setUnit("1")
.build()
@@ -195,7 +195,7 @@ public class WorkflowServiceImpl(
/**
* The number of jobs that have finished running.
*/
- private val finishedTasks = meter.longCounterBuilder("tasks.finished")
+ private val finishedTasks = meter.counterBuilder("tasks.finished")
.setDescription("Number of tasks that finished running")
.setUnit("1")
.build()