path: root/opendc-experiments
author     Fabian Mastenbroek <mail.fabianm@gmail.com>    2021-08-27 16:41:55 +0200
committer  Fabian Mastenbroek <mail.fabianm@gmail.com>    2021-09-07 14:34:30 +0200
commit     aaedd4f3eed83d0c3ebc829fec08a1749a2bfba4 (patch)
tree       3b43c8da1ab285c4b965a6042215fb694f6ee909 /opendc-experiments
parent     befec2f1ddf3a6e6d15d9d1b9fd1ecbbc4f38960 (diff)
refactor(capelin): Move metric collection outside Capelin code
This change moves the metric collection out of the Capelin codebase and into a separate module, so that other modules can also benefit from the compute metric collection code.
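For context, the sketch below illustrates the usage pattern that the call sites in this diff migrate to. It is a minimal, hypothetical example: the CountingMonitor class and the runWithTelemetry wrapper are invented for illustration, while ComputeMonitor, withMonitor and collectServiceMetrics are the new opendc-telemetry-compute API as exercised by the updated Portfolio.kt and CapelinIntegrationTest.kt below.

// Minimal sketch of the new opendc-telemetry-compute API surface, inferred from the
// call sites in this diff. CountingMonitor and runWithTelemetry are hypothetical;
// the real experiments plug in ParquetExportMonitor and processTrace instead.
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.opendc.compute.service.ComputeService
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.compute.table.HostData
import org.opendc.telemetry.compute.table.ServiceData
import org.opendc.telemetry.compute.withMonitor
import java.time.Clock

/** A hypothetical monitor that only counts the records it receives. */
class CountingMonitor : ComputeMonitor {
    var hostRecords = 0
    var serviceRecords = 0

    override fun record(data: HostData) { hostRecords++ }
    override fun record(data: ServiceData) { serviceRecords++ }
}

/** Hypothetical driver showing how any module can reuse the shared collection code. */
suspend fun runWithTelemetry(scheduler: ComputeService, clock: Clock, meterProvider: MeterProvider) {
    val monitor = CountingMonitor()

    // Assumed to periodically export compute metrics to the monitor while the block runs,
    // mirroring the CoroutineMetricReader setup that withMonitor previously performed here.
    withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
        // ... submit workloads here, e.g. via processTrace(clock, trace, scheduler, chan, monitor)
    }

    // One-shot snapshot of the service-level counters at the end of the run.
    val metrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
    println("instances=${metrics.instanceCount} failed=${metrics.failedInstanceCount}")
}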
Diffstat (limited to 'opendc-experiments')
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/build.gradle.kts | 1
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt | 98
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt | 19
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt) | 25
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt) | 40
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt | 73
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt) | 44
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt | 155
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt | 75
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt | 120
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt | 43
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt | 39
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt | 34
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt | 41
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt | 81
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt | 72
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt | 92
-rw-r--r--  opendc-experiments/opendc-experiments-energy21/build.gradle.kts | 1
-rw-r--r--  opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt | 18
19 files changed, 218 insertions, 853 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 65cebe1b..1a4caf91 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -38,6 +38,7 @@ dependencies {
implementation(projects.opendcSimulator.opendcSimulatorFailures)
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcTelemetry.opendcTelemetryCompute)
implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 9d23a5dd..0230409e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -24,16 +24,10 @@ package org.opendc.experiments.capelin
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostListener
-import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.ReplayScheduler
@@ -46,8 +40,6 @@ import org.opendc.compute.service.scheduler.weights.RamWeigher
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.EnvironmentReader
-import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.TraceReader
import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
@@ -57,7 +49,7 @@ import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.CorrelatedFaultInjector
import org.opendc.simulator.failures.FaultInjector
import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Clock
import kotlin.coroutines.resume
@@ -66,11 +58,6 @@ import kotlin.math.max
import kotlin.random.Random
/**
- * The logger for this experiment.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
* Construct the failure domain for the experiments.
*/
fun createFailureDomain(
@@ -169,85 +156,6 @@ suspend fun withComputeService(
}
/**
- * Attach the specified monitor to the VM provisioner.
- */
-suspend fun withMonitor(
- monitor: ExperimentMonitor,
- clock: Clock,
- metricProducer: MetricProducer,
- scheduler: ComputeService,
- block: suspend CoroutineScope.() -> Unit
-): Unit = coroutineScope {
- // Monitor host events
- for (host in scheduler.hosts) {
- monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
- host.addListener(object : HostListener {
- override fun onStateChanged(host: Host, newState: HostState) {
- monitor.reportHostStateChange(clock.millis(), host, newState)
- }
- })
- }
-
- val reader = CoroutineMetricReader(
- this,
- listOf(metricProducer),
- ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }),
- exportInterval = 5L * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */
- )
-
- try {
- block(this)
- } finally {
- reader.close()
- monitor.close()
- }
-}
-
-class ComputeMetrics {
- var submittedVms: Int = 0
- var queuedVms: Int = 0
- var runningVms: Int = 0
- var unscheduledVms: Int = 0
- var finishedVms: Int = 0
- var hosts: Int = 0
- var availableHosts = 0
-}
-
-/**
- * Collect the metrics of the compute service.
- */
-fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
- return extractComputeMetrics(metricProducer.collectAllMetrics())
-}
-
-/**
- * Extract an [ComputeMetrics] object from the specified list of metric data.
- */
-internal fun extractComputeMetrics(metrics: Collection<MetricData>): ComputeMetrics {
- val res = ComputeMetrics()
- for (metric in metrics) {
- val points = metric.longSumData.points
-
- if (points.isEmpty()) {
- continue
- }
-
- val value = points.first().value.toInt()
- when (metric.name) {
- "servers.submitted" -> res.submittedVms = value
- "servers.waiting" -> res.queuedVms = value
- "servers.unscheduled" -> res.unscheduledVms = value
- "servers.active" -> res.runningVms = value
- "servers.finished" -> res.finishedVms = value
- "hosts.total" -> res.hosts = value
- "hosts.available" -> res.availableHosts = value
- }
- }
-
- return res
-}
-
-/**
* Process the trace.
*/
suspend fun processTrace(
@@ -255,7 +163,7 @@ suspend fun processTrace(
reader: TraceReader<SimWorkload>,
scheduler: ComputeService,
chan: Channel<Unit>,
- monitor: ExperimentMonitor? = null,
+ monitor: ComputeMonitor? = null,
) {
val client = scheduler.newClient()
val image = client.newImage("vm-image")
@@ -289,7 +197,7 @@ suspend fun processTrace(
suspendCancellableCoroutine { cont ->
server.watch(object : ServerWatcher {
override fun onStateChanged(server: Server, newState: ServerState) {
- monitor?.reportVmStateChange(clock.millis(), server, newState)
+ monitor?.onStateChange(clock.millis(), server, newState)
if (newState == ServerState.TERMINATED) {
cont.resume(Unit)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index ee832af8..d3bba182 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -29,11 +29,11 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
+import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
@@ -41,6 +41,8 @@ import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.withMonitor
import java.io.File
import java.io.FileInputStream
import java.util.*
@@ -127,7 +129,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
- val monitor = ParquetExperimentMonitor(
+ val monitor = ParquetExportMonitor(
File(config.getString("output-path")),
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
@@ -148,7 +150,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
null
}
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
trace,
@@ -159,9 +161,16 @@ abstract class Portfolio(name: String) : Experiment(name) {
}
failureDomain?.cancel()
+ monitor.close()
}
- val monitorResults = collectMetrics(meterProvider as MetricProducer)
- logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+ val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ logger.debug {
+ "Finish " +
+ "SUBMIT=${monitorResults.instanceCount} " +
+ "FAIL=${monitorResults.failedInstanceCount} " +
+ "QUEUE=${monitorResults.queuedInstanceCount} " +
+ "RUNNING=${monitorResults.activeHostCount}"
+ }
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
index 897a6692..c5cb80e2 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
@@ -20,14 +20,14 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry.parquet
+package org.opendc.experiments.capelin.export.parquet
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.experiments.capelin.telemetry.Event
import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.Closeable
import java.io.File
@@ -36,20 +36,20 @@ import java.util.concurrent.BlockingQueue
import kotlin.concurrent.thread
/**
- * The logging instance to use.
+ * A writer that writes data in Parquet format.
*/
-private val logger = KotlinLogging.logger {}
-
-/**
- * A writer that writes events in Parquet format.
- */
-public open class ParquetEventWriter<in T : Event>(
+public open class ParquetDataWriter<in T>(
private val path: File,
private val schema: Schema,
private val converter: (T, GenericData.Record) -> Unit,
private val bufferSize: Int = 4096
) : Runnable, Closeable {
/**
+ * The logging instance to use.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
* The writer to write the Parquet file.
*/
private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
@@ -57,6 +57,7 @@ public open class ParquetEventWriter<in T : Event>(
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(4 * 1024 * 1024) // For compression
.withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
/**
@@ -67,7 +68,7 @@ public open class ParquetEventWriter<in T : Event>(
/**
* The thread that is responsible for writing the Parquet records.
*/
- private val writerThread = thread(start = false, name = "parquet-writer") { run() }
+ private val writerThread = thread(start = false, name = this.toString()) { run() }
/**
* Write the specified metrics to the database.
@@ -100,7 +101,7 @@ public open class ParquetEventWriter<in T : Event>(
is Action.Write<*> -> {
val record = GenericData.Record(schema)
@Suppress("UNCHECKED_CAST")
- converter(action.event as T, record)
+ converter(action.data as T, record)
writer.write(record)
}
}
@@ -121,6 +122,6 @@ public open class ParquetEventWriter<in T : Event>(
/**
* Write the specified metrics to the database.
*/
- public data class Write<out T : Event>(val event: T) : Action()
+ public data class Write<out T>(val data: T) : Action()
}
}
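Because the writer is now generic over the record type instead of being tied to Event, other modules can add their own data writers by following the same recipe as the two writers further down in this diff. A hypothetical sketch (the RunSummary type and its fields are invented for illustration):

// Hypothetical example of deriving a writer from the generalized ParquetDataWriter;
// it mirrors the pattern of ParquetHostDataWriter/ParquetServiceDataWriter below.
// RunSummary and its fields are invented for illustration.
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.opendc.experiments.capelin.export.parquet.ParquetDataWriter
import java.io.File

data class RunSummary(val timestamp: Long, val finishedInstances: Int)

class ParquetRunSummaryWriter(path: File, bufferSize: Int) :
    ParquetDataWriter<RunSummary>(path, schema, convert, bufferSize) {

    override fun toString(): String = "run-summary-writer"

    companion object {
        // Maps a RunSummary onto the Avro record defined by the schema below.
        private val convert: (RunSummary, GenericData.Record) -> Unit = { data, record ->
            record.put("timestamp", data.timestamp)
            record.put("finished_instances", data.finishedInstances)
        }

        private val schema: Schema = SchemaBuilder
            .record("run_summary")
            .namespace("org.opendc.experiments.capelin")
            .fields()
            .name("timestamp").type().longType().noDefault()
            .name("finished_instances").type().intType().noDefault()
            .endRecord()
    }
}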
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
index c29e116e..79b84e9d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,14 +20,36 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry
+package org.opendc.experiments.capelin.export.parquet
+
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServiceData
+import java.io.File
/**
- * An event that occurs within the system.
+ * A [ComputeMonitor] that logs the events to a Parquet file.
*/
-public abstract class Event(public val name: String) {
- /**
- * The time of occurrence of this event.
- */
- public abstract val timestamp: Long
+public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+ private val hostWriter = ParquetHostDataWriter(
+ File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+ private val serviceWriter = ParquetServiceDataWriter(
+ File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
+ override fun record(data: HostData) {
+ hostWriter.write(data)
+ }
+
+ override fun record(data: ServiceData) {
+ serviceWriter.write(data)
+ }
+
+ override fun close() {
+ hostWriter.close()
+ serviceWriter.close()
+ }
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
new file mode 100644
index 00000000..8912c12e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.export.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.opendc.telemetry.compute.table.HostData
+import java.io.File
+
+/**
+ * A Parquet event writer for [HostData]s.
+ */
+public class ParquetHostDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<HostData>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "host-writer"
+
+ public companion object {
+ private val convert: (HostData, GenericData.Record) -> Unit = { data, record ->
+ record.put("host_id", data.host.name)
+ record.put("state", data.host.state.name)
+ record.put("timestamp", data.timestamp)
+ record.put("total_work", data.totalWork)
+ record.put("granted_work", data.grantedWork)
+ record.put("overcommitted_work", data.overcommittedWork)
+ record.put("interfered_work", data.interferedWork)
+ record.put("cpu_usage", data.cpuUsage)
+ record.put("cpu_demand", data.cpuDemand)
+ record.put("power_draw", data.powerDraw)
+ record.put("instance_count", data.instanceCount)
+ record.put("cores", data.host.model.cpuCount)
+ }
+
+ private val schema: Schema = SchemaBuilder
+ .record("host")
+ .namespace("org.opendc.telemetry.compute")
+ .fields()
+ .name("timestamp").type().longType().noDefault()
+ .name("host_id").type().stringType().noDefault()
+ .name("state").type().stringType().noDefault()
+ .name("requested_work").type().longType().noDefault()
+ .name("granted_work").type().longType().noDefault()
+ .name("overcommitted_work").type().longType().noDefault()
+ .name("interfered_work").type().longType().noDefault()
+ .name("cpu_usage").type().doubleType().noDefault()
+ .name("cpu_demand").type().doubleType().noDefault()
+ .name("power_draw").type().doubleType().noDefault()
+ .name("instance_count").type().intType().noDefault()
+ .name("cores").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
index 8feff8d9..36d630f3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
@@ -20,46 +20,46 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry.parquet
+package org.opendc.experiments.capelin.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
+import org.opendc.telemetry.compute.table.ServiceData
import java.io.File
/**
- * A Parquet event writer for [ProvisionerEvent]s.
+ * A Parquet event writer for [ServiceData]s.
*/
-public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) {
+public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<ServiceData>(path, schema, convert, bufferSize) {
- override fun toString(): String = "provisioner-writer"
+ override fun toString(): String = "service-writer"
public companion object {
- private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record ->
- record.put("timestamp", event.timestamp)
- record.put("host_total_count", event.totalHostCount)
- record.put("host_available_count", event.availableHostCount)
- record.put("vm_total_count", event.totalVmCount)
- record.put("vm_active_count", event.activeVmCount)
- record.put("vm_inactive_count", event.inactiveVmCount)
- record.put("vm_waiting_count", event.waitingVmCount)
- record.put("vm_failed_count", event.failedVmCount)
+ private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record ->
+ record.put("timestamp", data.timestamp)
+ record.put("host_total_count", data.hostCount)
+ record.put("host_available_count", data.activeHostCount)
+ record.put("instance_total_count", data.instanceCount)
+ record.put("instance_active_count", data.runningInstanceCount)
+ record.put("instance_inactive_count", data.finishedInstanceCount)
+ record.put("instance_waiting_count", data.queuedInstanceCount)
+ record.put("instance_failed_count", data.failedInstanceCount)
}
private val schema: Schema = SchemaBuilder
- .record("provisioner_metrics")
- .namespace("org.opendc.experiments.sc20")
+ .record("service")
+ .namespace("org.opendc.telemetry.compute")
.fields()
.name("timestamp").type().longType().noDefault()
.name("host_total_count").type().intType().noDefault()
.name("host_available_count").type().intType().noDefault()
- .name("vm_total_count").type().intType().noDefault()
- .name("vm_active_count").type().intType().noDefault()
- .name("vm_inactive_count").type().intType().noDefault()
- .name("vm_waiting_count").type().intType().noDefault()
- .name("vm_failed_count").type().intType().noDefault()
+ .name("instance_total_count").type().intType().noDefault()
+ .name("instance_active_count").type().intType().noDefault()
+ .name("instance_inactive_count").type().intType().noDefault()
+ .name("instance_waiting_count").type().intType().noDefault()
+ .name("instance_failed_count").type().intType().noDefault()
.endRecord()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
deleted file mode 100644
index 79be9ac4..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.monitor
-
-import io.opentelemetry.sdk.common.CompletableResultCode
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricExporter
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
-import org.opendc.compute.service.driver.Host
-import org.opendc.experiments.capelin.extractComputeMetrics
-import java.time.Clock
-
-/**
- * A [MetricExporter] that exports the metrics to the [ExperimentMonitor].
- */
-public class ExperimentMetricExporter(
- private val monitor: ExperimentMonitor,
- private val clock: Clock,
- private val hosts: Map<String, Host>
-) : MetricExporter {
-
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- return try {
- reportHostMetrics(metrics)
- reportProvisionerMetrics(metrics)
- CompletableResultCode.ofSuccess()
- } catch (e: Throwable) {
- CompletableResultCode.ofFailure()
- }
- }
-
- private var lastHostMetrics: Map<String, HostMetrics> = emptyMap()
- private val hostMetricsSingleton = HostMetrics()
-
- private fun reportHostMetrics(metrics: Collection<MetricData>) {
- val hostMetrics = mutableMapOf<String, HostMetrics>()
-
- for (metric in metrics) {
- when (metric.name) {
- "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v }
- "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v }
- "power.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.powerDraw = v }
- "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v }
- "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v }
- "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v }
- "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v }
- "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() }
- "host.time.up" -> mapLongSum(metric, hostMetrics) { m, v -> m.uptime = v }
- "host.time.down" -> mapLongSum(metric, hostMetrics) { m, v -> m.downtime = v }
- }
- }
-
- for ((id, hostMetric) in hostMetrics) {
- val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton)
- val host = hosts[id] ?: continue
-
- monitor.reportHostData(
- clock.millis(),
- hostMetric.totalWork - lastHostMetric.totalWork,
- hostMetric.grantedWork - lastHostMetric.grantedWork,
- hostMetric.overcommittedWork - lastHostMetric.overcommittedWork,
- hostMetric.interferedWork - lastHostMetric.interferedWork,
- hostMetric.cpuUsage,
- hostMetric.cpuDemand,
- hostMetric.powerDraw,
- hostMetric.instanceCount,
- hostMetric.uptime - lastHostMetric.uptime,
- hostMetric.downtime - lastHostMetric.downtime,
- host
- )
- }
-
- lastHostMetrics = hostMetrics
- }
-
- private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
- val points = data.doubleSummaryData?.points ?: emptyList()
- for (point in points) {
- val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() }
- val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2
- block(hostMetric, avg)
- }
- }
-
- private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
- val points = data?.longSumData?.points ?: emptyList()
- for (point in points) {
- val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() }
- block(hostMetric, point.value)
- }
- }
-
- private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
- val points = data?.doubleSumData?.points ?: emptyList()
- for (point in points) {
- val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() }
- block(hostMetric, point.value)
- }
- }
-
- private fun reportProvisionerMetrics(metrics: Collection<MetricData>) {
- val res = extractComputeMetrics(metrics)
-
- monitor.reportServiceData(
- clock.millis(),
- res.hosts,
- res.availableHosts,
- res.submittedVms,
- res.runningVms,
- res.finishedVms,
- res.queuedVms,
- res.unscheduledVms
- )
- }
-
- private class HostMetrics {
- var totalWork: Double = 0.0
- var grantedWork: Double = 0.0
- var overcommittedWork: Double = 0.0
- var interferedWork: Double = 0.0
- var cpuUsage: Double = 0.0
- var cpuDemand: Double = 0.0
- var instanceCount: Int = 0
- var powerDraw: Double = 0.0
- var uptime: Long = 0
- var downtime: Long = 0
- }
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
deleted file mode 100644
index dc28b816..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.monitor
-
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
-
-/**
- * A monitor watches the events of an experiment.
- */
-public interface ExperimentMonitor : AutoCloseable {
- /**
- * This method is invoked when the state of a VM changes.
- */
- public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
-
- /**
- * This method is invoked when the state of a host changes.
- */
- public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {}
-
- /**
- * This method is invoked for a host for each slice that is finishes.
- */
- public fun reportHostData(
- time: Long,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- instanceCount: Int,
- uptime: Long,
- downtime: Long,
- host: Host
- ) {}
-
- /**
- * This method is invoked for reporting service data.
- */
- public fun reportServiceData(
- time: Long,
- totalHostCount: Int,
- availableHostCount: Int,
- totalVmCount: Int,
- activeVmCount: Int,
- inactiveVmCount: Int,
- waitingVmCount: Int,
- failedVmCount: Int
- ) {}
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
deleted file mode 100644
index c49499da..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.monitor
-
-import mu.KotlinLogging
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
-import org.opendc.experiments.capelin.telemetry.HostEvent
-import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
-import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter
-import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter
-import java.io.File
-
-/**
- * The logger instance to use.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * An [ExperimentMonitor] that logs the events to a Parquet file.
- */
-public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor {
- private val hostWriter = ParquetHostEventWriter(
- File(base, "host-metrics/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
- private val provisionerWriter = ParquetProvisionerEventWriter(
- File(base, "provisioner-metrics/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
-
- override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
-
- override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
- logger.debug { "Host ${host.uid} changed state $newState [$time]" }
- }
-
- override fun reportHostData(
- time: Long,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- instanceCount: Int,
- uptime: Long,
- downtime: Long,
- host: Host
- ) {
- hostWriter.write(
- HostEvent(
- time,
- 5 * 60 * 1000L,
- host,
- instanceCount,
- totalWork.toLong(),
- grantedWork.toLong(),
- overcommittedWork.toLong(),
- interferedWork.toLong(),
- cpuUsage,
- cpuDemand,
- powerDraw,
- host.model.cpuCount
- )
- )
- }
-
- override fun reportServiceData(
- time: Long,
- totalHostCount: Int,
- availableHostCount: Int,
- totalVmCount: Int,
- activeVmCount: Int,
- inactiveVmCount: Int,
- waitingVmCount: Int,
- failedVmCount: Int
- ) {
- provisionerWriter.write(
- ProvisionerEvent(
- time,
- totalHostCount,
- availableHostCount,
- totalVmCount,
- activeVmCount,
- inactiveVmCount,
- waitingVmCount,
- failedVmCount
- )
- )
- }
-
- override fun close() {
- hostWriter.close()
- provisionerWriter.close()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
deleted file mode 100644
index 899fc9b1..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry
-
-import org.opendc.compute.service.driver.Host
-
-/**
- * A periodic report of the host machine metrics.
- */
-public data class HostEvent(
- override val timestamp: Long,
- public val duration: Long,
- public val host: Host,
- public val vmCount: Int,
- public val requestedBurst: Long,
- public val grantedBurst: Long,
- public val overcommissionedBurst: Long,
- public val interferedBurst: Long,
- public val cpuUsage: Double,
- public val cpuDemand: Double,
- public val powerDraw: Double,
- public val cores: Int
-) : Event("host-metrics")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt
deleted file mode 100644
index 539c9bc9..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry
-
-/**
- * A periodic report of the provisioner's metrics.
- */
-public data class ProvisionerEvent(
- override val timestamp: Long,
- public val totalHostCount: Int,
- public val availableHostCount: Int,
- public val totalVmCount: Int,
- public val activeVmCount: Int,
- public val inactiveVmCount: Int,
- public val waitingVmCount: Int,
- public val failedVmCount: Int
-) : Event("provisioner-metrics")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
deleted file mode 100644
index 6c8fc941..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry
-
-import org.opendc.experiments.capelin.Portfolio
-
-/**
- * A periodic report of the host machine metrics.
- */
-public data class RunEvent(
- val portfolio: Portfolio,
- val repeat: Int,
- override val timestamp: Long
-) : Event("run")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
deleted file mode 100644
index 7631f55f..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry
-
-import org.opendc.compute.api.Server
-
-/**
- * A periodic report of a virtual machine's metrics.
- */
-public data class VmEvent(
- override val timestamp: Long,
- public val duration: Long,
- public val vm: Server,
- public val host: Server,
- public val requestedBurst: Long,
- public val grantedBurst: Long,
- public val overcommissionedBurst: Long,
- public val interferedBurst: Long,
- public val cpuUsage: Double,
- public val cpuDemand: Double
-) : Event("vm-metrics")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
deleted file mode 100644
index c8fe1cb2..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.HostEvent
-import java.io.File
-
-/**
- * A Parquet event writer for [HostEvent]s.
- */
-public class ParquetHostEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) {
-
- override fun toString(): String = "host-writer"
-
- public companion object {
- private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record ->
- // record.put("portfolio_id", event.run.parent.parent.id)
- // record.put("scenario_id", event.run.parent.id)
- // record.put("run_id", event.run.id)
- record.put("host_id", event.host.name)
- record.put("state", event.host.state.name)
- record.put("timestamp", event.timestamp)
- record.put("duration", event.duration)
- record.put("vm_count", event.vmCount)
- record.put("requested_burst", event.requestedBurst)
- record.put("granted_burst", event.grantedBurst)
- record.put("overcommissioned_burst", event.overcommissionedBurst)
- record.put("interfered_burst", event.interferedBurst)
- record.put("cpu_usage", event.cpuUsage)
- record.put("cpu_demand", event.cpuDemand)
- record.put("power_draw", event.powerDraw)
- record.put("cores", event.cores)
- }
-
- private val schema: Schema = SchemaBuilder
- .record("host_metrics")
- .namespace("org.opendc.experiments.sc20")
- .fields()
- // .name("portfolio_id").type().intType().noDefault()
- // .name("scenario_id").type().intType().noDefault()
- // .name("run_id").type().intType().noDefault()
- .name("timestamp").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("host_id").type().stringType().noDefault()
- .name("state").type().stringType().noDefault()
- .name("vm_count").type().intType().noDefault()
- .name("requested_burst").type().longType().noDefault()
- .name("granted_burst").type().longType().noDefault()
- .name("overcommissioned_burst").type().longType().noDefault()
- .name("interfered_burst").type().longType().noDefault()
- .name("cpu_usage").type().doubleType().noDefault()
- .name("cpu_demand").type().doubleType().noDefault()
- .name("power_draw").type().doubleType().noDefault()
- .name("cores").type().intType().noDefault()
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
deleted file mode 100644
index 946410eb..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.RunEvent
-import java.io.File
-
-/**
- * A Parquet event writer for [RunEvent]s.
- */
-public class ParquetRunEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) {
-
- override fun toString(): String = "run-writer"
-
- public companion object {
- private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record ->
- val portfolio = event.portfolio
- record.put("portfolio_name", portfolio.name)
- record.put("scenario_id", portfolio.id)
- record.put("run_id", event.repeat)
- record.put("topology", portfolio.topology.name)
- record.put("workload_name", portfolio.workload.name)
- record.put("workload_fraction", portfolio.workload.fraction)
- record.put("workload_sampler", portfolio.workload.samplingStrategy)
- record.put("allocation_policy", portfolio.allocationPolicy)
- record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency)
- record.put("interference", portfolio.operationalPhenomena.hasInterference)
- record.put("seed", event.repeat)
- }
-
- private val schema: Schema = SchemaBuilder
- .record("runs")
- .namespace("org.opendc.experiments.sc20")
- .fields()
- .name("portfolio_name").type().stringType().noDefault()
- .name("scenario_id").type().intType().noDefault()
- .name("run_id").type().intType().noDefault()
- .name("topology").type().stringType().noDefault()
- .name("workload_name").type().stringType().noDefault()
- .name("workload_fraction").type().doubleType().noDefault()
- .name("workload_sampler").type().stringType().noDefault()
- .name("allocation_policy").type().stringType().noDefault()
- .name("failure_frequency").type().doubleType().noDefault()
- .name("interference").type().booleanType().noDefault()
- .name("seed").type().intType().noDefault()
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 24d8f768..abd8efeb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -29,7 +29,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
@@ -38,7 +37,6 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
@@ -46,6 +44,10 @@ import org.opendc.experiments.capelin.trace.TraceReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.withMonitor
import java.io.File
import java.util.*
@@ -80,7 +82,6 @@ class CapelinIntegrationTest {
)
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- lateinit var monitorResults: ComputeMetrics
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
@@ -98,7 +99,7 @@ class CapelinIntegrationTest {
null
}
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -111,15 +112,21 @@ class CapelinIntegrationTest {
failureDomain?.cancel()
}
- monitorResults = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
- { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
- { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
- { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
+ { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") },
+ { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") },
+ { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should be unscheduled") },
+ { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should be in the queue") },
{ assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } },
{ assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } },
{ assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
@@ -145,7 +152,7 @@ class CapelinIntegrationTest {
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -156,8 +163,14 @@ class CapelinIntegrationTest {
}
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
@@ -189,7 +202,7 @@ class CapelinIntegrationTest {
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -200,8 +213,14 @@ class CapelinIntegrationTest {
}
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
@@ -239,7 +258,7 @@ class CapelinIntegrationTest {
chan
)
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -252,8 +271,14 @@ class CapelinIntegrationTest {
failureDomain.cancel()
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
@@ -283,34 +308,19 @@ class CapelinIntegrationTest {
return ClusterEnvironmentReader(stream)
}
- class TestExperimentReporter : ExperimentMonitor {
+ class TestExperimentReporter : ComputeMonitor {
var totalWork = 0L
var totalGrantedWork = 0L
var totalOvercommittedWork = 0L
var totalInterferedWork = 0L
var totalPowerDraw = 0.0
- override fun reportHostData(
- time: Long,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- instanceCount: Int,
- uptime: Long,
- downtime: Long,
- host: Host,
- ) {
- this.totalWork += totalWork.toLong()
- totalGrantedWork += grantedWork.toLong()
- totalOvercommittedWork += overcommittedWork.toLong()
- totalInterferedWork += interferedWork.toLong()
- totalPowerDraw += powerDraw
+ override fun record(data: HostData) {
+ this.totalWork += data.totalWork.toLong()
+ totalGrantedWork += data.grantedWork.toLong()
+ totalOvercommittedWork += data.overcommittedWork.toLong()
+ totalInterferedWork += data.interferedWork.toLong()
+ totalPowerDraw += data.powerDraw
}
-
- override fun close() {}
}
}
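For context, the test reporter above switches from the removed ExperimentMonitor callback to the ComputeMonitor interface from opendc-telemetry-compute, which delivers each host sample as a single HostData value. Below is a minimal sketch of such a monitor, using only the fields visible in this diff; the class name AggregatingMonitor and the choice of fields are illustrative, not part of the commit.

import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostData

class AggregatingMonitor : ComputeMonitor {
    // Aggregate totals across all reported host samples.
    var totalWork = 0.0
    var totalPowerDraw = 0.0

    override fun record(data: HostData) {
        totalWork += data.totalWork       // work requested on the host in this sample
        totalPowerDraw += data.powerDraw  // sampled power draw of the host
    }
}

Note that, unlike the old interface, no close() override is needed; the test reporter above drops it for the same reason.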
diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
index 7d34d098..40ac2967 100644
--- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
@@ -37,6 +37,7 @@ dependencies {
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcExperiments.opendcExperimentsCapelin)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcTelemetry.opendcTelemetryCompute)
implementation(libs.kotlin.logging)
implementation(libs.config)
diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
index e64e20a2..02aaab3c 100644
--- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
+++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
@@ -37,7 +37,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.*
-import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
+import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
@@ -50,6 +50,8 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.*
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.withMonitor
import java.io.File
import java.time.Clock
import java.util.*
@@ -87,11 +89,11 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
)
val meterProvider: MeterProvider = createMeterProvider(clock)
- val monitor = ParquetExperimentMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096)
+ val monitor = ParquetExportMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096)
val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace))
withComputeService(clock, meterProvider, allocationPolicy) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
trace,
@@ -102,12 +104,12 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
}
}
- val monitorResults = collectMetrics(meterProvider as MetricProducer)
+ val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
logger.debug {
- "Finish SUBMIT=${monitorResults.submittedVms} " +
- "FAIL=${monitorResults.unscheduledVms} " +
- "QUEUE=${monitorResults.queuedVms} " +
- "RUNNING=${monitorResults.runningVms}"
+ "Finish SUBMIT=${monitorResults.instanceCount} " +
+ "FAIL=${monitorResults.failedInstanceCount} " +
+ "QUEUE=${monitorResults.queuedInstanceCount} " +
+ "RUNNING=${monitorResults.runningInstanceCount}"
}
}
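The same call-site change recurs in every experiment touched by this commit: withMonitor now takes the scheduler first and the monitor last, ParquetExperimentMonitor becomes ParquetExportMonitor under export.parquet, and collectMetrics is replaced by collectServiceMetrics, which takes an explicit timestamp next to the MetricProducer and exposes instanceCount, failedInstanceCount, queuedInstanceCount and runningInstanceCount. The sketch below is assembled only from the call sites above; the surrounding setup (clock, meterProvider, scheduler, monitor) and the OpenTelemetry import path are assumed to match the experiment code.

import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.compute.withMonitor

// New call order: scheduler first, monitor last (previously the monitor came first).
withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
    // drive the workload here, e.g. via processTrace(...) as in the experiments above
}

// Service-level metrics are now collected with an explicit timestamp.
val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
println(
    "Finish " +
        "SUBMIT=${serviceMetrics.instanceCount} " +
        "FAIL=${serviceMetrics.failedInstanceCount} " +
        "QUEUE=${serviceMetrics.queuedInstanceCount} " +
        "RUNNING=${serviceMetrics.runningInstanceCount}"
)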