summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-07 16:04:46 +0200
committerGitHub <noreply@github.com>2021-09-07 16:04:46 +0200
commit3eda751b725448139217dc1929dca1fc354e2a4e (patch)
tree11d933753c515140a6ae846fe96448ad64b165aa
parenteb4de7f832c6d26725e0d7c29644c704ea82604e (diff)
parent18ff316a6b6ab984ebf8283ea48ed98ec69d8295 (diff)
merge: Prepare for risk analysis experiments
This pull request adds the necessary code in preparation for the risk analysis experiments: - Track provisioning time - Track host up/down time - Track guest up/down time - Support overcommitted memory - Do not fail inactive guests - Mark unschedulable server as terminated - Make ExperimentMonitor optional for trace processing - Report up/downtime metrics in experiment monitor - Move metric collection outside Capelin code - Resolve kotlin-reflect incompatibility - Restructure input reading classes **Breaking API Changes** - `ExperimentMonitor` replaced in favour of `ComputeMonitor`
-rw-r--r--opendc-compute/opendc-compute-service/build.gradle.kts1
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt19
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt6
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt6
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt134
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt20
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts6
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt98
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt24
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt)25
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt)46
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt73
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt)44
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt118
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt81
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt72
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt21
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt19
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt6
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt94
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt4
-rw-r--r--opendc-experiments/opendc-experiments-energy21/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt18
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/build.gradle.kts (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt)26
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt)114
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt)50
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt111
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt)26
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt)19
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt)28
-rw-r--r--opendc-web/opendc-web-runner/build.gradle.kts2
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt23
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt10
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt145
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt189
-rw-r--r--settings.gradle.kts1
38 files changed, 809 insertions, 879 deletions
diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts
index e0e48b0f..33cafc45 100644
--- a/opendc-compute/opendc-compute-service/build.gradle.kts
+++ b/opendc-compute/opendc-compute-service/build.gradle.kts
@@ -35,6 +35,7 @@ dependencies {
api(projects.opendcTelemetry.opendcTelemetryApi)
implementation(projects.opendcUtils)
implementation(libs.kotlin.logging)
+ implementation(libs.opentelemetry.semconv)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testRuntimeOnly(libs.log4j.slf4j)
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index d7a7e8f8..f1c055d4 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,7 +22,9 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.*
@@ -160,6 +162,15 @@ internal class ComputeServiceImpl(
.build()
/**
+ * The response time of the service.
+ */
+ private val _schedulerDuration = meter.histogramBuilder("scheduler.duration")
+ .setDescription("End to end latency for a server to be scheduled (in multiple attempts)")
+ .ofLongs()
+ .setUnit("ms")
+ .build()
+
+ /**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
*/
private var timerScheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
@@ -325,7 +336,7 @@ internal class ComputeServiceImpl(
internal fun schedule(server: InternalServer): SchedulingRequest {
logger.debug { "Enqueueing server ${server.uid} to be assigned to host." }
- val request = SchedulingRequest(server)
+ val request = SchedulingRequest(server, clock.millis())
queue.add(request)
_submittedServers.add(1)
_waitingServers.add(1)
@@ -368,6 +379,7 @@ internal class ComputeServiceImpl(
* Run a single scheduling iteration.
*/
private fun doSchedule() {
+ val now = clock.millis()
while (queue.isNotEmpty()) {
val request = queue.peek()
@@ -390,7 +402,7 @@ internal class ComputeServiceImpl(
logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
- server.state = ServerState.ERROR
+ server.state = ServerState.TERMINATED
continue
} else {
break
@@ -402,6 +414,7 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
_waitingServers.add(-1)
+ _schedulerDuration.record(now - request.submitTime, Attributes.of(ResourceAttributes.HOST_ID, server.uid.toString()))
logger.info { "Assigned server $server to host $host." }
@@ -430,7 +443,7 @@ internal class ComputeServiceImpl(
/**
* A request to schedule an [InternalServer] onto one of the [Host]s.
*/
- internal data class SchedulingRequest(val server: InternalServer) {
+ internal data class SchedulingRequest(val server: InternalServer, val submitTime: Long) {
/**
* A flag to indicate that the request is cancelled.
*/
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index c6c01ea2..d036ec00 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -170,7 +170,7 @@ internal class ComputeServiceTest {
server.start()
delay(5L * 60 * 1000)
server.refresh()
- assertEquals(ServerState.ERROR, server.state)
+ assertEquals(ServerState.TERMINATED, server.state)
}
@Test
@@ -183,7 +183,7 @@ internal class ComputeServiceTest {
server.start()
delay(5L * 60 * 1000)
server.refresh()
- assertEquals(ServerState.ERROR, server.state)
+ assertEquals(ServerState.TERMINATED, server.state)
}
@Test
@@ -196,7 +196,7 @@ internal class ComputeServiceTest {
server.start()
delay(5L * 60 * 1000)
server.refresh()
- assertEquals(ServerState.ERROR, server.state)
+ assertEquals(ServerState.TERMINATED, server.state)
}
@Test
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
index 20ea8d20..28fd8217 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
@@ -102,7 +102,7 @@ class InternalServerTest {
val image = mockk<InternalImage>()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
- every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer) }
+ every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) }
server.start()
@@ -160,7 +160,7 @@ class InternalServerTest {
val flavor = mockk<InternalFlavor>()
val image = mockk<InternalImage>()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
- val request = ComputeServiceImpl.SchedulingRequest(server)
+ val request = ComputeServiceImpl.SchedulingRequest(server, 0)
every { service.schedule(any()) } returns request
@@ -223,7 +223,7 @@ class InternalServerTest {
val flavor = mockk<InternalFlavor>()
val image = mockk<InternalImage>()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
- val request = ComputeServiceImpl.SchedulingRequest(server)
+ val request = ComputeServiceImpl.SchedulingRequest(server, 0)
every { service.schedule(any()) } returns request
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 20e5a9db..213d20ee 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -22,6 +22,7 @@
package org.opendc.compute.simulator
+import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
@@ -59,7 +60,7 @@ public class SimHost(
override val meta: Map<String, Any>,
context: CoroutineContext,
interpreter: SimResourceInterpreter,
- meter: Meter,
+ private val meter: Meter,
hypervisor: SimHypervisorProvider,
scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
@@ -72,6 +73,11 @@ public class SimHost(
override val scope: CoroutineScope = CoroutineScope(context + Job())
/**
+ * The clock instance used by the host.
+ */
+ private val clock = interpreter.clock
+
+ /**
* The logger instance of this server.
*/
private val logger = KotlinLogging.logger {}
@@ -82,11 +88,6 @@ public class SimHost(
private val listeners = mutableListOf<HostListener>()
/**
- * Current total memory use of the images on this hypervisor.
- */
- private var availableMemory: Long = model.memory.sumOf { it.size }
-
- /**
* The machine to run on.
*/
public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, powerDriver)
@@ -115,6 +116,8 @@ public class SimHost(
_cpuDemand.record(cpuDemand)
_cpuUsage.record(cpuUsage)
_powerUsage.record(machine.powerDraw)
+
+ reportTime()
}
}
)
@@ -221,6 +224,33 @@ public class SimHost(
.build()
.bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+ /**
+ * The amount of time in the system.
+ */
+ private val _totalTime = meter.counterBuilder("host.time.total")
+ .setDescription("The amount of time in the system")
+ .setUnit("ms")
+ .build()
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+
+ /**
+ * The uptime of the host.
+ */
+ private val _upTime = meter.counterBuilder("host.time.up")
+ .setDescription("The uptime of the host")
+ .setUnit("ms")
+ .build()
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+
+ /**
+ * The downtime of the host.
+ */
+ private val _downTime = meter.counterBuilder("host.time.down")
+ .setDescription("The downtime of the host")
+ .setUnit("ms")
+ .build()
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+
init {
// Launch hypervisor onto machine
scope.launch {
@@ -238,25 +268,44 @@ public class SimHost(
}
}
+ private var _lastReport = clock.millis()
+
+ private fun reportTime() {
+ if (!scope.isActive)
+ return
+
+ val now = clock.millis()
+ val duration = now - _lastReport
+
+ _totalTime.add(duration)
+ when (_state) {
+ HostState.UP -> _upTime.add(duration)
+ HostState.DOWN -> _downTime.add(duration)
+ }
+
+ // Track time of guests
+ for (guest in guests.values) {
+ guest.reportTime()
+ }
+
+ _lastReport = now
+ }
+
override fun canFit(server: Server): Boolean {
- val sufficientMemory = availableMemory > server.flavor.memorySize
- val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount
+ val sufficientMemory = model.memorySize >= server.flavor.memorySize
+ val enoughCpus = model.cpuCount >= server.flavor.cpuCount
val canFit = hypervisor.canFit(server.flavor.toMachineModel())
return sufficientMemory && enoughCpus && canFit
}
override suspend fun spawn(server: Server, start: Boolean) {
- // Return if the server already exists on this host
- if (server in this) {
- return
+ val guest = guests.computeIfAbsent(server) { key ->
+ require(canFit(key)) { "Server does not fit" }
+ _guests.add(1)
+ Guest(key, hypervisor.createMachine(key.flavor.toMachineModel(), key.name))
}
- require(canFit(server)) { "Server does not fit" }
- val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel(), server.name))
- guests[server] = guest
- _guests.add(1)
-
if (start) {
guest.start()
}
@@ -291,6 +340,7 @@ public class SimHost(
}
override fun close() {
+ reportTime()
scope.cancel()
machine.close()
}
@@ -320,6 +370,7 @@ public class SimHost(
}
override suspend fun fail() {
+ reportTime()
_state = HostState.DOWN
for (guest in guests.values) {
guest.fail()
@@ -327,6 +378,7 @@ public class SimHost(
}
override suspend fun recover() {
+ reportTime()
_state = HostState.UP
for (guest in guests.values) {
guest.start()
@@ -339,6 +391,33 @@ public class SimHost(
private inner class Guest(val server: Server, val machine: SimMachine) {
var state: ServerState = ServerState.TERMINATED
+ /**
+ * The amount of time in the system.
+ */
+ private val _totalTime = meter.counterBuilder("guest.time.total")
+ .setDescription("The amount of time in the system")
+ .setUnit("ms")
+ .build()
+ .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
+
+ /**
+ * The uptime of the guest.
+ */
+ private val _runningTime = meter.counterBuilder("guest.time.running")
+ .setDescription("The uptime of the guest")
+ .setUnit("ms")
+ .build()
+ .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
+
+ /**
+ * The time the guest is in an error state.
+ */
+ private val _errorTime = meter.counterBuilder("guest.time.error")
+ .setDescription("The time the guest is in an error state")
+ .setUnit("ms")
+ .build()
+ .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
+
suspend fun start() {
when (state) {
ServerState.TERMINATED, ServerState.ERROR -> {
@@ -373,6 +452,9 @@ public class SimHost(
}
suspend fun fail() {
+ if (state != ServerState.RUNNING) {
+ return
+ }
stop()
state = ServerState.ERROR
}
@@ -414,8 +496,26 @@ public class SimHost(
else
ServerState.ERROR
- availableMemory += server.flavor.memorySize
onGuestStop(this)
}
+
+ private var _lastReport = clock.millis()
+
+ fun reportTime() {
+ if (state == ServerState.DELETED)
+ return
+
+ val now = clock.millis()
+ val duration = now - _lastReport
+
+ _totalTime.add(duration)
+ when (state) {
+ ServerState.RUNNING -> _runningTime.add(duration)
+ ServerState.ERROR -> _errorTime.add(duration)
+ else -> {}
+ }
+
+ _lastReport = now
+ }
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 1ba3a9a1..31215e9a 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -189,6 +189,10 @@ internal class SimHostTest {
fun testFailure() = runBlockingSimulation {
var requestedWork = 0L
var grantedWork = 0L
+ var totalTime = 0L
+ var downTime = 0L
+ var guestTotalTime = 0L
+ var guestDownTime = 0L
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
@@ -238,6 +242,18 @@ internal class SimHostTest {
metricsByName["cpu.work.granted"]?.let {
grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
}
+ metricsByName["host.time.total"]?.let {
+ totalTime = it.longSumData.points.first().value
+ }
+ metricsByName["host.time.down"]?.let {
+ downTime = it.longSumData.points.first().value
+ }
+ metricsByName["guest.time.total"]?.let {
+ guestTotalTime = it.longSumData.points.first().value
+ }
+ metricsByName["guest.time.error"]?.let {
+ guestDownTime = it.longSumData.points.first().value
+ }
return CompletableResultCode.ofSuccess()
}
@@ -275,6 +291,10 @@ internal class SimHostTest {
assertAll(
{ assertEquals(2226039, requestedWork, "Total time does not match") },
{ assertEquals(1086039, grantedWork, "Down time does not match") },
+ { assertEquals(1200001, totalTime, "Total time does not match") },
+ { assertEquals(1200001, guestTotalTime, "Guest total time does not match") },
+ { assertEquals(5000, downTime, "Down time does not match") },
+ { assertEquals(5000, guestDownTime, "Guest down time does not match") },
)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 65cebe1b..b2330af0 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -38,15 +38,15 @@ dependencies {
implementation(projects.opendcSimulator.opendcSimulatorFailures)
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcTelemetry.opendcTelemetryCompute)
implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
implementation(libs.config)
implementation(libs.progressbar)
implementation(libs.clikt)
- implementation(libs.jackson.module.kotlin) {
- exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect")
- }
+ implementation(libs.jackson.module.kotlin)
+ implementation(kotlin("reflect"))
implementation(libs.parquet)
testImplementation(libs.log4j.slf4j)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 46e11056..0230409e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -24,16 +24,10 @@ package org.opendc.experiments.capelin
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostListener
-import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.ReplayScheduler
@@ -46,8 +40,6 @@ import org.opendc.compute.service.scheduler.weights.RamWeigher
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.EnvironmentReader
-import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.TraceReader
import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
@@ -57,7 +49,7 @@ import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.CorrelatedFaultInjector
import org.opendc.simulator.failures.FaultInjector
import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Clock
import kotlin.coroutines.resume
@@ -66,11 +58,6 @@ import kotlin.math.max
import kotlin.random.Random
/**
- * The logger for this experiment.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
* Construct the failure domain for the experiments.
*/
fun createFailureDomain(
@@ -169,85 +156,6 @@ suspend fun withComputeService(
}
/**
- * Attach the specified monitor to the VM provisioner.
- */
-suspend fun withMonitor(
- monitor: ExperimentMonitor,
- clock: Clock,
- metricProducer: MetricProducer,
- scheduler: ComputeService,
- block: suspend CoroutineScope.() -> Unit
-): Unit = coroutineScope {
- // Monitor host events
- for (host in scheduler.hosts) {
- monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
- host.addListener(object : HostListener {
- override fun onStateChanged(host: Host, newState: HostState) {
- monitor.reportHostStateChange(clock.millis(), host, newState)
- }
- })
- }
-
- val reader = CoroutineMetricReader(
- this,
- listOf(metricProducer),
- ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }),
- exportInterval = 5L * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */
- )
-
- try {
- block(this)
- } finally {
- reader.close()
- monitor.close()
- }
-}
-
-class ComputeMetrics {
- var submittedVms: Int = 0
- var queuedVms: Int = 0
- var runningVms: Int = 0
- var unscheduledVms: Int = 0
- var finishedVms: Int = 0
- var hosts: Int = 0
- var availableHosts = 0
-}
-
-/**
- * Collect the metrics of the compute service.
- */
-fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
- return extractComputeMetrics(metricProducer.collectAllMetrics())
-}
-
-/**
- * Extract an [ComputeMetrics] object from the specified list of metric data.
- */
-internal fun extractComputeMetrics(metrics: Collection<MetricData>): ComputeMetrics {
- val res = ComputeMetrics()
- for (metric in metrics) {
- val points = metric.longSumData.points
-
- if (points.isEmpty()) {
- continue
- }
-
- val value = points.first().value.toInt()
- when (metric.name) {
- "servers.submitted" -> res.submittedVms = value
- "servers.waiting" -> res.queuedVms = value
- "servers.unscheduled" -> res.unscheduledVms = value
- "servers.active" -> res.runningVms = value
- "servers.finished" -> res.finishedVms = value
- "hosts.total" -> res.hosts = value
- "hosts.available" -> res.availableHosts = value
- }
- }
-
- return res
-}
-
-/**
* Process the trace.
*/
suspend fun processTrace(
@@ -255,7 +163,7 @@ suspend fun processTrace(
reader: TraceReader<SimWorkload>,
scheduler: ComputeService,
chan: Channel<Unit>,
- monitor: ExperimentMonitor
+ monitor: ComputeMonitor? = null,
) {
val client = scheduler.newClient()
val image = client.newImage("vm-image")
@@ -289,7 +197,7 @@ suspend fun processTrace(
suspendCancellableCoroutine { cont ->
server.watch(object : ServerWatcher {
override fun onStateChanged(server: Server, newState: ServerState) {
- monitor.reportVmStateChange(clock.millis(), server, newState)
+ monitor?.onStateChange(clock.millis(), server, newState)
if (newState == ServerState.TERMINATED) {
cont.resume(Unit)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index ee832af8..4db04591 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -29,11 +29,11 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
+import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
@@ -41,6 +41,8 @@ import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.withMonitor
import java.io.File
import java.io.FileInputStream
import java.util.*
@@ -120,14 +122,15 @@ abstract class Portfolio(name: String) : Experiment(name) {
}
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
- PerformanceInterferenceReader(FileInputStream(config.getString("interference-model")))
- .use { VmInterferenceModel(it.read(), Random(seeder.nextLong())) }
+ PerformanceInterferenceReader()
+ .read(FileInputStream(config.getString("interference-model")))
+ .let { VmInterferenceModel(it, Random(seeder.nextLong())) }
else
null
val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
- val monitor = ParquetExperimentMonitor(
+ val monitor = ParquetExportMonitor(
File(config.getString("output-path")),
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
@@ -148,7 +151,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
null
}
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
trace,
@@ -159,9 +162,16 @@ abstract class Portfolio(name: String) : Experiment(name) {
}
failureDomain?.cancel()
+ monitor.close()
}
- val monitorResults = collectMetrics(meterProvider as MetricProducer)
- logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+ val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ logger.debug {
+ "Finish " +
+ "SUBMIT=${monitorResults.instanceCount} " +
+ "FAIL=${monitorResults.failedInstanceCount} " +
+ "QUEUE=${monitorResults.queuedInstanceCount} " +
+ "RUNNING=${monitorResults.activeHostCount}"
+ }
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
index 897a6692..c5cb80e2 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
@@ -20,14 +20,14 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry.parquet
+package org.opendc.experiments.capelin.export.parquet
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.experiments.capelin.telemetry.Event
import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.Closeable
import java.io.File
@@ -36,20 +36,20 @@ import java.util.concurrent.BlockingQueue
import kotlin.concurrent.thread
/**
- * The logging instance to use.
+ * A writer that writes data in Parquet format.
*/
-private val logger = KotlinLogging.logger {}
-
-/**
- * A writer that writes events in Parquet format.
- */
-public open class ParquetEventWriter<in T : Event>(
+public open class ParquetDataWriter<in T>(
private val path: File,
private val schema: Schema,
private val converter: (T, GenericData.Record) -> Unit,
private val bufferSize: Int = 4096
) : Runnable, Closeable {
/**
+ * The logging instance to use.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
* The writer to write the Parquet file.
*/
private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
@@ -57,6 +57,7 @@ public open class ParquetEventWriter<in T : Event>(
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(4 * 1024 * 1024) // For compression
.withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
/**
@@ -67,7 +68,7 @@ public open class ParquetEventWriter<in T : Event>(
/**
* The thread that is responsible for writing the Parquet records.
*/
- private val writerThread = thread(start = false, name = "parquet-writer") { run() }
+ private val writerThread = thread(start = false, name = this.toString()) { run() }
/**
* Write the specified metrics to the database.
@@ -100,7 +101,7 @@ public open class ParquetEventWriter<in T : Event>(
is Action.Write<*> -> {
val record = GenericData.Record(schema)
@Suppress("UNCHECKED_CAST")
- converter(action.event as T, record)
+ converter(action.data as T, record)
writer.write(record)
}
}
@@ -121,6 +122,6 @@ public open class ParquetEventWriter<in T : Event>(
/**
* Write the specified metrics to the database.
*/
- public data class Write<out T : Event>(val event: T) : Action()
+ public data class Write<out T>(val data: T) : Action()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
index 7631f55f..79b84e9d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,22 +20,36 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry
+package org.opendc.experiments.capelin.export.parquet
-import org.opendc.compute.api.Server
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServiceData
+import java.io.File
/**
- * A periodic report of a virtual machine's metrics.
+ * A [ComputeMonitor] that logs the events to a Parquet file.
*/
-public data class VmEvent(
- override val timestamp: Long,
- public val duration: Long,
- public val vm: Server,
- public val host: Server,
- public val requestedBurst: Long,
- public val grantedBurst: Long,
- public val overcommissionedBurst: Long,
- public val interferedBurst: Long,
- public val cpuUsage: Double,
- public val cpuDemand: Double
-) : Event("vm-metrics")
+public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+ private val hostWriter = ParquetHostDataWriter(
+ File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+ private val serviceWriter = ParquetServiceDataWriter(
+ File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
+ override fun record(data: HostData) {
+ hostWriter.write(data)
+ }
+
+ override fun record(data: ServiceData) {
+ serviceWriter.write(data)
+ }
+
+ override fun close() {
+ hostWriter.close()
+ serviceWriter.close()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
new file mode 100644
index 00000000..8912c12e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.export.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.opendc.telemetry.compute.table.HostData
+import java.io.File
+
+/**
+ * A Parquet event writer for [HostData]s.
+ */
+public class ParquetHostDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<HostData>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "host-writer"
+
+ public companion object {
+ private val convert: (HostData, GenericData.Record) -> Unit = { data, record ->
+ record.put("host_id", data.host.name)
+ record.put("state", data.host.state.name)
+ record.put("timestamp", data.timestamp)
+ record.put("total_work", data.totalWork)
+ record.put("granted_work", data.grantedWork)
+ record.put("overcommitted_work", data.overcommittedWork)
+ record.put("interfered_work", data.interferedWork)
+ record.put("cpu_usage", data.cpuUsage)
+ record.put("cpu_demand", data.cpuDemand)
+ record.put("power_draw", data.powerDraw)
+ record.put("instance_count", data.instanceCount)
+ record.put("cores", data.host.model.cpuCount)
+ }
+
+ private val schema: Schema = SchemaBuilder
+ .record("host")
+ .namespace("org.opendc.telemetry.compute")
+ .fields()
+ .name("timestamp").type().longType().noDefault()
+ .name("host_id").type().stringType().noDefault()
+ .name("state").type().stringType().noDefault()
+ .name("requested_work").type().longType().noDefault()
+ .name("granted_work").type().longType().noDefault()
+ .name("overcommitted_work").type().longType().noDefault()
+ .name("interfered_work").type().longType().noDefault()
+ .name("cpu_usage").type().doubleType().noDefault()
+ .name("cpu_demand").type().doubleType().noDefault()
+ .name("power_draw").type().doubleType().noDefault()
+ .name("instance_count").type().intType().noDefault()
+ .name("cores").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
index 8feff8d9..36d630f3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
@@ -20,46 +20,46 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry.parquet
+package org.opendc.experiments.capelin.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
+import org.opendc.telemetry.compute.table.ServiceData
import java.io.File
/**
- * A Parquet event writer for [ProvisionerEvent]s.
+ * A Parquet event writer for [ServiceData]s.
*/
-public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) {
+public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<ServiceData>(path, schema, convert, bufferSize) {
- override fun toString(): String = "provisioner-writer"
+ override fun toString(): String = "service-writer"
public companion object {
- private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record ->
- record.put("timestamp", event.timestamp)
- record.put("host_total_count", event.totalHostCount)
- record.put("host_available_count", event.availableHostCount)
- record.put("vm_total_count", event.totalVmCount)
- record.put("vm_active_count", event.activeVmCount)
- record.put("vm_inactive_count", event.inactiveVmCount)
- record.put("vm_waiting_count", event.waitingVmCount)
- record.put("vm_failed_count", event.failedVmCount)
+ private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record ->
+ record.put("timestamp", data.timestamp)
+ record.put("host_total_count", data.hostCount)
+ record.put("host_available_count", data.activeHostCount)
+ record.put("instance_total_count", data.instanceCount)
+ record.put("instance_active_count", data.runningInstanceCount)
+ record.put("instance_inactive_count", data.finishedInstanceCount)
+ record.put("instance_waiting_count", data.queuedInstanceCount)
+ record.put("instance_failed_count", data.failedInstanceCount)
}
private val schema: Schema = SchemaBuilder
- .record("provisioner_metrics")
- .namespace("org.opendc.experiments.sc20")
+ .record("service")
+ .namespace("org.opendc.telemetry.compute")
.fields()
.name("timestamp").type().longType().noDefault()
.name("host_total_count").type().intType().noDefault()
.name("host_available_count").type().intType().noDefault()
- .name("vm_total_count").type().intType().noDefault()
- .name("vm_active_count").type().intType().noDefault()
- .name("vm_inactive_count").type().intType().noDefault()
- .name("vm_waiting_count").type().intType().noDefault()
- .name("vm_failed_count").type().intType().noDefault()
+ .name("instance_total_count").type().intType().noDefault()
+ .name("instance_active_count").type().intType().noDefault()
+ .name("instance_inactive_count").type().intType().noDefault()
+ .name("instance_waiting_count").type().intType().noDefault()
+ .name("instance_failed_count").type().intType().noDefault()
.endRecord()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
deleted file mode 100644
index 83351c41..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.monitor
-
-import mu.KotlinLogging
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
-import org.opendc.experiments.capelin.telemetry.HostEvent
-import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
-import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter
-import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter
-import java.io.File
-
-/**
- * The logger instance to use.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * An [ExperimentMonitor] that logs the events to a Parquet file.
- */
-public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor {
- private val hostWriter = ParquetHostEventWriter(
- File(base, "host-metrics/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
- private val provisionerWriter = ParquetProvisionerEventWriter(
- File(base, "provisioner-metrics/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
-
- override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
-
- override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
- logger.debug { "Host ${host.uid} changed state $newState [$time]" }
- }
-
- override fun reportHostData(
- time: Long,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- instanceCount: Int,
- host: Host
- ) {
- hostWriter.write(
- HostEvent(
- time,
- 5 * 60 * 1000L,
- host,
- instanceCount,
- totalWork.toLong(),
- grantedWork.toLong(),
- overcommittedWork.toLong(),
- interferedWork.toLong(),
- cpuUsage,
- cpuDemand,
- powerDraw,
- host.model.cpuCount
- )
- )
- }
-
- override fun reportServiceData(
- time: Long,
- totalHostCount: Int,
- availableHostCount: Int,
- totalVmCount: Int,
- activeVmCount: Int,
- inactiveVmCount: Int,
- waitingVmCount: Int,
- failedVmCount: Int
- ) {
- provisionerWriter.write(
- ProvisionerEvent(
- time,
- totalHostCount,
- availableHostCount,
- totalVmCount,
- activeVmCount,
- inactiveVmCount,
- waitingVmCount,
- failedVmCount
- )
- )
- }
-
- override fun close() {
- hostWriter.close()
- provisionerWriter.close()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
deleted file mode 100644
index c8fe1cb2..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.HostEvent
-import java.io.File
-
-/**
- * A Parquet event writer for [HostEvent]s.
- */
-public class ParquetHostEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) {
-
- override fun toString(): String = "host-writer"
-
- public companion object {
- private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record ->
- // record.put("portfolio_id", event.run.parent.parent.id)
- // record.put("scenario_id", event.run.parent.id)
- // record.put("run_id", event.run.id)
- record.put("host_id", event.host.name)
- record.put("state", event.host.state.name)
- record.put("timestamp", event.timestamp)
- record.put("duration", event.duration)
- record.put("vm_count", event.vmCount)
- record.put("requested_burst", event.requestedBurst)
- record.put("granted_burst", event.grantedBurst)
- record.put("overcommissioned_burst", event.overcommissionedBurst)
- record.put("interfered_burst", event.interferedBurst)
- record.put("cpu_usage", event.cpuUsage)
- record.put("cpu_demand", event.cpuDemand)
- record.put("power_draw", event.powerDraw)
- record.put("cores", event.cores)
- }
-
- private val schema: Schema = SchemaBuilder
- .record("host_metrics")
- .namespace("org.opendc.experiments.sc20")
- .fields()
- // .name("portfolio_id").type().intType().noDefault()
- // .name("scenario_id").type().intType().noDefault()
- // .name("run_id").type().intType().noDefault()
- .name("timestamp").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("host_id").type().stringType().noDefault()
- .name("state").type().stringType().noDefault()
- .name("vm_count").type().intType().noDefault()
- .name("requested_burst").type().longType().noDefault()
- .name("granted_burst").type().longType().noDefault()
- .name("overcommissioned_burst").type().longType().noDefault()
- .name("interfered_burst").type().longType().noDefault()
- .name("cpu_usage").type().doubleType().noDefault()
- .name("cpu_demand").type().doubleType().noDefault()
- .name("power_draw").type().doubleType().noDefault()
- .name("cores").type().intType().noDefault()
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
deleted file mode 100644
index 946410eb..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.RunEvent
-import java.io.File
-
-/**
- * A Parquet event writer for [RunEvent]s.
- */
-public class ParquetRunEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) {
-
- override fun toString(): String = "run-writer"
-
- public companion object {
- private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record ->
- val portfolio = event.portfolio
- record.put("portfolio_name", portfolio.name)
- record.put("scenario_id", portfolio.id)
- record.put("run_id", event.repeat)
- record.put("topology", portfolio.topology.name)
- record.put("workload_name", portfolio.workload.name)
- record.put("workload_fraction", portfolio.workload.fraction)
- record.put("workload_sampler", portfolio.workload.samplingStrategy)
- record.put("allocation_policy", portfolio.allocationPolicy)
- record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency)
- record.put("interference", portfolio.operationalPhenomena.hasInterference)
- record.put("seed", event.repeat)
- }
-
- private val schema: Schema = SchemaBuilder
- .record("runs")
- .namespace("org.opendc.experiments.sc20")
- .fields()
- .name("portfolio_name").type().stringType().noDefault()
- .name("scenario_id").type().intType().noDefault()
- .name("run_id").type().intType().noDefault()
- .name("topology").type().stringType().noDefault()
- .name("workload_name").type().stringType().noDefault()
- .name("workload_fraction").type().doubleType().noDefault()
- .name("workload_sampler").type().stringType().noDefault()
- .name("allocation_policy").type().stringType().noDefault()
- .name("failure_frequency").type().doubleType().noDefault()
- .name("interference").type().booleanType().noDefault()
- .name("seed").type().intType().noDefault()
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
index a19f5699..9549af42 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
@@ -31,14 +31,13 @@ import java.io.InputStream
/**
* A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
- *
- * @param input The input stream to read from.
- * @param mapper The Jackson object mapper to use.
*/
-class PerformanceInterferenceReader(
- private val input: InputStream,
- private val mapper: ObjectMapper = jacksonObjectMapper()
-) : AutoCloseable {
+class PerformanceInterferenceReader {
+ /**
+ * The [ObjectMapper] to use.
+ */
+ private val mapper = jacksonObjectMapper()
+
init {
mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java)
}
@@ -46,12 +45,8 @@ class PerformanceInterferenceReader(
/**
* Read the performance interface model from the input.
*/
- fun read(): List<VmInterferenceGroup> {
- return mapper.readValue(input)
- }
-
- override fun close() {
- input.close()
+ fun read(input: InputStream): List<VmInterferenceGroup> {
+ return input.use { mapper.readValue(input) }
}
private data class GroupMixin(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
index 61e4cab5..ed82217d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
@@ -41,8 +41,6 @@ import java.util.UUID
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread
-private val logger = KotlinLogging.logger {}
-
/**
* A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
*
@@ -50,6 +48,8 @@ private val logger = KotlinLogging.logger {}
* @param selectedVms The list of VMs to read from the trace.
*/
class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> {
+ private val logger = KotlinLogging.logger {}
+
/**
* The internal iterator to use for this reader.
*/
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt
index 7a1683f0..b55bd577 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt
@@ -29,24 +29,19 @@ import java.io.InputStream
/**
* A parser for the JSON VM placement data files used for the TPDS article on Capelin.
- *
- * @param input The input stream to read from.
- * @param mapper The Jackson object mapper to use.
*/
-public class VmPlacementReader(
- private val input: InputStream,
- private val mapper: ObjectMapper = jacksonObjectMapper()
-) : AutoCloseable {
+class VmPlacementReader {
+ /**
+ * The [ObjectMapper] to parse the placement.
+ */
+ private val mapper = jacksonObjectMapper()
+
/**
* Read the VM placements from the input.
*/
- public fun read(): Map<String, String> {
+ fun read(input: InputStream): Map<String, String> {
return mapper.readValue<Map<String, String>>(input)
.mapKeys { "vm__workload__${it.key}.txt" }
.mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00
}
-
- override fun close() {
- input.close()
- }
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
index 71c9d52e..24abb109 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
@@ -118,9 +118,9 @@ internal class SvResourceStateTable(path: Path) : Table {
private fun nextDelegate(): TableReader? {
return if (it.hasNext()) {
- val (partition, path) = it.next()
+ val (_, path) = it.next()
val reader = path.bufferedReader()
- return SvResourceStateTableReader(partition, reader)
+ return SvResourceStateTableReader(reader)
} else {
null
}
@@ -133,7 +133,7 @@ internal class SvResourceStateTable(path: Path) : Table {
override fun newReader(partition: String): TableReader {
val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
val reader = path.bufferedReader()
- return SvResourceStateTableReader(partition, reader)
+ return SvResourceStateTableReader(reader)
}
override fun toString(): String = "SvResourceStateTable"
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
index adcdb2ea..1a556f8d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
@@ -29,7 +29,7 @@ import java.time.Instant
/**
* A [TableReader] for the Bitbrains resource state table.
*/
-internal class SvResourceStateTableReader(partition: String, private val reader: BufferedReader) : TableReader {
+internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader {
/**
* The current parser state.
*/
@@ -57,7 +57,7 @@ internal class SvResourceStateTableReader(partition: String, private val reader:
val length = line.length
var col = 0
- var start = 0
+ var start: Int
var end = 0
while (end < length) {
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 2934bbe6..aed9a4bb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -29,7 +29,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
@@ -38,7 +37,6 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
@@ -46,6 +44,10 @@ import org.opendc.experiments.capelin.trace.TraceReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.withMonitor
import java.io.File
import java.util.*
@@ -80,7 +82,6 @@ class CapelinIntegrationTest {
)
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- lateinit var monitorResults: ComputeMetrics
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
@@ -98,7 +99,7 @@ class CapelinIntegrationTest {
null
}
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -111,15 +112,21 @@ class CapelinIntegrationTest {
failureDomain?.cancel()
}
- monitorResults = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
- { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
- { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
- { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
+ { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") },
+ { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") },
+ { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") },
+ { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") },
{ assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } },
{ assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } },
{ assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
@@ -145,7 +152,7 @@ class CapelinIntegrationTest {
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -156,8 +163,14 @@ class CapelinIntegrationTest {
}
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
@@ -184,12 +197,14 @@ class CapelinIntegrationTest {
val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
val performanceInterferenceModel =
- PerformanceInterferenceReader(perfInterferenceInput).use { VmInterferenceModel(it.read(), Random(seed.toLong())) }
+ PerformanceInterferenceReader()
+ .read(perfInterferenceInput)
+ .let { VmInterferenceModel(it, Random(seed.toLong())) }
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -200,8 +215,14 @@ class CapelinIntegrationTest {
}
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
@@ -239,7 +260,7 @@ class CapelinIntegrationTest {
chan
)
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -252,8 +273,14 @@ class CapelinIntegrationTest {
failureDomain.cancel()
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
@@ -283,32 +310,19 @@ class CapelinIntegrationTest {
return ClusterEnvironmentReader(stream)
}
- class TestExperimentReporter : ExperimentMonitor {
+ class TestExperimentReporter : ComputeMonitor {
var totalWork = 0L
var totalGrantedWork = 0L
var totalOvercommittedWork = 0L
var totalInterferedWork = 0L
var totalPowerDraw = 0.0
- override fun reportHostData(
- time: Long,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- instanceCount: Int,
- host: Host,
- ) {
- this.totalWork += totalWork.toLong()
- totalGrantedWork += grantedWork.toLong()
- totalOvercommittedWork += overcommittedWork.toLong()
- totalInterferedWork += interferedWork.toLong()
- totalPowerDraw += powerDraw
+ override fun record(data: HostData) {
+ this.totalWork += data.totalWork.toLong()
+ totalGrantedWork += data.grantedWork.toLong()
+ totalOvercommittedWork += data.overcommittedWork.toLong()
+ totalInterferedWork += data.interferedWork.toLong()
+ totalPowerDraw += data.powerDraw
}
-
- override fun close() {}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt
index 9b1513dc..fbc39b87 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt
@@ -33,9 +33,7 @@ class PerformanceInterferenceReaderTest {
@Test
fun testSmoke() {
val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json"))
- val reader = PerformanceInterferenceReader(input)
-
- val result = reader.use { reader.read() }
+ val result = PerformanceInterferenceReader().read(input)
assertAll(
{ assertEquals(2, result.size) },
diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
index 7d34d098..40ac2967 100644
--- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
@@ -37,6 +37,7 @@ dependencies {
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcExperiments.opendcExperimentsCapelin)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcTelemetry.opendcTelemetryCompute)
implementation(libs.kotlin.logging)
implementation(libs.config)
diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
index e64e20a2..02aaab3c 100644
--- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
+++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
@@ -37,7 +37,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.*
-import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
+import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
@@ -50,6 +50,8 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.*
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.withMonitor
import java.io.File
import java.time.Clock
import java.util.*
@@ -87,11 +89,11 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
)
val meterProvider: MeterProvider = createMeterProvider(clock)
- val monitor = ParquetExperimentMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096)
+ val monitor = ParquetExportMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096)
val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace))
withComputeService(clock, meterProvider, allocationPolicy) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
trace,
@@ -102,12 +104,12 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
}
}
- val monitorResults = collectMetrics(meterProvider as MetricProducer)
+ val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
logger.debug {
- "Finish SUBMIT=${monitorResults.submittedVms} " +
- "FAIL=${monitorResults.unscheduledVms} " +
- "QUEUE=${monitorResults.queuedVms} " +
- "RUNNING=${monitorResults.runningVms}"
+ "Finish SUBMIT=${monitorResults.instanceCount} " +
+ "FAIL=${monitorResults.failedInstanceCount} " +
+ "QUEUE=${monitorResults.queuedInstanceCount} " +
+ "RUNNING=${monitorResults.runningInstanceCount}"
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
index c29e116e..6a3de9bc 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,14 +20,18 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry
+description = "Telemetry for OpenDC Compute"
-/**
- * An event that occurs within the system.
- */
-public abstract class Event(public val name: String) {
- /**
- * The time of occurrence of this event.
- */
- public abstract val timestamp: Long
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcTelemetry.opendcTelemetrySdk)
+
+ implementation(projects.opendcCompute.opendcComputeSimulator)
+ implementation(libs.opentelemetry.semconv)
+ implementation(libs.kotlin.logging)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
index 42b7cbb8..95e7ff9e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
@@ -20,41 +20,40 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.monitor
+package org.opendc.telemetry.compute
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.opendc.compute.service.driver.Host
-import org.opendc.experiments.capelin.extractComputeMetrics
+import org.opendc.telemetry.compute.table.HostData
import java.time.Clock
/**
- * A [MetricExporter] that exports the metrics to the [ExperimentMonitor].
+ * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation.
*/
-public class ExperimentMetricExporter(
- private val monitor: ExperimentMonitor,
+public class ComputeMetricExporter(
private val clock: Clock,
- private val hosts: Map<String, Host>
+ private val hosts: Map<String, Host>,
+ private val monitor: ComputeMonitor
) : MetricExporter {
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
return try {
reportHostMetrics(metrics)
- reportProvisionerMetrics(metrics)
+ reportServiceMetrics(metrics)
CompletableResultCode.ofSuccess()
} catch (e: Throwable) {
CompletableResultCode.ofFailure()
}
}
- private var lastHostMetrics: Map<String, HostMetrics> = emptyMap()
- private val hostMetricsSingleton = HostMetrics()
+ private var lastHostMetrics: Map<String, HBuffer> = emptyMap()
+ private val hostMetricsSingleton = HBuffer()
private fun reportHostMetrics(metrics: Collection<MetricData>) {
- val hostMetrics = mutableMapOf<String, HostMetrics>()
- hosts.mapValuesTo(hostMetrics) { HostMetrics() }
+ val hostMetrics = mutableMapOf<String, HBuffer>()
for (metric in metrics) {
when (metric.name) {
@@ -66,83 +65,68 @@ public class ExperimentMetricExporter(
"cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v }
"cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v }
"guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() }
+ "host.time.up" -> mapLongSum(metric, hostMetrics) { m, v -> m.uptime = v }
+ "host.time.down" -> mapLongSum(metric, hostMetrics) { m, v -> m.downtime = v }
}
}
for ((id, hostMetric) in hostMetrics) {
val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton)
- val host = hosts.getValue(id)
- monitor.reportHostData(
- clock.millis(),
- hostMetric.totalWork - lastHostMetric.totalWork,
- hostMetric.grantedWork - lastHostMetric.grantedWork,
- hostMetric.overcommittedWork - lastHostMetric.overcommittedWork,
- hostMetric.interferedWork - lastHostMetric.interferedWork,
- hostMetric.cpuUsage,
- hostMetric.cpuDemand,
- hostMetric.powerDraw,
- hostMetric.instanceCount,
- host
+ val host = hosts[id] ?: continue
+
+ monitor.record(
+ HostData(
+ clock.millis(),
+ host,
+ hostMetric.totalWork - lastHostMetric.totalWork,
+ hostMetric.grantedWork - lastHostMetric.grantedWork,
+ hostMetric.overcommittedWork - lastHostMetric.overcommittedWork,
+ hostMetric.interferedWork - lastHostMetric.interferedWork,
+ hostMetric.cpuUsage,
+ hostMetric.cpuDemand,
+ hostMetric.instanceCount,
+ hostMetric.powerDraw,
+ hostMetric.uptime - lastHostMetric.uptime,
+ hostMetric.downtime - lastHostMetric.downtime,
+ )
)
}
lastHostMetrics = hostMetrics
}
- private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) {
val points = data.doubleSummaryData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[ResourceAttributes.HOST_ID]
- val hostMetric = hostMetrics[uid]
-
- if (hostMetric != null) {
- // Take the average of the summary
- val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2
- block(hostMetric, avg)
- }
+ val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
+ val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() }
+ val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2
+ block(hostMetric, avg)
}
}
- private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
+ private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Long) -> Unit) {
val points = data?.longSumData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[ResourceAttributes.HOST_ID]
- val hostMetric = hostMetrics[uid]
-
- if (hostMetric != null) {
- block(hostMetric, point.value)
- }
+ val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
+ val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() }
+ block(hostMetric, point.value)
}
}
- private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) {
val points = data?.doubleSumData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[ResourceAttributes.HOST_ID]
- val hostMetric = hostMetrics[uid]
-
- if (hostMetric != null) {
- block(hostMetric, point.value)
- }
+ val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
+ val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() }
+ block(hostMetric, point.value)
}
}
- private fun reportProvisionerMetrics(metrics: Collection<MetricData>) {
- val res = extractComputeMetrics(metrics)
-
- monitor.reportServiceData(
- clock.millis(),
- res.hosts,
- res.availableHosts,
- res.submittedVms,
- res.runningVms,
- res.finishedVms,
- res.queuedVms,
- res.unscheduledVms
- )
- }
-
- private class HostMetrics {
+ /**
+ * A buffer for host metrics before they are reported.
+ */
+ private class HBuffer {
var totalWork: Double = 0.0
var grantedWork: Double = 0.0
var overcommittedWork: Double = 0.0
@@ -151,6 +135,12 @@ public class ExperimentMetricExporter(
var cpuDemand: Double = 0.0
var instanceCount: Int = 0
var powerDraw: Double = 0.0
+ var uptime: Long = 0
+ var downtime: Long = 0
+ }
+
+ private fun reportServiceMetrics(metrics: Collection<MetricData>) {
+ monitor.record(extractServiceMetrics(clock.millis(), metrics))
}
override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
index 9a4aec35..ec303b37 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
@@ -20,54 +20,42 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.monitor
+package org.opendc.telemetry.compute
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.telemetry.compute.table.ServiceData
/**
- * A monitor watches the events of an experiment.
+ * A monitor that tracks the metrics and events of the OpenDC Compute service.
*/
-public interface ExperimentMonitor : AutoCloseable {
+public interface ComputeMonitor {
/**
- * This method is invoked when the state of a VM changes.
+ * This method is invoked when the state of a [Server] changes.
*/
- public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
+ public fun onStateChange(timestamp: Long, server: Server, newState: ServerState) {}
/**
- * This method is invoked when the state of a host changes.
+ * This method is invoked when the state of a [Host] changes.
*/
- public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {}
+ public fun onStateChange(time: Long, host: Host, newState: HostState) {}
/**
- * This method is invoked for a host for each slice that is finishes.
+ * Record the specified [data].
*/
- public fun reportHostData(
- time: Long,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- instanceCount: Int,
- host: Host
- ) {}
+ public fun record(data: ServerData) {}
/**
- * This method is invoked for reporting service data.
+ * Record the specified [data].
*/
- public fun reportServiceData(
- time: Long,
- totalHostCount: Int,
- availableHostCount: Int,
- totalVmCount: Int,
- activeVmCount: Int,
- inactiveVmCount: Int,
- waitingVmCount: Int,
- failedVmCount: Int
- ) {}
+ public fun record(data: HostData) {}
+
+ /**
+ * Record the specified [data].
+ */
+ public fun record(data: ServiceData) {}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
new file mode 100644
index 00000000..d3d983b9
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute
+
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.coroutineScope
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostListener
+import org.opendc.compute.service.driver.HostState
+import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import java.time.Clock
+
+/**
+ * Attach the specified monitor to the OpenDC Compute service.
+ */
+public suspend fun withMonitor(
+ scheduler: ComputeService,
+ clock: Clock,
+ metricProducer: MetricProducer,
+ monitor: ComputeMonitor,
+ exportInterval: Long = 5L * 60 * 1000, /* Every 5 min (which is the granularity of the workload trace) */
+ block: suspend CoroutineScope.() -> Unit
+): Unit = coroutineScope {
+ // Monitor host events
+ for (host in scheduler.hosts) {
+ monitor.onStateChange(clock.millis(), host, HostState.UP)
+ host.addListener(object : HostListener {
+ override fun onStateChanged(host: Host, newState: HostState) {
+ monitor.onStateChange(clock.millis(), host, newState)
+ }
+ })
+ }
+
+ val reader = CoroutineMetricReader(
+ this,
+ listOf(metricProducer),
+ ComputeMetricExporter(clock, scheduler.hosts.associateBy { it.uid.toString() }, monitor),
+ exportInterval
+ )
+
+ try {
+ block(this)
+ } finally {
+ reader.close()
+ }
+}
+
+/**
+ * Collect the metrics of the compute service.
+ */
+public fun collectServiceMetrics(timestamp: Long, metricProducer: MetricProducer): ServiceData {
+ return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics())
+}
+
+/**
+ * Extract a [ServiceData] object from the specified list of metric data.
+ */
+public fun extractServiceMetrics(timestamp: Long, metrics: Collection<MetricData>): ServiceData {
+ var submittedVms = 0
+ var queuedVms = 0
+ var unscheduledVms = 0
+ var runningVms = 0
+ var finishedVms = 0
+ var hosts = 0
+ var availableHosts = 0
+
+ for (metric in metrics) {
+ val points = metric.longSumData.points
+
+ if (points.isEmpty()) {
+ continue
+ }
+
+ val value = points.first().value.toInt()
+ when (metric.name) {
+ "servers.submitted" -> submittedVms = value
+ "servers.waiting" -> queuedVms = value
+ "servers.unscheduled" -> unscheduledVms = value
+ "servers.active" -> runningVms = value
+ "servers.finished" -> finishedVms = value
+ "hosts.total" -> hosts = value
+ "hosts.available" -> availableHosts = value
+ }
+ }
+
+ return ServiceData(timestamp, hosts, availableHosts, submittedVms, runningVms, finishedVms, queuedVms, unscheduledVms)
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
index 899fc9b1..8e6c34d0 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,24 +20,24 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry
+package org.opendc.telemetry.compute.table
import org.opendc.compute.service.driver.Host
/**
- * A periodic report of the host machine metrics.
+ * A trace entry for a particular host.
*/
-public data class HostEvent(
- override val timestamp: Long,
- public val duration: Long,
+public data class HostData(
+ public val timestamp: Long,
public val host: Host,
- public val vmCount: Int,
- public val requestedBurst: Long,
- public val grantedBurst: Long,
- public val overcommissionedBurst: Long,
- public val interferedBurst: Long,
+ public val totalWork: Double,
+ public val grantedWork: Double,
+ public val overcommittedWork: Double,
+ public val interferedWork: Double,
public val cpuUsage: Double,
public val cpuDemand: Double,
+ public val instanceCount: Int,
public val powerDraw: Double,
- public val cores: Int
-) : Event("host-metrics")
+ public val uptime: Long,
+ public val downtime: Long,
+)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
index 6c8fc941..2a9fa8a6 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,15 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry
+package org.opendc.telemetry.compute.table
-import org.opendc.experiments.capelin.Portfolio
+import org.opendc.compute.api.Server
/**
- * A periodic report of the host machine metrics.
+ * A trace entry for a particular server.
*/
-public data class RunEvent(
- val portfolio: Portfolio,
- val repeat: Int,
- override val timestamp: Long
-) : Event("run")
+public data class ServerData(
+ public val timestamp: Long,
+ public val server: Server,
+ public val uptime: Long,
+ public val downtime: Long,
+)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
index 539c9bc9..f6ff5db5 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,18 +20,18 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry
+package org.opendc.telemetry.compute.table
/**
- * A periodic report of the provisioner's metrics.
+ * A trace entry for the compute service.
*/
-public data class ProvisionerEvent(
- override val timestamp: Long,
- public val totalHostCount: Int,
- public val availableHostCount: Int,
- public val totalVmCount: Int,
- public val activeVmCount: Int,
- public val inactiveVmCount: Int,
- public val waitingVmCount: Int,
- public val failedVmCount: Int
-) : Event("provisioner-metrics")
+public data class ServiceData(
+ public val timestamp: Long,
+ public val hostCount: Int,
+ public val activeHostCount: Int,
+ public val instanceCount: Int,
+ public val runningInstanceCount: Int,
+ public val finishedInstanceCount: Int,
+ public val queuedInstanceCount: Int,
+ public val failedInstanceCount: Int
+)
diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts
index ec4a4673..bfbb1687 100644
--- a/opendc-web/opendc-web-runner/build.gradle.kts
+++ b/opendc-web/opendc-web-runner/build.gradle.kts
@@ -39,6 +39,7 @@ dependencies {
implementation(projects.opendcExperiments.opendcExperimentsCapelin)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcTelemetry.opendcTelemetryCompute)
implementation(libs.kotlin.logging)
implementation(libs.clikt)
@@ -47,6 +48,7 @@ dependencies {
implementation(libs.ktor.client.auth)
implementation(libs.ktor.client.jackson)
implementation(libs.jackson.datatype.jsr310)
+ implementation(kotlin("reflect"))
runtimeOnly(libs.log4j.slf4j)
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 53d50357..5d481270 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -32,7 +32,6 @@ import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
-import org.opendc.compute.service.scheduler.weights.*
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.env.MachineDef
@@ -47,6 +46,8 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.withMonitor
import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.web.client.ApiClient
import org.opendc.web.client.AuthConfiguration
@@ -131,7 +132,7 @@ class RunnerCli : CliktCommand(name = "runner") {
/**
* Run a single scenario.
*/
- private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebExperimentMonitor.Result> {
+ private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebComputeMonitor.Result> {
val id = scenario.id
logger.info { "Constructing performance interference model" }
@@ -150,7 +151,7 @@ class RunnerCli : CliktCommand(name = "runner") {
return@let null
}
- PerformanceInterferenceReader(path.inputStream()).use { reader -> reader.read() }
+ PerformanceInterferenceReader().read(path.inputStream())
}
val targets = portfolio.targets
@@ -176,8 +177,8 @@ class RunnerCli : CliktCommand(name = "runner") {
environment: EnvironmentReader,
traceReader: RawParquetTraceReader,
interferenceModel: VmInterferenceModel?
- ): WebExperimentMonitor.Result {
- val monitor = WebExperimentMonitor()
+ ): WebComputeMonitor.Result {
+ val monitor = WebComputeMonitor()
try {
runBlockingSimulation {
@@ -220,7 +221,7 @@ class RunnerCli : CliktCommand(name = "runner") {
null
}
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
trace,
@@ -233,8 +234,14 @@ class RunnerCli : CliktCommand(name = "runner") {
failureDomain?.cancel()
}
- val monitorResults = collectMetrics(metricProducer)
- logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+ val monitorResults = collectServiceMetrics(clock.millis(), metricProducer)
+ logger.debug {
+ "Finish " +
+ "SUBMIT=${monitorResults.instanceCount} " +
+ "FAIL=${monitorResults.failedInstanceCount} " +
+ "QUEUE=${monitorResults.queuedInstanceCount} " +
+ "RUNNING=${monitorResults.runningInstanceCount}"
+ }
}
} catch (cause: Throwable) {
logger.warn(cause) { "Experiment failed" }
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
index 4044cec9..e0e3488f 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
@@ -61,14 +61,14 @@ public class ScenarioManager(private val client: ApiClient) {
/**
* Persist the specified results.
*/
- public suspend fun finish(id: String, results: List<WebExperimentMonitor.Result>) {
+ public suspend fun finish(id: String, results: List<WebComputeMonitor.Result>) {
client.updateJob(
id, SimulationState.FINISHED,
mapOf(
- "total_requested_burst" to results.map { it.totalRequestedBurst },
- "total_granted_burst" to results.map { it.totalGrantedBurst },
- "total_overcommitted_burst" to results.map { it.totalOvercommittedBurst },
- "total_interfered_burst" to results.map { it.totalInterferedBurst },
+ "total_requested_burst" to results.map { it.totalWork },
+ "total_granted_burst" to results.map { it.totalGrantedWork },
+ "total_overcommitted_burst" to results.map { it.totalOvercommittedWork },
+ "total_interfered_burst" to results.map { it.totalInterferedWork },
"mean_cpu_usage" to results.map { it.meanCpuUsage },
"mean_cpu_demand" to results.map { it.meanCpuDemand },
"mean_num_deployed_images" to results.map { it.meanNumDeployedImages },
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
new file mode 100644
index 00000000..c8e58dde
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.web.runner
+
+import mu.KotlinLogging
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostState
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServiceData
+import kotlin.math.max
+
+/**
+ * A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
+ */
+public class WebComputeMonitor : ComputeMonitor {
+ private val logger = KotlinLogging.logger {}
+
+ override fun onStateChange(time: Long, host: Host, newState: HostState) {
+ logger.debug { "Host ${host.uid} changed state $newState [$time]" }
+ }
+
+ override fun record(data: HostData) {
+ val duration = 5 * 60 * 1000L
+ val slices = duration / SLICE_LENGTH
+
+ hostAggregateMetrics = AggregateHostMetrics(
+ hostAggregateMetrics.totalWork + data.totalWork,
+ hostAggregateMetrics.totalGrantedWork + data.grantedWork,
+ hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork,
+ hostAggregateMetrics.totalInterferedWork + data.overcommittedWork,
+ hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600,
+ hostAggregateMetrics.totalFailureSlices + if (data.host.state != HostState.UP) slices else 0,
+ hostAggregateMetrics.totalFailureVmSlices + if (data.host.state != HostState.UP) data.instanceCount * slices else 0
+ )
+
+ hostMetrics.compute(data.host) { _, prev ->
+ HostMetrics(
+ (data.cpuUsage.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
+ (data.cpuDemand.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
+ data.instanceCount + (prev?.instanceCount ?: 0),
+ 1 + (prev?.count ?: 0)
+ )
+ }
+ }
+
+ private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
+ private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf()
+ private val SLICE_LENGTH: Long = 5 * 60 * 1000
+
+ data class AggregateHostMetrics(
+ val totalWork: Double = 0.0,
+ val totalGrantedWork: Double = 0.0,
+ val totalOvercommittedWork: Double = 0.0,
+ val totalInterferedWork: Double = 0.0,
+ val totalPowerDraw: Double = 0.0,
+ val totalFailureSlices: Long = 0,
+ val totalFailureVmSlices: Long = 0,
+ )
+
+ data class HostMetrics(
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val instanceCount: Long,
+ val count: Long
+ )
+
+ private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics()
+
+ override fun record(data: ServiceData) {
+ serviceMetrics = AggregateServiceMetrics(
+ max(data.instanceCount, serviceMetrics.vmTotalCount),
+ max(data.queuedInstanceCount, serviceMetrics.vmWaitingCount),
+ max(data.runningInstanceCount, serviceMetrics.vmActiveCount),
+ max(data.finishedInstanceCount, serviceMetrics.vmInactiveCount),
+ max(data.failedInstanceCount, serviceMetrics.vmFailedCount),
+ )
+ }
+
+ public data class AggregateServiceMetrics(
+ val vmTotalCount: Int = 0,
+ val vmWaitingCount: Int = 0,
+ val vmActiveCount: Int = 0,
+ val vmInactiveCount: Int = 0,
+ val vmFailedCount: Int = 0
+ )
+
+ public fun getResult(): Result {
+ return Result(
+ hostAggregateMetrics.totalWork,
+ hostAggregateMetrics.totalGrantedWork,
+ hostAggregateMetrics.totalOvercommittedWork,
+ hostAggregateMetrics.totalInterferedWork,
+ hostMetrics.map { it.value.cpuUsage / it.value.count }.average(),
+ hostMetrics.map { it.value.cpuDemand / it.value.count }.average(),
+ hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(),
+ hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
+ hostAggregateMetrics.totalPowerDraw,
+ hostAggregateMetrics.totalFailureSlices,
+ hostAggregateMetrics.totalFailureVmSlices,
+ serviceMetrics.vmTotalCount,
+ serviceMetrics.vmWaitingCount,
+ serviceMetrics.vmInactiveCount,
+ serviceMetrics.vmFailedCount,
+ )
+ }
+
+ data class Result(
+ val totalWork: Double,
+ val totalGrantedWork: Double,
+ val totalOvercommittedWork: Double,
+ val totalInterferedWork: Double,
+ val meanCpuUsage: Double,
+ val meanCpuDemand: Double,
+ val meanNumDeployedImages: Double,
+ val maxNumDeployedImages: Double,
+ val totalPowerDraw: Double,
+ val totalFailureSlices: Long,
+ val totalFailureVmSlices: Long,
+ val totalVmsSubmitted: Int,
+ val totalVmsQueued: Int,
+ val totalVmsFinished: Int,
+ val totalVmsFailed: Int
+ )
+}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt
deleted file mode 100644
index 82e2a334..00000000
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.web.runner
-
-import mu.KotlinLogging
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.experiments.capelin.telemetry.HostEvent
-import kotlin.math.max
-
-/**
- * An [ExperimentMonitor] that tracks the aggregate metrics for each repeat.
- */
-public class WebExperimentMonitor : ExperimentMonitor {
- private val logger = KotlinLogging.logger {}
-
- override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
-
- override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
- logger.debug { "Host ${host.uid} changed state $newState [$time]" }
- }
-
- override fun reportHostData(
- time: Long,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- instanceCount: Int,
- host: Host,
- ) {
- processHostEvent(
- HostEvent(
- time,
- 5 * 60 * 1000L,
- host,
- instanceCount,
- totalWork.toLong(),
- grantedWork.toLong(),
- overcommittedWork.toLong(),
- interferedWork.toLong(),
- cpuUsage,
- cpuDemand,
- powerDraw,
- host.model.cpuCount
- )
- )
- }
-
- private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
- private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf()
-
- private fun processHostEvent(event: HostEvent) {
- val slices = event.duration / SLICE_LENGTH
-
- hostAggregateMetrics = AggregateHostMetrics(
- hostAggregateMetrics.totalRequestedBurst + event.requestedBurst,
- hostAggregateMetrics.totalGrantedBurst + event.grantedBurst,
- hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst,
- hostAggregateMetrics.totalInterferedBurst + event.interferedBurst,
- hostAggregateMetrics.totalPowerDraw + (event.duration * event.powerDraw) / 3600,
- hostAggregateMetrics.totalFailureSlices + if (event.host.state != HostState.UP) slices else 0,
- hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != HostState.UP) event.vmCount * slices else 0
- )
-
- hostMetrics.compute(event.host) { _, prev ->
- HostMetrics(
- (event.cpuUsage.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
- (event.cpuDemand.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
- event.vmCount + (prev?.vmCount ?: 0),
- 1 + (prev?.count ?: 0)
- )
- }
- }
-
- private val SLICE_LENGTH: Long = 5 * 60 * 1000
-
- public data class AggregateHostMetrics(
- val totalRequestedBurst: Long = 0,
- val totalGrantedBurst: Long = 0,
- val totalOvercommittedBurst: Long = 0,
- val totalInterferedBurst: Long = 0,
- val totalPowerDraw: Double = 0.0,
- val totalFailureSlices: Long = 0,
- val totalFailureVmSlices: Long = 0,
- )
-
- public data class HostMetrics(
- val cpuUsage: Double,
- val cpuDemand: Double,
- val vmCount: Long,
- val count: Long
- )
-
- private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
-
- override fun reportServiceData(
- time: Long,
- totalHostCount: Int,
- availableHostCount: Int,
- totalVmCount: Int,
- activeVmCount: Int,
- inactiveVmCount: Int,
- waitingVmCount: Int,
- failedVmCount: Int
- ) {
- provisionerMetrics = AggregateProvisionerMetrics(
- max(totalVmCount, provisionerMetrics.vmTotalCount),
- max(waitingVmCount, provisionerMetrics.vmWaitingCount),
- max(activeVmCount, provisionerMetrics.vmActiveCount),
- max(inactiveVmCount, provisionerMetrics.vmInactiveCount),
- max(failedVmCount, provisionerMetrics.vmFailedCount),
- )
- }
-
- public data class AggregateProvisionerMetrics(
- val vmTotalCount: Int = 0,
- val vmWaitingCount: Int = 0,
- val vmActiveCount: Int = 0,
- val vmInactiveCount: Int = 0,
- val vmFailedCount: Int = 0
- )
-
- override fun close() {}
-
- public fun getResult(): Result {
- return Result(
- hostAggregateMetrics.totalRequestedBurst,
- hostAggregateMetrics.totalGrantedBurst,
- hostAggregateMetrics.totalOvercommittedBurst,
- hostAggregateMetrics.totalInterferedBurst,
- hostMetrics.map { it.value.cpuUsage / it.value.count }.average(),
- hostMetrics.map { it.value.cpuDemand / it.value.count }.average(),
- hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.average(),
- hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
- hostAggregateMetrics.totalPowerDraw,
- hostAggregateMetrics.totalFailureSlices,
- hostAggregateMetrics.totalFailureVmSlices,
- provisionerMetrics.vmTotalCount,
- provisionerMetrics.vmWaitingCount,
- provisionerMetrics.vmInactiveCount,
- provisionerMetrics.vmFailedCount,
- )
- }
-
- public data class Result(
- public val totalRequestedBurst: Long,
- public val totalGrantedBurst: Long,
- public val totalOvercommittedBurst: Long,
- public val totalInterferedBurst: Long,
- public val meanCpuUsage: Double,
- public val meanCpuDemand: Double,
- public val meanNumDeployedImages: Double,
- public val maxNumDeployedImages: Double,
- public val totalPowerDraw: Double,
- public val totalFailureSlices: Long,
- public val totalFailureVmSlices: Long,
- public val totalVmsSubmitted: Int,
- public val totalVmsQueued: Int,
- public val totalVmsFinished: Int,
- public val totalVmsFailed: Int
- )
-}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 5cae0a31..cee8887b 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -45,6 +45,7 @@ include(":opendc-simulator:opendc-simulator-compute")
include(":opendc-simulator:opendc-simulator-failures")
include(":opendc-telemetry:opendc-telemetry-api")
include(":opendc-telemetry:opendc-telemetry-sdk")
+include(":opendc-telemetry:opendc-telemetry-compute")
include(":opendc-trace:opendc-trace-api")
include(":opendc-trace:opendc-trace-gwf")
include(":opendc-trace:opendc-trace-swf")