summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-simulator/build.gradle.kts1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt504
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt305
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt38
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt117
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt1
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt56
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt38
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt66
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt8
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt448
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt243
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt55
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt33
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt19
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt18
-rw-r--r--opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt2
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt12
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt8
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt41
24 files changed, 1285 insertions, 743 deletions
diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts
index cad051e6..aaf69f78 100644
--- a/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -40,5 +40,6 @@ dependencies {
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testImplementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ testImplementation(projects.opendcTelemetry.opendcTelemetryCompute)
testRuntimeOnly(libs.slf4j.simple)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index be6ef11e..793db907 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -26,13 +26,16 @@ import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.*
+import org.opendc.compute.simulator.internal.Guest
+import org.opendc.compute.simulator.internal.GuestListener
import org.opendc.simulator.compute.*
import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.kernel.SimHypervisorProvider
@@ -44,11 +47,11 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.resources.SimResourceDistributorMaxMin
import org.opendc.simulator.resources.SimResourceInterpreter
import java.util.*
import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
+import kotlin.math.roundToLong
/**
* A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
@@ -100,32 +103,29 @@ public class SimHost(
/**
* The hypervisor to run multiple workloads.
*/
- public val hypervisor: SimHypervisor = hypervisor.create(
+ private val hypervisor: SimHypervisor = hypervisor.create(
interpreter,
scalingGovernor = scalingGovernor,
interferenceDomain = interferenceDomain,
listener = object : SimHypervisor.Listener {
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Double,
+ totalWork: Double,
grantedWork: Double,
overcommittedWork: Double,
interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double
) {
- _totalWork.add(requestedWork)
- _grantedWork.add(grantedWork)
- _overcommittedWork.add(overcommittedWork)
- _interferedWork.add(interferedWork)
- _cpuDemand.record(cpuDemand)
- _cpuUsage.record(cpuUsage)
- _powerUsage.record(machine.powerDraw)
-
- reportTime()
+ _cpuDemand = cpuDemand
+ _cpuUsage = cpuUsage
+
+ collectTime()
}
}
)
+ private var _cpuUsage = 0.0
+ private var _cpuDemand = 0.0
/**
* The virtual machines running on the hypervisor.
@@ -145,109 +145,23 @@ public class SimHost(
override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size })
/**
- * The total number of guests.
- */
- private val _guests = meter.upDownCounterBuilder("guests.total")
- .setDescription("Total number of guests")
- .setUnit("1")
- .build()
-
- /**
- * The number of active guests on the host.
- */
- private val _activeGuests = meter.upDownCounterBuilder("guests.active")
- .setDescription("Number of active guests")
- .setUnit("1")
- .build()
-
- /**
- * The CPU demand of the host.
- */
- private val _cpuDemand = meter.histogramBuilder("cpu.demand")
- .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
- .setUnit("MHz")
- .build()
-
- /**
- * The CPU usage of the host.
- */
- private val _cpuUsage = meter.histogramBuilder("cpu.usage")
- .setDescription("The amount of CPU resources used by the host")
- .setUnit("MHz")
- .build()
-
- /**
- * The power usage of the host.
- */
- private val _powerUsage = meter.histogramBuilder("power.usage")
- .setDescription("The amount of power used by the CPU")
- .setUnit("W")
- .build()
-
- /**
- * The total amount of work supplied to the CPU.
- */
- private val _totalWork = meter.counterBuilder("cpu.work.total")
- .setDescription("The amount of work supplied to the CPU")
- .setUnit("1")
- .ofDoubles()
- .build()
-
- /**
- * The work performed by the CPU.
- */
- private val _grantedWork = meter.counterBuilder("cpu.work.granted")
- .setDescription("The amount of work performed by the CPU")
- .setUnit("1")
- .ofDoubles()
- .build()
-
- /**
- * The amount not performed by the CPU due to overcommitment.
- */
- private val _overcommittedWork = meter.counterBuilder("cpu.work.overcommit")
- .setDescription("The amount of work not performed by the CPU due to overcommitment")
- .setUnit("1")
- .ofDoubles()
- .build()
-
- /**
- * The amount of work not performed by the CPU due to interference.
- */
- private val _interferedWork = meter.counterBuilder("cpu.work.interference")
- .setDescription("The amount of work not performed by the CPU due to interference")
- .setUnit("1")
- .ofDoubles()
- .build()
-
- /**
- * The amount of time in the system.
+ * The [GuestListener] that listens for guest events.
*/
- private val _totalTime = meter.counterBuilder("host.time.total")
- .setDescription("The amount of time in the system")
- .setUnit("ms")
- .build()
-
- /**
- * The uptime of the host.
- */
- private val _upTime = meter.counterBuilder("host.time.up")
- .setDescription("The uptime of the host")
- .setUnit("ms")
- .build()
+ private val guestListener = object : GuestListener {
+ override fun onStart(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
- /**
- * The downtime of the host.
- */
- private val _downTime = meter.counterBuilder("host.time.down")
- .setDescription("The downtime of the host")
- .setUnit("ms")
- .build()
+ override fun onStop(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
+ }
init {
// Launch hypervisor onto machine
scope.launch {
try {
+ _bootTime = clock.millis()
_state = HostState.UP
machine.run(this@SimHost.hypervisor, emptyMap())
} catch (_: CancellationException) {
@@ -259,29 +173,48 @@ public class SimHost(
_state = HostState.DOWN
}
}
- }
-
- private var _lastReport = clock.millis()
-
- private fun reportTime() {
- if (!scope.isActive)
- return
-
- val now = clock.millis()
- val duration = now - _lastReport
-
- _totalTime.add(duration)
- when (_state) {
- HostState.UP -> _upTime.add(duration)
- HostState.DOWN -> _downTime.add(duration)
- }
- // Track time of guests
- for (guest in guests.values) {
- guest.reportTime()
- }
-
- _lastReport = now
+ meter.upDownCounterBuilder("system.guests")
+ .setDescription("Number of guests on this host")
+ .setUnit("1")
+ .buildWithCallback(::collectGuests)
+ meter.gaugeBuilder("system.cpu.limit")
+ .setDescription("Amount of CPU resources available to the host")
+ .buildWithCallback(::collectCpuLimit)
+ meter.gaugeBuilder("system.cpu.demand")
+ .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(_cpuDemand) }
+ meter.gaugeBuilder("system.cpu.usage")
+ .setDescription("Amount of CPU resources used by the host")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(_cpuUsage) }
+ meter.gaugeBuilder("system.cpu.utilization")
+ .setDescription("Utilization of the CPU resources of the host")
+ .setUnit("%")
+ .buildWithCallback { result -> result.observe(_cpuUsage / _cpuLimit) }
+ meter.counterBuilder("system.cpu.time")
+ .setDescription("Amount of CPU time spent by the host")
+ .setUnit("s")
+ .buildWithCallback(::collectCpuTime)
+ meter.gaugeBuilder("system.power.usage")
+ .setDescription("Power usage of the host ")
+ .setUnit("W")
+ .buildWithCallback { result -> result.observe(machine.powerDraw) }
+ meter.counterBuilder("system.power.total")
+ .setDescription("Amount of energy used by the CPU")
+ .setUnit("J")
+ .ofDoubles()
+ .buildWithCallback(::collectPowerTotal)
+ meter.counterBuilder("system.time")
+ .setDescription("The uptime of the host")
+ .setUnit("s")
+ .buildWithCallback(::collectTime)
+ meter.gaugeBuilder("system.time.boot")
+ .setDescription("The boot time of the host")
+ .setUnit("1")
+ .ofLongs()
+ .buildWithCallback(::collectBootTime)
}
override fun canFit(server: Server): Boolean {
@@ -295,8 +228,17 @@ public class SimHost(
override suspend fun spawn(server: Server, start: Boolean) {
val guest = guests.computeIfAbsent(server) { key ->
require(canFit(key)) { "Server does not fit" }
- _guests.add(1)
- Guest(key, hypervisor.createMachine(key.flavor.toMachineModel(), key.name))
+
+ val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name)
+ Guest(
+ scope.coroutineContext,
+ clock,
+ this,
+ mapper,
+ guestListener,
+ server,
+ machine
+ )
}
if (start) {
@@ -320,7 +262,6 @@ public class SimHost(
override suspend fun delete(server: Server) {
val guest = guests.remove(server) ?: return
- _guests.add(-1)
guest.terminate()
}
@@ -333,7 +274,7 @@ public class SimHost(
}
override fun close() {
- reportTime()
+ reset()
scope.cancel()
machine.close()
}
@@ -341,22 +282,35 @@ public class SimHost(
override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
public suspend fun fail() {
- reportTime()
- _state = HostState.DOWN
+ reset()
+
for (guest in guests.values) {
guest.fail()
}
}
public suspend fun recover() {
- reportTime()
+ collectTime()
_state = HostState.UP
+ _bootTime = clock.millis()
+
for (guest in guests.values) {
guest.start()
}
}
/**
+ * Reset the machine.
+ */
+ private fun reset() {
+ collectTime()
+
+ _state = HostState.DOWN
+ _cpuUsage = 0.0
+ _cpuDemand = 0.0
+ }
+
+ /**
* Convert flavor to machine model.
*/
private fun Flavor.toMachineModel(): MachineModel {
@@ -368,162 +322,168 @@ public class SimHost(
return MachineModel(processingUnits, memoryUnits)
}
- private fun onGuestStart(vm: Guest) {
- _activeGuests.add(1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
- }
+ private val STATE_KEY = AttributeKey.stringKey("state")
- private fun onGuestStop(vm: Guest) {
- _activeGuests.add(-1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
- }
+ private val terminatedState = Attributes.of(STATE_KEY, "terminated")
+ private val runningState = Attributes.of(STATE_KEY, "running")
+ private val errorState = Attributes.of(STATE_KEY, "error")
+ private val invalidState = Attributes.of(STATE_KEY, "invalid")
/**
- * A virtual machine instance that the driver manages.
+ * Helper function to collect the guest counts on this host.
*/
- private inner class Guest(val server: Server, val machine: SimMachine) {
- var state: ServerState = ServerState.TERMINATED
-
- /**
- * The attributes of the guest.
- */
- val attributes: Attributes = Attributes.builder()
- .put(ResourceAttributes.HOST_NAME, server.name)
- .put(ResourceAttributes.HOST_ID, server.uid.toString())
- .put(ResourceAttributes.HOST_TYPE, server.flavor.name)
- .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong())
- .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize)
- .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" })
- .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
- .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name)
- .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString())
- .build()
-
- /**
- * The amount of time in the system.
- */
- private val _totalTime = meter.counterBuilder("guest.time.total")
- .setDescription("The amount of time in the system")
- .setUnit("ms")
- .build()
- .bind(attributes)
-
- /**
- * The uptime of the guest.
- */
- private val _runningTime = meter.counterBuilder("guest.time.running")
- .setDescription("The uptime of the guest")
- .setUnit("ms")
- .build()
- .bind(attributes)
-
- /**
- * The time the guest is in an error state.
- */
- private val _errorTime = meter.counterBuilder("guest.time.error")
- .setDescription("The time the guest is in an error state")
- .setUnit("ms")
- .build()
- .bind(attributes)
-
- suspend fun start() {
- when (state) {
- ServerState.TERMINATED, ServerState.ERROR -> {
- logger.info { "User requested to start server ${server.uid}" }
- launch()
- }
- ServerState.RUNNING -> return
- ServerState.DELETED -> {
- logger.warn { "User tried to start terminated server" }
- throw IllegalArgumentException("Server is terminated")
- }
- else -> assert(false) { "Invalid state transition" }
+ private fun collectGuests(result: ObservableLongMeasurement) {
+ var terminated = 0L
+ var running = 0L
+ var error = 0L
+ var invalid = 0L
+
+ for ((_, guest) in guests) {
+ when (guest.state) {
+ ServerState.TERMINATED -> terminated++
+ ServerState.RUNNING -> running++
+ ServerState.ERROR -> error++
+ else -> invalid++
}
}
- suspend fun stop() {
- when (state) {
- ServerState.RUNNING, ServerState.ERROR -> {
- val job = job ?: throw IllegalStateException("Server should be active")
- job.cancel()
- job.join()
- }
- ServerState.TERMINATED, ServerState.DELETED -> return
- else -> assert(false) { "Invalid state transition" }
- }
- }
+ result.observe(terminated, terminatedState)
+ result.observe(running, runningState)
+ result.observe(error, errorState)
+ result.observe(invalid, invalidState)
+ }
- suspend fun terminate() {
- stop()
- machine.close()
- state = ServerState.DELETED
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
+
+ /**
+ * Helper function to collect the CPU limits of a machine.
+ */
+ private fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit)
+
+ for (guest in guests.values) {
+ guest.collectCpuLimit(result)
}
+ }
- suspend fun fail() {
- if (state != ServerState.RUNNING) {
- return
- }
- stop()
- state = ServerState.ERROR
+ private var _lastCpuTimeCallback = clock.millis()
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ private fun collectCpuTime(result: ObservableLongMeasurement) {
+ val now = clock.millis()
+ val duration = now - _lastCpuTimeCallback
+
+ try {
+ collectCpuTime(duration, result)
+ } finally {
+ _lastCpuTimeCallback = now
}
+ }
- private var job: Job? = null
-
- private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont ->
- assert(job == null) { "Concurrent job running" }
- val workload = mapper.createWorkload(server)
-
- job = scope.launch {
- try {
- delay(1) // TODO Introduce boot time
- init()
- cont.resume(Unit)
- } catch (e: Throwable) {
- cont.resumeWithException(e)
- }
- try {
- machine.run(workload, mapOf("driver" to this@SimHost, "server" to server))
- exit(null)
- } catch (cause: Throwable) {
- exit(cause)
- } finally {
- job = null
- }
- }
+ private val _activeState = Attributes.of(STATE_KEY, "active")
+ private val _stealState = Attributes.of(STATE_KEY, "steal")
+ private val _lostState = Attributes.of(STATE_KEY, "lost")
+ private val _idleState = Attributes.of(STATE_KEY, "idle")
+ private var _totalTime = 0.0
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ private fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
+ val coreCount = this.model.cpuCount
+ val d = coreCount / _cpuLimit
+
+ val counters = hypervisor.counters
+ val grantedWork = counters.actual
+ val overcommittedWork = counters.overcommit
+ val interferedWork = (counters as? SimResourceDistributorMaxMin.Counters)?.interference ?: 0.0
+
+ _totalTime += (duration / 1000.0) * coreCount
+ val activeTime = (grantedWork * d).roundToLong()
+ val idleTime = (_totalTime - grantedWork * d).roundToLong()
+ val stealTime = (overcommittedWork * d).roundToLong()
+ val lostTime = (interferedWork * d).roundToLong()
+
+ result.observe(activeTime, _activeState)
+ result.observe(idleTime, _idleState)
+ result.observe(stealTime, _stealState)
+ result.observe(lostTime, _lostState)
+
+ for (guest in guests.values) {
+ guest.collectCpuTime(duration, result)
}
+ }
+
+ private var _lastPowerCallback = clock.millis()
+ private var _totalPower = 0.0
- private fun init() {
- state = ServerState.RUNNING
- onGuestStart(this)
+ /**
+ * Helper function to collect the total power usage of the machine.
+ */
+ private fun collectPowerTotal(result: ObservableDoubleMeasurement) {
+ val now = clock.millis()
+ val duration = now - _lastPowerCallback
+
+ _totalPower += duration / 1000.0 * machine.powerDraw
+ result.observe(_totalPower)
+
+ _lastPowerCallback = now
+ }
+
+ private var _lastReport = clock.millis()
+
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun collectTime(result: ObservableLongMeasurement? = null) {
+ val now = clock.millis()
+ val duration = now - _lastReport
+
+ try {
+ collectTime(duration, result)
+ } finally {
+ _lastReport = now
}
+ }
- private fun exit(cause: Throwable?) {
- state =
- if (cause == null)
- ServerState.TERMINATED
- else
- ServerState.ERROR
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = Attributes.of(STATE_KEY, "up")
+ private val _downState = Attributes.of(STATE_KEY, "down")
- onGuestStop(this)
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun collectTime(duration: Long, result: ObservableLongMeasurement? = null) {
+ if (state == HostState.UP) {
+ _uptime += duration
+ } else if (state == HostState.DOWN && scope.isActive) {
+ // Only increment downtime if the machine is in a failure state
+ _downtime += duration
}
- private var _lastReport = clock.millis()
+ result?.observe(_uptime, _upState)
+ result?.observe(_downtime, _downState)
- fun reportTime() {
- if (state == ServerState.DELETED)
- return
+ for (guest in guests.values) {
+ guest.collectUptime(duration, result)
+ }
+ }
- val now = clock.millis()
- val duration = now - _lastReport
+ private var _bootTime = Long.MIN_VALUE
- _totalTime.add(duration)
- when (state) {
- ServerState.RUNNING -> _runningTime.add(duration)
- ServerState.ERROR -> _errorTime.add(duration)
- else -> {}
- }
+ /**
+ * Helper function to track the boot time of a machine.
+ */
+ private fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result?.observe(_bootTime)
+ }
- _lastReport = now
+ for (guest in guests.values) {
+ guest.collectBootTime(result)
}
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
new file mode 100644
index 00000000..90562e2f
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -0,0 +1,305 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.*
+import mu.KotlinLogging
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.SimWorkloadMapper
+import org.opendc.simulator.compute.SimAbstractMachine
+import org.opendc.simulator.compute.SimMachine
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.roundToLong
+
+/**
+ * A virtual machine instance that is managed by a [SimHost].
+ */
+internal class Guest(
+ context: CoroutineContext,
+ private val clock: Clock,
+ val host: SimHost,
+ private val mapper: SimWorkloadMapper,
+ private val listener: GuestListener,
+ val server: Server,
+ val machine: SimMachine
+) {
+ /**
+ * The [CoroutineScope] of the guest.
+ */
+ private val scope: CoroutineScope = CoroutineScope(context + Job())
+
+ /**
+ * The logger instance of this guest.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The state of the [Guest].
+ *
+ * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for
+ * a server.
+ */
+ var state: ServerState = ServerState.TERMINATED
+
+ /**
+ * The attributes of the guest.
+ */
+ val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, server.name)
+ .put(ResourceAttributes.HOST_ID, server.uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, server.flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString())
+ .build()
+
+ /**
+ * Start the guest.
+ */
+ suspend fun start() {
+ when (state) {
+ ServerState.TERMINATED, ServerState.ERROR -> {
+ logger.info { "User requested to start server ${server.uid}" }
+ doStart()
+ }
+ ServerState.RUNNING -> return
+ ServerState.DELETED -> {
+ logger.warn { "User tried to start terminated server" }
+ throw IllegalArgumentException("Server is terminated")
+ }
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Stop the guest.
+ */
+ suspend fun stop() {
+ when (state) {
+ ServerState.RUNNING -> doStop(ServerState.TERMINATED)
+ ServerState.ERROR -> doRecover()
+ ServerState.TERMINATED, ServerState.DELETED -> return
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Terminate the guest.
+ *
+ * This operation will stop the guest if it is running on the host and remove all resources associated with the
+ * guest.
+ */
+ suspend fun terminate() {
+ stop()
+
+ state = ServerState.DELETED
+
+ machine.close()
+ scope.cancel()
+ }
+
+ /**
+ * Fail the guest if it is active.
+ *
+ * This operation forcibly stops the guest and puts the server into an error state.
+ */
+ suspend fun fail() {
+ if (state != ServerState.RUNNING) {
+ return
+ }
+
+ doStop(ServerState.ERROR)
+ }
+
+ /**
+ * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active.
+ */
+ private var job: Job? = null
+
+ /**
+ * Launch the guest on the simulated
+ */
+ private suspend fun doStart() {
+ assert(job == null) { "Concurrent job running" }
+ val workload = mapper.createWorkload(server)
+
+ val job = scope.launch { runMachine(workload) }
+ this.job = job
+
+ state = ServerState.RUNNING
+ onStart()
+
+ job.invokeOnCompletion { cause ->
+ this.job = null
+ onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED)
+ }
+ }
+
+ /**
+ * Attempt to stop the server and put it into [target] state.
+ */
+ private suspend fun doStop(target: ServerState) {
+ assert(job != null) { "Invalid job state" }
+ val job = job ?: return
+ job.cancel()
+ job.join()
+
+ state = target
+ }
+
+ /**
+ * Attempt to recover from an error state.
+ */
+ private fun doRecover() {
+ state = ServerState.TERMINATED
+ }
+
+ /**
+ * Run the process that models the virtual machine lifecycle as a coroutine.
+ */
+ private suspend fun runMachine(workload: SimWorkload) {
+ delay(1) // TODO Introduce model for boot time
+ machine.run(workload, mapOf("driver" to host, "server" to server))
+ }
+
+ /**
+ * This method is invoked when the guest was started on the host and has booted into a running state.
+ */
+ private fun onStart() {
+ _bootTime = clock.millis()
+ state = ServerState.RUNNING
+ listener.onStart(this)
+ }
+
+ /**
+ * This method is invoked when the guest stopped.
+ */
+ private fun onStop(target: ServerState) {
+ state = target
+ listener.onStop(this)
+ }
+
+ private val STATE_KEY = AttributeKey.stringKey("state")
+
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "up")
+ .build()
+ private val _downState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "down")
+ .build()
+
+ /**
+ * Helper function to track the uptime of the guest.
+ */
+ fun collectUptime(duration: Long, result: ObservableLongMeasurement? = null) {
+ if (state == ServerState.RUNNING) {
+ _uptime += duration
+ } else if (state == ServerState.ERROR) {
+ _downtime += duration
+ }
+
+ result?.observe(_uptime, _upState)
+ result?.observe(_downtime, _downState)
+ }
+
+ private var _bootTime = Long.MIN_VALUE
+
+ /**
+ * Helper function to track the boot time of the guest.
+ */
+ fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result?.observe(_bootTime)
+ }
+ }
+
+ private val _activeState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "active")
+ .build()
+ private val _stealState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "steal")
+ .build()
+ private val _lostState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "lost")
+ .build()
+ private val _idleState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "idle")
+ .build()
+ private var _totalTime = 0.0
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
+ val coreCount = server.flavor.cpuCount
+ val d = coreCount / _cpuLimit
+
+ var grantedWork = 0.0
+ var overcommittedWork = 0.0
+
+ for (cpu in (machine as SimAbstractMachine).cpus) {
+ val counters = cpu.counters
+ grantedWork += counters.actual
+ overcommittedWork += counters.overcommit
+ }
+
+ _totalTime += (duration / 1000.0) * coreCount
+ val activeTime = (grantedWork * d).roundToLong()
+ val idleTime = (_totalTime - grantedWork * d).roundToLong()
+ val stealTime = (overcommittedWork * d).roundToLong()
+
+ result.observe(activeTime, _activeState)
+ result.observe(idleTime, _idleState)
+ result.observe(stealTime, _stealState)
+ result.observe(0, _lostState)
+ }
+
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
+
+ /**
+ * Helper function to collect the CPU limits of a machine.
+ */
+ fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit, attributes)
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
new file mode 100644
index 00000000..e6d0fdad
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+/**
+ * Helper interface to listen for [Guest] events.
+ */
+internal interface GuestListener {
+ /**
+ * This method is invoked when the guest machine is running.
+ */
+ fun onStart(guest: Guest)
+
+ /**
+ * This method is invoked when the guest machine is stopped.
+ */
+ fun onStop(guest: Guest)
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 318b5a5d..9c879e5e 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -23,11 +23,9 @@
package org.opendc.compute.simulator
import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -44,17 +42,20 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.HOST_ID
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Duration
import java.util.*
import kotlin.coroutines.resume
-import kotlin.math.roundToLong
/**
* Basic test-suite for the hypervisor.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimHostTest {
private lateinit var machineModel: MachineModel
@@ -73,18 +74,23 @@ internal class SimHostTest {
*/
@Test
fun testOvercommitted() = runBlockingSimulation {
- var totalWork = 0L
- var grantedWork = 0L
- var overcommittedWork = 0L
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
+ .setResource(hostResource)
.setClock(clock.toOtelClock())
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
val virtDriver = SimHost(
- uid = UUID.randomUUID(),
+ uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
@@ -132,20 +138,16 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
-
- totalWork = metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong()
- grantedWork = metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong()
- overcommittedWork = metricsByName.getValue("cpu.work.overcommit").doubleSumData.points.first().value.roundToLong()
- return CompletableResultCode.ofSuccess()
+ ComputeMetricExporter(
+ clock,
+ object : ComputeMonitor {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ stealTime += data.cpuStealTime
+ }
}
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
- },
+ ),
exportInterval = Duration.ofSeconds(duration)
)
@@ -172,9 +174,9 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(4147200, totalWork, "Requested work does not match") },
- { assertEquals(2107200, grantedWork, "Granted work does not match") },
- { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
+ { assertEquals(659, activeTime, "Active time does not match") },
+ { assertEquals(2342, idleTime, "Idle time does not match") },
+ { assertEquals(638, stealTime, "Steal time does not match") },
{ assertEquals(1500001, clock.millis()) }
)
}
@@ -184,21 +186,26 @@ internal class SimHostTest {
*/
@Test
fun testFailure() = runBlockingSimulation {
- var totalWork = 0L
- var grantedWork = 0L
- var totalTime = 0L
- var downTime = 0L
- var guestTotalTime = 0L
- var guestDownTime = 0L
-
+ var activeTime = 0L
+ var idleTime = 0L
+ var uptime = 0L
+ var downtime = 0L
+ var guestUptime = 0L
+ var guestDowntime = 0L
+
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
+ .setResource(hostResource)
.setClock(clock.toOtelClock())
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
val host = SimHost(
- uid = UUID.randomUUID(),
+ uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
@@ -230,24 +237,22 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
-
- totalWork = metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong()
- grantedWork = metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong()
- totalTime = metricsByName.getValue("host.time.total").longSumData.points.first().value
- downTime = metricsByName.getValue("host.time.down").longSumData.points.first().value
- guestTotalTime = metricsByName.getValue("guest.time.total").longSumData.points.first().value
- guestDownTime = metricsByName.getValue("guest.time.error").longSumData.points.first().value
-
- return CompletableResultCode.ofSuccess()
- }
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+ ComputeMetricExporter(
+ clock,
+ object : ComputeMonitor {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ uptime += data.uptime
+ downtime += data.downtime
+ }
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
- },
+ override fun record(data: ServerData) {
+ guestUptime += data.uptime
+ guestDowntime += data.downtime
+ }
+ }
+ ),
exportInterval = Duration.ofSeconds(duration)
)
@@ -276,12 +281,12 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(2226040, totalWork, "Total time does not match") },
- { assertEquals(1086040, grantedWork, "Down time does not match") },
- { assertEquals(1200001, totalTime, "Total time does not match") },
- { assertEquals(1200001, guestTotalTime, "Guest total time does not match") },
- { assertEquals(5000, downTime, "Down time does not match") },
- { assertEquals(5000, guestDownTime, "Guest down time does not match") },
+ { assertEquals(2661, idleTime, "Idle time does not match") },
+ { assertEquals(339, activeTime, "Active time does not match") },
+ { assertEquals(1195001, uptime, "Uptime does not match") },
+ { assertEquals(5000, downtime, "Downtime does not match") },
+ { assertEquals(1195000, guestUptime, "Guest uptime does not match") },
+ { assertEquals(5000, guestDowntime, "Guest downtime does not match") },
)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 3ec424f1..6261ebbf 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -149,9 +149,10 @@ abstract class Portfolio(name: String) : Experiment(name) {
} finally {
simulator.close()
metricReader.close()
+ monitor.close()
}
- val monitorResults = collectServiceMetrics(clock.millis(), simulator.producers[0])
+ val monitorResults = collectServiceMetrics(clock.instant(), simulator.producers[0])
logger.debug {
"Scheduler " +
"Success=${monitorResults.attemptsSuccess} " +
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
index 5684bde9..e3d15c3b 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
@@ -27,7 +27,6 @@ import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.example.Paper.schema
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
index fa00fc35..36207045 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
@@ -44,20 +44,31 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
}
override fun convert(builder: GenericRecordBuilder, data: HostData) {
- builder["timestamp"] = data.timestamp
+ builder["timestamp"] = data.timestamp.toEpochMilli()
+
builder["host_id"] = data.host.id
- builder["powered_on"] = true
+ builder["num_cpus"] = data.host.cpuCount
+ builder["mem_capacity"] = data.host.memCapacity
+
builder["uptime"] = data.uptime
builder["downtime"] = data.downtime
- builder["total_work"] = data.totalWork
- builder["granted_work"] = data.grantedWork
- builder["overcommitted_work"] = data.overcommittedWork
- builder["interfered_work"] = data.interferedWork
- builder["cpu_usage"] = data.cpuUsage
- builder["cpu_demand"] = data.cpuDemand
- builder["power_draw"] = data.powerDraw
- builder["num_instances"] = data.instanceCount
- builder["num_cpus"] = data.host.cpuCount
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ builder["boot_time"] = bootTime.toEpochMilli()
+ }
+
+ builder["cpu_limit"] = data.cpuLimit
+ builder["cpu_time_active"] = data.cpuActiveTime
+ builder["cpu_time_idle"] = data.cpuIdleTime
+ builder["cpu_time_steal"] = data.cpuStealTime
+ builder["cpu_time_lost"] = data.cpuLostTime
+
+ builder["power_total"] = data.powerTotal
+
+ builder["guests_terminated"] = data.guestsTerminated
+ builder["guests_running"] = data.guestsRunning
+ builder["guests_error"] = data.guestsError
+ builder["guests_invalid"] = data.guestsInvalid
}
override fun toString(): String = "host-writer"
@@ -69,18 +80,21 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
.fields()
.requiredLong("timestamp")
.requiredString("host_id")
- .requiredBoolean("powered_on")
+ .requiredInt("num_cpus")
+ .requiredLong("mem_capacity")
.requiredLong("uptime")
.requiredLong("downtime")
- .requiredDouble("total_work")
- .requiredDouble("granted_work")
- .requiredDouble("overcommitted_work")
- .requiredDouble("interfered_work")
- .requiredDouble("cpu_usage")
- .requiredDouble("cpu_demand")
- .requiredDouble("power_draw")
- .requiredInt("num_instances")
- .requiredInt("num_cpus")
+ .optionalLong("boot_time")
+ .requiredDouble("cpu_limit")
+ .requiredLong("cpu_time_active")
+ .requiredLong("cpu_time_idle")
+ .requiredLong("cpu_time_steal")
+ .requiredLong("cpu_time_lost")
+ .requiredDouble("power_total")
+ .requiredInt("guests_terminated")
+ .requiredInt("guests_running")
+ .requiredInt("guests_error")
+ .requiredInt("guests_invalid")
.endRecord()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
index bb2db4b7..c5a5e7c0 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
@@ -40,18 +40,31 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
return builder
.withDictionaryEncoding("server_id", true)
- .withDictionaryEncoding("state", true)
+ .withDictionaryEncoding("host_id", true)
.build()
}
override fun convert(builder: GenericRecordBuilder, data: ServerData) {
- builder["timestamp"] = data.timestamp
- builder["server_id"] = data.server
- // builder["state"] = data.server.state
+ builder["timestamp"] = data.timestamp.toEpochMilli()
+
+ builder["server_id"] = data.server.id
+ builder["host_id"] = data.host?.id
+ builder["num_vcpus"] = data.server.cpuCount
+ builder["mem_capacity"] = data.server.memCapacity
+
builder["uptime"] = data.uptime
builder["downtime"] = data.downtime
- // builder["num_vcpus"] = data.server.flavor.cpuCount
- // builder["mem_capacity"] = data.server.flavor.memorySize
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ builder["boot_time"] = bootTime.toEpochMilli()
+ }
+ builder["scheduling_latency"] = data.schedulingLatency
+
+ builder["cpu_limit"] = data.cpuLimit
+ builder["cpu_time_active"] = data.cpuActiveTime
+ builder["cpu_time_idle"] = data.cpuIdleTime
+ builder["cpu_time_steal"] = data.cpuStealTime
+ builder["cpu_time_lost"] = data.cpuLostTime
}
override fun toString(): String = "server-writer"
@@ -63,11 +76,18 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
.fields()
.requiredLong("timestamp")
.requiredString("server_id")
- .requiredString("state")
- .requiredLong("uptime")
- .requiredLong("downtime")
+ .optionalString("host_id")
.requiredInt("num_vcpus")
.requiredLong("mem_capacity")
+ .requiredLong("uptime")
+ .requiredLong("downtime")
+ .optionalLong("boot_time")
+ .requiredLong("scheduling_latency")
+ .requiredDouble("cpu_limit")
+ .requiredLong("cpu_time_active")
+ .requiredLong("cpu_time_idle")
+ .requiredLong("cpu_time_steal")
+ .requiredLong("cpu_time_lost")
.endRecord()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
index 29b48878..d9ca55cb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
@@ -35,7 +35,7 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) {
override fun convert(builder: GenericRecordBuilder, data: ServiceData) {
- builder["timestamp"] = data.timestamp
+ builder["timestamp"] = data.timestamp.toEpochMilli()
builder["hosts_up"] = data.hostsUp
builder["hosts_down"] = data.hostsDown
builder["servers_pending"] = data.serversPending
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 81405acf..727530e3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -50,7 +50,6 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
import java.util.*
-import kotlin.math.roundToLong
/**
* An integration test suite for the Capelin experiments.
@@ -102,7 +101,7 @@ class CapelinIntegrationTest {
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -118,11 +117,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(220346412191, monitor.totalWork) { "Incorrect requested burst" } },
- { assertEquals(206667852689, monitor.totalGrantedWork) { "Incorrect granted burst" } },
- { assertEquals(1151612221, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
- { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } },
- { assertEquals(9.088769763540529E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } },
+ { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
@@ -151,7 +150,7 @@ class CapelinIntegrationTest {
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -163,10 +162,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } },
- { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -202,7 +201,7 @@ class CapelinIntegrationTest {
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -214,10 +213,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } },
- { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(2960974524, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -247,7 +246,7 @@ class CapelinIntegrationTest {
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -259,10 +258,11 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(38385856700, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(34886670127, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(979997628, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } }
)
}
@@ -286,18 +286,20 @@ class CapelinIntegrationTest {
}
class TestExperimentReporter : ComputeMonitor {
- var totalWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
- var totalInterferedWork = 0L
- var totalPowerDraw = 0.0
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ var lostTime = 0L
+ var energyUsage = 0.0
+ var uptime = 0L
override fun record(data: HostData) {
- this.totalWork += data.totalWork.roundToLong()
- totalGrantedWork += data.grantedWork.roundToLong()
- totalOvercommittedWork += data.overcommittedWork.roundToLong()
- totalInterferedWork += data.interferedWork.roundToLong()
- totalPowerDraw += data.powerDraw
+ idleTime += data.cpuIdleTime
+ activeTime += data.cpuActiveTime
+ stealTime += data.cpuStealTime
+ lostTime += data.cpuLostTime
+ energyUsage += data.powerTotal
+ uptime += data.uptime
}
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 266db0dd..f9db048d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -49,22 +49,22 @@ public abstract class SimAbstractMachine(
/**
* The resources allocated for this machine.
*/
- protected abstract val cpus: List<SimProcessingUnit>
+ public abstract val cpus: List<SimProcessingUnit>
/**
* The memory interface of the machine.
*/
- protected val memory: SimMemory = Memory(SimResourceSource(model.memory.sumOf { it.size }.toDouble(), interpreter), model.memory)
+ public val memory: SimMemory = Memory(SimResourceSource(model.memory.sumOf { it.size }.toDouble(), interpreter), model.memory)
/**
* The network interfaces available to the machine.
*/
- protected val net: List<SimNetworkInterface> = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(adapter, i) }
+ public val net: List<SimNetworkInterface> = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(adapter, i) }
/**
* The network interfaces available to the machine.
*/
- protected val storage: List<SimStorageInterface> = model.storage.mapIndexed { i, device -> StorageDeviceImpl(interpreter, device, i) }
+ public val storage: List<SimStorageInterface> = model.storage.mapIndexed { i, device -> StorageDeviceImpl(interpreter, device, i) }
/**
* The peripherals of the machine.
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
index af28c346..3b49d515 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
@@ -64,7 +64,7 @@ public interface SimHypervisor : SimWorkload {
*/
public fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Double,
+ totalWork: Double,
grantedWork: Double,
overcommittedWork: Double,
interferedWork: Double,
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
index 8dea0045..1f010338 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
@@ -70,14 +70,14 @@ internal class SimHypervisorTest {
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Double,
+ totalWork: Double,
grantedWork: Double,
overcommittedWork: Double,
interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double
) {
- totalRequestedWork += requestedWork
+ totalRequestedWork += totalWork
totalGrantedWork += grantedWork
totalOvercommittedWork += overcommittedWork
}
@@ -128,14 +128,14 @@ internal class SimHypervisorTest {
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Double,
+ totalWork: Double,
grantedWork: Double,
overcommittedWork: Double,
interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double
) {
- totalRequestedWork += requestedWork
+ totalRequestedWork += totalWork
totalGrantedWork += grantedWork
totalOvercommittedWork += overcommittedWork
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
new file mode 100644
index 00000000..e9449634
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
@@ -0,0 +1,448 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute
+
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.data.PointData
+import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import org.opendc.telemetry.compute.table.*
+import java.time.Instant
+import kotlin.math.roundToLong
+
+/**
+ * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData].
+ */
+public class ComputeMetricAggregator {
+ private val _service = ServiceAggregator()
+ private val _hosts = mutableMapOf<String, HostAggregator>()
+ private val _servers = mutableMapOf<String, ServerAggregator>()
+
+ /**
+ * Process the specified [metrics] for this cycle.
+ */
+ public fun process(metrics: Collection<MetricData>) {
+ val service = _service
+ val hosts = _hosts
+ val servers = _servers
+
+ for (metric in metrics) {
+ val resource = metric.resource
+
+ when (metric.name) {
+ // ComputeService
+ "scheduler.hosts" -> {
+ for (point in metric.longSumData.points) {
+ when (point.attributes[STATE_KEY]) {
+ "up" -> service.hostsUp = point.value.toInt()
+ "down" -> service.hostsDown = point.value.toInt()
+ }
+ }
+ }
+ "scheduler.servers" -> {
+ for (point in metric.longSumData.points) {
+ when (point.attributes[STATE_KEY]) {
+ "pending" -> service.serversPending = point.value.toInt()
+ "active" -> service.serversActive = point.value.toInt()
+ }
+ }
+ }
+ "scheduler.attempts" -> {
+ for (point in metric.longSumData.points) {
+ when (point.attributes[RESULT_KEY]) {
+ "success" -> service.attemptsSuccess = point.value.toInt()
+ "failure" -> service.attemptsFailure = point.value.toInt()
+ "error" -> service.attemptsError = point.value.toInt()
+ }
+ }
+ }
+ "scheduler.latency" -> {
+ for (point in metric.doubleHistogramData.points) {
+ val server = getServer(servers, point) ?: continue
+ server.schedulingLatency = (point.sum / point.count).roundToLong()
+ }
+ }
+
+ // SimHost
+ "system.guests" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.longSumData.points) {
+ when (point.attributes[STATE_KEY]) {
+ "terminated" -> agg.guestsTerminated = point.value.toInt()
+ "running" -> agg.guestsRunning = point.value.toInt()
+ "error" -> agg.guestsRunning = point.value.toInt()
+ "invalid" -> agg.guestsInvalid = point.value.toInt()
+ }
+ }
+ }
+ "system.cpu.limit" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.doubleGaugeData.points) {
+ val server = getServer(servers, point)
+
+ if (server != null) {
+ server.cpuLimit = point.value
+ server.host = agg.host
+ } else {
+ agg.cpuLimit = point.value
+ }
+ }
+ }
+ "system.cpu.usage" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.cpuUsage = metric.doubleGaugeData.points.first().value
+ }
+ "system.cpu.demand" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.cpuDemand = metric.doubleGaugeData.points.first().value
+ }
+ "system.cpu.utilization" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.cpuUtilization = metric.doubleGaugeData.points.first().value
+ }
+ "system.cpu.time" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.longSumData.points) {
+ val server = getServer(servers, point)
+ val state = point.attributes[STATE_KEY]
+ if (server != null) {
+ when (state) {
+ "active" -> server.cpuActiveTime = point.value
+ "idle" -> server.cpuIdleTime = point.value
+ "steal" -> server.cpuStealTime = point.value
+ "lost" -> server.cpuLostTime = point.value
+ }
+ server.host = agg.host
+ } else {
+ when (state) {
+ "active" -> agg.cpuActiveTime = point.value
+ "idle" -> agg.cpuIdleTime = point.value
+ "steal" -> agg.cpuStealTime = point.value
+ "lost" -> agg.cpuLostTime = point.value
+ }
+ }
+ }
+ }
+ "system.power.usage" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.powerUsage = metric.doubleGaugeData.points.first().value
+ }
+ "system.power.total" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.powerTotal = metric.doubleSumData.points.first().value
+ }
+ "system.time" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.longSumData.points) {
+ val server = getServer(servers, point)
+
+ if (server != null) {
+ when (point.attributes[STATE_KEY]) {
+ "up" -> server.uptime = point.value
+ "down" -> server.downtime = point.value
+ }
+ server.host = agg.host
+ } else {
+ when (point.attributes[STATE_KEY]) {
+ "up" -> agg.uptime = point.value
+ "down" -> agg.downtime = point.value
+ }
+ }
+ }
+ }
+ "system.time.boot" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.longGaugeData.points) {
+ val server = getServer(servers, point)
+
+ if (server != null) {
+ server.bootTime = point.value
+ server.host = agg.host
+ } else {
+ agg.bootTime = point.value
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Collect the data via the [monitor].
+ */
+ public fun collect(now: Instant, monitor: ComputeMonitor) {
+ monitor.record(_service.collect(now))
+
+ for (host in _hosts.values) {
+ monitor.record(host.collect(now))
+ }
+
+ for (server in _servers.values) {
+ monitor.record(server.collect(now))
+ }
+ }
+
+ /**
+ * Obtain the [HostAggregator] for the specified [resource].
+ */
+ private fun getHost(hosts: MutableMap<String, HostAggregator>, resource: Resource): HostAggregator? {
+ val id = resource.attributes[HOST_ID]
+ return if (id != null) {
+ hosts.computeIfAbsent(id) { HostAggregator(resource) }
+ } else {
+ null
+ }
+ }
+
+ /**
+ * Obtain the [ServerAggregator] for the specified [point].
+ */
+ private fun getServer(servers: MutableMap<String, ServerAggregator>, point: PointData): ServerAggregator? {
+ val id = point.attributes[ResourceAttributes.HOST_ID]
+ return if (id != null) {
+ servers.computeIfAbsent(id) { ServerAggregator(point.attributes) }
+ } else {
+ null
+ }
+ }
+
+ /**
+ * An aggregator for service metrics before they are reported.
+ */
+ internal class ServiceAggregator {
+ @JvmField var hostsUp = 0
+ @JvmField var hostsDown = 0
+
+ @JvmField var serversPending = 0
+ @JvmField var serversActive = 0
+
+ @JvmField var attemptsSuccess = 0
+ @JvmField var attemptsFailure = 0
+ @JvmField var attemptsError = 0
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun collect(now: Instant): ServiceData = toServiceData(now)
+
+ /**
+ * Convert the aggregator state to an immutable [ServiceData].
+ */
+ private fun toServiceData(now: Instant): ServiceData {
+ return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
+ }
+ }
+
+ /**
+ * An aggregator for host metrics before they are reported.
+ */
+ internal class HostAggregator(resource: Resource) {
+ /**
+ * The static information about this host.
+ */
+ val host = HostInfo(
+ resource.attributes[HOST_ID]!!,
+ resource.attributes[HOST_NAME] ?: "",
+ resource.attributes[HOST_ARCH] ?: "",
+ resource.attributes[HOST_NCPUS]?.toInt() ?: 0,
+ resource.attributes[HOST_MEM_CAPACITY] ?: 0,
+ )
+
+ @JvmField var guestsTerminated = 0
+ @JvmField var guestsRunning = 0
+ @JvmField var guestsError = 0
+ @JvmField var guestsInvalid = 0
+
+ @JvmField var cpuLimit = 0.0
+ @JvmField var cpuUsage = 0.0
+ @JvmField var cpuDemand = 0.0
+ @JvmField var cpuUtilization = 0.0
+
+ @JvmField var cpuActiveTime = 0L
+ @JvmField var cpuIdleTime = 0L
+ @JvmField var cpuStealTime = 0L
+ @JvmField var cpuLostTime = 0L
+ private var previousCpuActiveTime = 0L
+ private var previousCpuIdleTime = 0L
+ private var previousCpuStealTime = 0L
+ private var previousCpuLostTime = 0L
+
+ @JvmField var powerUsage = 0.0
+ @JvmField var powerTotal = 0.0
+ private var previousPowerTotal = 0.0
+
+ @JvmField var uptime = 0L
+ private var previousUptime = 0L
+ @JvmField var downtime = 0L
+ private var previousDowntime = 0L
+ @JvmField var bootTime = Long.MIN_VALUE
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun collect(now: Instant): HostData {
+ val data = toHostData(now)
+
+ // Reset intermediate state for next aggregation
+ previousCpuActiveTime = cpuActiveTime
+ previousCpuIdleTime = cpuIdleTime
+ previousCpuStealTime = cpuStealTime
+ previousCpuLostTime = cpuLostTime
+ previousPowerTotal = powerTotal
+ previousUptime = uptime
+ previousDowntime = downtime
+
+ guestsTerminated = 0
+ guestsRunning = 0
+ guestsError = 0
+ guestsInvalid = 0
+
+ cpuLimit = 0.0
+ cpuUsage = 0.0
+ cpuDemand = 0.0
+ cpuUtilization = 0.0
+
+ powerUsage = 0.0
+
+ return data
+ }
+
+ /**
+ * Convert the aggregator state to an immutable [HostData] instance.
+ */
+ private fun toHostData(now: Instant): HostData {
+ return HostData(
+ now,
+ host,
+ guestsTerminated,
+ guestsRunning,
+ guestsError,
+ guestsInvalid,
+ cpuLimit,
+ cpuUsage,
+ cpuDemand,
+ cpuUtilization,
+ cpuActiveTime - previousCpuActiveTime,
+ cpuIdleTime - previousCpuIdleTime,
+ cpuStealTime - previousCpuStealTime,
+ cpuLostTime - previousCpuLostTime,
+ powerUsage,
+ powerTotal - previousPowerTotal,
+ uptime - previousUptime,
+ downtime - previousDowntime,
+ if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null
+ )
+ }
+ }
+
+ /**
+ * An aggregator for server metrics before they are reported.
+ */
+ internal class ServerAggregator(attributes: Attributes) {
+ /**
+ * The static information about this server.
+ */
+ val server = ServerInfo(
+ attributes[ResourceAttributes.HOST_ID]!!,
+ attributes[ResourceAttributes.HOST_NAME]!!,
+ attributes[ResourceAttributes.HOST_TYPE]!!,
+ attributes[ResourceAttributes.HOST_ARCH]!!,
+ attributes[ResourceAttributes.HOST_IMAGE_ID]!!,
+ attributes[ResourceAttributes.HOST_IMAGE_NAME]!!,
+ attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(),
+ attributes[AttributeKey.longKey("host.mem_capacity")]!!,
+ )
+
+ /**
+ * The [HostInfo] of the host on which the server is hosted.
+ */
+ var host: HostInfo? = null
+
+ @JvmField var uptime: Long = 0
+ private var previousUptime = 0L
+ @JvmField var downtime: Long = 0
+ private var previousDowntime = 0L
+ @JvmField var bootTime: Long = 0
+ @JvmField var schedulingLatency = 0L
+ @JvmField var cpuLimit = 0.0
+ @JvmField var cpuActiveTime = 0L
+ @JvmField var cpuIdleTime = 0L
+ @JvmField var cpuStealTime = 0L
+ @JvmField var cpuLostTime = 0L
+ private var previousCpuActiveTime = 0L
+ private var previousCpuIdleTime = 0L
+ private var previousCpuStealTime = 0L
+ private var previousCpuLostTime = 0L
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun collect(now: Instant): ServerData {
+ val data = toServerData(now)
+
+ previousUptime = uptime
+ previousDowntime = downtime
+ previousCpuActiveTime = cpuActiveTime
+ previousCpuIdleTime = cpuIdleTime
+ previousCpuStealTime = cpuStealTime
+ previousCpuLostTime = cpuLostTime
+
+ host = null
+ cpuLimit = 0.0
+
+ return data
+ }
+
+ /**
+ * Convert the aggregator state into an immutable [ServerData].
+ */
+ private fun toServerData(now: Instant): ServerData {
+ return ServerData(
+ now,
+ server,
+ host,
+ uptime - previousUptime,
+ downtime - previousDowntime,
+ if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null,
+ schedulingLatency,
+ cpuLimit,
+ cpuActiveTime - previousCpuActiveTime,
+ cpuIdleTime - previousCpuIdleTime,
+ cpuStealTime - previousCpuStealTime,
+ cpuLostTime - previousCpuLostTime
+ )
+ }
+ }
+
+ private companion object {
+ private val STATE_KEY = AttributeKey.stringKey("state")
+ private val RESULT_KEY = AttributeKey.stringKey("result")
+ }
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
index 408d1325..ea96f721 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
@@ -22,28 +22,24 @@
package org.opendc.telemetry.compute
-import io.opentelemetry.api.common.AttributeKey
-import io.opentelemetry.api.common.Attributes
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.data.*
import io.opentelemetry.sdk.metrics.export.MetricExporter
-import io.opentelemetry.sdk.resources.Resource
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.HostInfo
-import org.opendc.telemetry.compute.table.ServerData
-import org.opendc.telemetry.compute.table.ServerInfo
import java.time.Clock
/**
* A [MetricExporter] that redirects data to a [ComputeMonitor] implementation.
*/
public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter {
+ /**
+ * A [ComputeMetricAggregator] that actually performs the aggregation.
+ */
+ private val agg = ComputeMetricAggregator()
+
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
return try {
- reportServiceMetrics(metrics)
- reportHostMetrics(metrics)
- reportServerMetrics(metrics)
+ agg.process(metrics)
+ agg.collect(clock.instant(), monitor)
CompletableResultCode.ofSuccess()
} catch (e: Throwable) {
CompletableResultCode.ofFailure()
@@ -53,229 +49,4 @@ public class ComputeMetricExporter(private val clock: Clock, private val monitor
override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- private fun reportServiceMetrics(metrics: Collection<MetricData>) {
- monitor.record(extractServiceMetrics(clock.millis(), metrics))
- }
-
- private val hosts = mutableMapOf<String, HostAggregator>()
- private val servers = mutableMapOf<String, ServerAggregator>()
-
- private fun reportHostMetrics(metrics: Collection<MetricData>) {
- val hosts = hosts
- val servers = servers
-
- for (metric in metrics) {
- val resource = metric.resource
- val hostId = resource.attributes[HOST_ID] ?: continue
- val agg = hosts.computeIfAbsent(hostId) { HostAggregator(resource) }
- agg.accept(metric)
- }
-
- val monitor = monitor
- val now = clock.millis()
- for ((_, server) in servers) {
- server.record(monitor, now)
- }
- }
-
- private fun reportServerMetrics(metrics: Collection<MetricData>) {
- val hosts = hosts
-
- for (metric in metrics) {
- val resource = metric.resource
- val host = resource.attributes[HOST_ID]?.let { hosts[it]?.host }
-
- when (metric.name) {
- "scheduler.duration" -> mapByServer(metric.doubleHistogramData.points, host) { agg, point ->
- agg.schedulingLatency = point.sum / point.count
- }
- "guest.time.running" -> mapByServer(metric.longSumData.points, host) { agg, point ->
- agg.uptime = point.value
- }
- "guest.time.error" -> mapByServer(metric.longSumData.points, host) { agg, point ->
- agg.downtime = point.value
- }
- }
- }
-
- val monitor = monitor
- val now = clock.millis()
- for ((_, host) in hosts) {
- host.record(monitor, now)
- }
- }
-
- /**
- * Helper function to map a metric by the server.
- */
- private inline fun <P : PointData> mapByServer(points: Collection<P>, host: HostInfo? = null, block: (ServerAggregator, P) -> Unit) {
- for (point in points) {
- val serverId = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val agg = servers.computeIfAbsent(serverId) { ServerAggregator(point.attributes) }
-
- if (host != null) {
- agg.host = host
- }
-
- block(agg, point)
- }
- }
-
- /**
- * An aggregator for host metrics before they are reported.
- */
- private class HostAggregator(resource: Resource) {
- /**
- * The static information about this host.
- */
- val host = HostInfo(
- resource.attributes[HOST_ID]!!,
- resource.attributes[HOST_NAME]!!,
- resource.attributes[HOST_ARCH]!!,
- resource.attributes[HOST_NCPUS]!!.toInt(),
- resource.attributes[HOST_MEM_CAPACITY]!!,
- )
-
- private var totalWork: Double = 0.0
- private var previousTotalWork = 0.0
- private var grantedWork: Double = 0.0
- private var previousGrantedWork = 0.0
- private var overcommittedWork: Double = 0.0
- private var previousOvercommittedWork = 0.0
- private var interferedWork: Double = 0.0
- private var previousInterferedWork = 0.0
- private var cpuUsage: Double = 0.0
- private var cpuDemand: Double = 0.0
- private var instanceCount: Int = 0
- private var powerDraw: Double = 0.0
- private var uptime: Long = 0
- private var previousUptime = 0L
- private var downtime: Long = 0
- private var previousDowntime = 0L
-
- fun record(monitor: ComputeMonitor, now: Long) {
- monitor.record(
- HostData(
- now,
- host,
- totalWork - previousTotalWork,
- grantedWork - previousGrantedWork,
- overcommittedWork - previousOvercommittedWork,
- interferedWork - previousInterferedWork,
- cpuUsage,
- cpuDemand,
- instanceCount,
- powerDraw,
- uptime - previousUptime,
- downtime - previousDowntime,
- )
- )
-
- previousTotalWork = totalWork
- previousGrantedWork = grantedWork
- previousOvercommittedWork = overcommittedWork
- previousInterferedWork = interferedWork
- previousUptime = uptime
- previousDowntime = downtime
- reset()
- }
-
- /**
- * Accept the [MetricData] for this host.
- */
- fun accept(data: MetricData) {
- when (data.name) {
- "cpu.work.total" -> totalWork = data.doubleSumData.points.first().value
- "cpu.work.granted" -> grantedWork = data.doubleSumData.points.first().value
- "cpu.work.overcommit" -> overcommittedWork = data.doubleSumData.points.first().value
- "cpu.work.interference" -> interferedWork = data.doubleSumData.points.first().value
- "power.usage" -> powerDraw = acceptHistogram(data)
- "cpu.usage" -> cpuUsage = acceptHistogram(data)
- "cpu.demand" -> cpuDemand = acceptHistogram(data)
- "guests.active" -> instanceCount = data.longSumData.points.first().value.toInt()
- "host.time.up" -> uptime = data.longSumData.points.first().value
- "host.time.down" -> downtime = data.longSumData.points.first().value
- }
- }
-
- private fun acceptHistogram(data: MetricData): Double {
- return when (data.type) {
- MetricDataType.HISTOGRAM -> {
- val point = data.doubleHistogramData.points.first()
- point.sum / point.count
- }
- MetricDataType.SUMMARY -> {
- val point = data.doubleSummaryData.points.first()
- point.sum / point.count
- }
- else -> error("Invalid metric type")
- }
- }
-
- private fun reset() {
- totalWork = 0.0
- grantedWork = 0.0
- overcommittedWork = 0.0
- interferedWork = 0.0
- cpuUsage = 0.0
- cpuDemand = 0.0
- instanceCount = 0
- powerDraw = 0.0
- uptime = 0L
- downtime = 0L
- }
- }
-
- /**
- * An aggregator for server metrics before they are reported.
- */
- private class ServerAggregator(attributes: Attributes) {
- /**
- * The static information about this server.
- */
- val server = ServerInfo(
- attributes[ResourceAttributes.HOST_ID]!!,
- attributes[ResourceAttributes.HOST_NAME]!!,
- attributes[ResourceAttributes.HOST_TYPE]!!,
- attributes[ResourceAttributes.HOST_ARCH]!!,
- attributes[ResourceAttributes.HOST_IMAGE_ID]!!,
- attributes[ResourceAttributes.HOST_IMAGE_NAME]!!,
- attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(),
- attributes[AttributeKey.longKey("host.mem_capacity")]!!,
- )
-
- /**
- * The [HostInfo] of the host on which the server is hosted.
- */
- var host: HostInfo? = null
-
- @JvmField var uptime: Long = 0
- private var previousUptime = 0L
- @JvmField var downtime: Long = 0
- private var previousDowntime = 0L
- @JvmField var schedulingLatency = 0.0
-
- fun record(monitor: ComputeMonitor, now: Long) {
- monitor.record(
- ServerData(
- now,
- server,
- null,
- uptime - previousUptime,
- downtime - previousDowntime,
- )
- )
-
- previousUptime = uptime
- previousDowntime = downtime
- reset()
- }
-
- private fun reset() {
- host = null
- uptime = 0L
- downtime = 0L
- }
- }
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
index f3690ee8..25d346fb 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -22,64 +22,31 @@
package org.opendc.telemetry.compute
-import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.opendc.telemetry.compute.table.ServiceData
+import java.time.Instant
/**
* Collect the metrics of the compute service.
*/
-public fun collectServiceMetrics(timestamp: Long, metricProducer: MetricProducer): ServiceData {
+public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData {
return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics())
}
/**
* Extract a [ServiceData] object from the specified list of metric data.
*/
-public fun extractServiceMetrics(timestamp: Long, metrics: Collection<MetricData>): ServiceData {
- val resultKey = AttributeKey.stringKey("result")
- val stateKey = AttributeKey.stringKey("state")
-
- var hostsUp = 0
- var hostsDown = 0
-
- var serversPending = 0
- var serversActive = 0
-
- var attemptsSuccess = 0
- var attemptsFailure = 0
- var attemptsError = 0
-
- for (metric in metrics) {
- when (metric.name) {
- "scheduler.hosts" -> {
- for (point in metric.longSumData.points) {
- when (point.attributes[stateKey]) {
- "up" -> hostsUp = point.value.toInt()
- "down" -> hostsDown = point.value.toInt()
- }
- }
- }
- "scheduler.servers" -> {
- for (point in metric.longSumData.points) {
- when (point.attributes[stateKey]) {
- "pending" -> serversPending = point.value.toInt()
- "active" -> serversActive = point.value.toInt()
- }
- }
- }
- "scheduler.attempts" -> {
- for (point in metric.longSumData.points) {
- when (point.attributes[resultKey]) {
- "success" -> attemptsSuccess = point.value.toInt()
- "failure" -> attemptsFailure = point.value.toInt()
- "error" -> attemptsError = point.value.toInt()
- }
- }
- }
+public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricData>): ServiceData {
+ lateinit var serviceData: ServiceData
+ val agg = ComputeMetricAggregator()
+ val monitor = object : ComputeMonitor {
+ override fun record(data: ServiceData) {
+ serviceData = data
}
}
- return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
+ agg.process(metrics)
+ agg.collect(timestamp, monitor)
+ return serviceData
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
index e3ecda3d..8e787b97 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
@@ -22,20 +22,29 @@
package org.opendc.telemetry.compute.table
+import java.time.Instant
+
/**
* A trace entry for a particular host.
*/
public data class HostData(
- public val timestamp: Long,
- public val host: HostInfo,
- public val totalWork: Double,
- public val grantedWork: Double,
- public val overcommittedWork: Double,
- public val interferedWork: Double,
- public val cpuUsage: Double,
- public val cpuDemand: Double,
- public val instanceCount: Int,
- public val powerDraw: Double,
- public val uptime: Long,
- public val downtime: Long,
+ val timestamp: Instant,
+ val host: HostInfo,
+ val guestsTerminated: Int,
+ val guestsRunning: Int,
+ val guestsError: Int,
+ val guestsInvalid: Int,
+ val cpuLimit: Double,
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val cpuUtilization: Double,
+ val cpuActiveTime: Long,
+ val cpuIdleTime: Long,
+ val cpuStealTime: Long,
+ val cpuLostTime: Long,
+ val powerUsage: Double,
+ val powerTotal: Double,
+ val uptime: Long,
+ val downtime: Long,
+ val bootTime: Instant?
)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
index 7fde86d9..c48bff3a 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
@@ -22,13 +22,22 @@
package org.opendc.telemetry.compute.table
+import java.time.Instant
+
/**
* A trace entry for a particular server.
*/
public data class ServerData(
- public val timestamp: Long,
- public val server: ServerInfo,
- public val host: HostInfo?,
- public val uptime: Long,
- public val downtime: Long,
+ val timestamp: Instant,
+ val server: ServerInfo,
+ val host: HostInfo?,
+ val uptime: Long,
+ val downtime: Long,
+ val bootTime: Instant?,
+ val schedulingLatency: Long,
+ val cpuLimit: Double,
+ val cpuActiveTime: Long,
+ val cpuIdleTime: Long,
+ val cpuStealTime: Long,
+ val cpuLostTime: Long,
)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
index da2ebdf4..6db1399d 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
@@ -22,16 +22,18 @@
package org.opendc.telemetry.compute.table
+import java.time.Instant
+
/**
* A trace entry for the compute service.
*/
public data class ServiceData(
- public val timestamp: Long,
- public val hostsUp: Int,
- public val hostsDown: Int,
- public val serversPending: Int,
- public val serversActive: Int,
- public val attemptsSuccess: Int,
- public val attemptsFailure: Int,
- public val attemptsError: Int
+ val timestamp: Instant,
+ val hostsUp: Int,
+ val hostsDown: Int,
+ val serversPending: Int,
+ val serversActive: Int,
+ val attemptsSuccess: Int,
+ val attemptsFailure: Int,
+ val attemptsError: Int
)
diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
index 8f19ab81..07f0ff7f 100644
--- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
+++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
@@ -44,7 +44,7 @@ public class CoroutineMetricReader(
scope: CoroutineScope,
private val producers: List<MetricProducer>,
private val exporter: MetricExporter,
- private val exportInterval: Duration = Duration.ofMinutes(1)
+ private val exportInterval: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
private val logger = KotlinLogging.logger {}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 960d5ebd..483558e1 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -26,8 +26,6 @@ import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.long
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.experiments.capelin.*
@@ -49,7 +47,6 @@ import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
-import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.web.client.ApiClient
import org.opendc.web.client.AuthConfiguration
import org.opendc.web.client.model.Scenario
@@ -187,11 +184,6 @@ class RunnerCli : CliktCommand(name = "runner") {
val seeder = Random(repeat.toLong())
- val meterProvider: MeterProvider = SdkMeterProvider
- .builder()
- .setClock(clock.toOtelClock())
- .build()
-
val operational = scenario.operationalPhenomena
val computeScheduler = createComputeScheduler(operational.schedulerName, seeder)
@@ -215,7 +207,7 @@ class RunnerCli : CliktCommand(name = "runner") {
interferenceModel.takeIf { operational.performanceInterferenceEnabled }
)
- val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor), exportInterval = Duration.ofHours(1))
try {
simulator.run(trace)
@@ -224,7 +216,7 @@ class RunnerCli : CliktCommand(name = "runner") {
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
logger.debug {
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
index e0e3488f..a0c281e8 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
@@ -65,10 +65,10 @@ public class ScenarioManager(private val client: ApiClient) {
client.updateJob(
id, SimulationState.FINISHED,
mapOf(
- "total_requested_burst" to results.map { it.totalWork },
- "total_granted_burst" to results.map { it.totalGrantedWork },
- "total_overcommitted_burst" to results.map { it.totalOvercommittedWork },
- "total_interfered_burst" to results.map { it.totalInterferedWork },
+ "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime },
+ "total_granted_burst" to results.map { it.totalActiveTime },
+ "total_overcommitted_burst" to results.map { it.totalStealTime },
+ "total_interfered_burst" to results.map { it.totalLostTime },
"mean_cpu_usage" to results.map { it.meanCpuUsage },
"mean_cpu_demand" to results.map { it.meanCpuDemand },
"mean_num_deployed_images" to results.map { it.meanNumDeployedImages },
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
index 5f2c474b..bb412738 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
@@ -33,24 +33,23 @@ import kotlin.math.roundToLong
*/
class WebComputeMonitor : ComputeMonitor {
override fun record(data: HostData) {
- val duration = data.uptime
val slices = data.downtime / SLICE_LENGTH
hostAggregateMetrics = AggregateHostMetrics(
- hostAggregateMetrics.totalWork + data.totalWork,
- hostAggregateMetrics.totalGrantedWork + data.grantedWork,
- hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork,
- hostAggregateMetrics.totalInterferedWork + data.overcommittedWork,
- hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600,
+ hostAggregateMetrics.totalActiveTime + data.cpuActiveTime,
+ hostAggregateMetrics.totalIdleTime + data.cpuIdleTime,
+ hostAggregateMetrics.totalStealTime + data.cpuStealTime,
+ hostAggregateMetrics.totalLostTime + data.cpuLostTime,
+ hostAggregateMetrics.totalPowerDraw + data.powerTotal,
hostAggregateMetrics.totalFailureSlices + slices,
- hostAggregateMetrics.totalFailureVmSlices + data.instanceCount * slices
+ hostAggregateMetrics.totalFailureVmSlices + data.guestsRunning * slices
)
hostMetrics.compute(data.host.id) { _, prev ->
HostMetrics(
data.cpuUsage + (prev?.cpuUsage ?: 0.0),
data.cpuDemand + (prev?.cpuDemand ?: 0.0),
- data.instanceCount + (prev?.instanceCount ?: 0),
+ data.guestsRunning + (prev?.instanceCount ?: 0),
1 + (prev?.count ?: 0)
)
}
@@ -58,13 +57,13 @@ class WebComputeMonitor : ComputeMonitor {
private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
private val hostMetrics: MutableMap<String, HostMetrics> = mutableMapOf()
- private val SLICE_LENGTH: Long = 5 * 60 * 1000
+ private val SLICE_LENGTH: Long = 5 * 60
data class AggregateHostMetrics(
- val totalWork: Double = 0.0,
- val totalGrantedWork: Double = 0.0,
- val totalOvercommittedWork: Double = 0.0,
- val totalInterferedWork: Double = 0.0,
+ val totalActiveTime: Long = 0L,
+ val totalIdleTime: Long = 0L,
+ val totalStealTime: Long = 0L,
+ val totalLostTime: Long = 0L,
val totalPowerDraw: Double = 0.0,
val totalFailureSlices: Double = 0.0,
val totalFailureVmSlices: Double = 0.0,
@@ -99,10 +98,10 @@ class WebComputeMonitor : ComputeMonitor {
fun getResult(): Result {
return Result(
- hostAggregateMetrics.totalWork,
- hostAggregateMetrics.totalGrantedWork,
- hostAggregateMetrics.totalOvercommittedWork,
- hostAggregateMetrics.totalInterferedWork,
+ hostAggregateMetrics.totalActiveTime,
+ hostAggregateMetrics.totalIdleTime,
+ hostAggregateMetrics.totalStealTime,
+ hostAggregateMetrics.totalLostTime,
hostMetrics.map { it.value.cpuUsage / it.value.count }.average(),
hostMetrics.map { it.value.cpuDemand / it.value.count }.average(),
hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(),
@@ -118,10 +117,10 @@ class WebComputeMonitor : ComputeMonitor {
}
data class Result(
- val totalWork: Double,
- val totalGrantedWork: Double,
- val totalOvercommittedWork: Double,
- val totalInterferedWork: Double,
+ val totalActiveTime: Long,
+ val totalIdleTime: Long,
+ val totalStealTime: Long,
+ val totalLostTime: Long,
val meanCpuUsage: Double,
val meanCpuDemand: Double,
val meanNumDeployedImages: Double,