summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2025-02-18 11:58:56 +0100
committerGitHub <noreply@github.com>2025-02-18 11:58:56 +0100
commitee770ffe22e1eb2d7f28c010d6252222d2f4bc1a (patch)
treedf0305f888bea04c6593ccdb99f7402d6040b486 /opendc-compute
parente9080b6280a3a1264a35748eccd1c58205c001bf (diff)
Added the option to select which files to export. (#307)
* Added the option to select which files to export. * Updated documentation
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt10
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt11
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt116
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/OutputFiles.kt44
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt91
5 files changed, 196 insertions, 76 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt
index 29e9d541..cec664b6 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt
@@ -25,6 +25,7 @@ package org.opendc.compute.simulator.provisioner
import org.opendc.compute.simulator.service.ComputeService
import org.opendc.compute.simulator.telemetry.ComputeMetricReader
import org.opendc.compute.simulator.telemetry.ComputeMonitor
+import org.opendc.compute.simulator.telemetry.OutputFiles
import java.time.Duration
/**
@@ -36,6 +37,14 @@ public class ComputeMonitorProvisioningStep(
private val monitor: ComputeMonitor,
private val exportInterval: Duration,
private val startTime: Duration = Duration.ofMillis(0),
+ private val filesToExport: Map<OutputFiles, Boolean> =
+ mapOf(
+ OutputFiles.HOST to true,
+ OutputFiles.TASK to true,
+ OutputFiles.SERVICE to true,
+ OutputFiles.POWER_SOURCE to true,
+ OutputFiles.BATTERY to true,
+ ),
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
val service =
@@ -49,6 +58,7 @@ public class ComputeMonitorProvisioningStep(
monitor,
exportInterval,
startTime,
+ filesToExport,
)
return metricReader
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt
index 7d9cae60..c72e8944 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt
@@ -26,6 +26,7 @@ package org.opendc.compute.simulator.provisioner
import org.opendc.compute.simulator.scheduler.ComputeScheduler
import org.opendc.compute.simulator.telemetry.ComputeMonitor
+import org.opendc.compute.simulator.telemetry.OutputFiles
import org.opendc.compute.topology.specs.ClusterSpec
import org.opendc.compute.topology.specs.HostSpec
import java.time.Duration
@@ -59,8 +60,16 @@ public fun registerComputeMonitor(
monitor: ComputeMonitor,
exportInterval: Duration = Duration.ofMinutes(5),
startTime: Duration = Duration.ofMillis(0),
+ filesToExport: Map<OutputFiles, Boolean> =
+ mapOf(
+ OutputFiles.HOST to true,
+ OutputFiles.TASK to true,
+ OutputFiles.SERVICE to true,
+ OutputFiles.POWER_SOURCE to true,
+ OutputFiles.BATTERY to true,
+ ),
): ProvisioningStep {
- return ComputeMonitorProvisioningStep(serviceDomain, monitor, exportInterval, startTime)
+ return ComputeMonitorProvisioningStep(serviceDomain, monitor, exportInterval, startTime, filesToExport)
}
/**
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt
index 10bc889b..91748454 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt
@@ -56,6 +56,14 @@ public class ComputeMetricReader(
private val monitor: ComputeMonitor,
private val exportInterval: Duration = Duration.ofMinutes(5),
private val startTime: Duration = Duration.ofMillis(0),
+ private val toMonitor: Map<OutputFiles, Boolean> =
+ mapOf(
+ OutputFiles.HOST to true,
+ OutputFiles.TASK to true,
+ OutputFiles.POWER_SOURCE to true,
+ OutputFiles.BATTERY to true,
+ OutputFiles.SERVICE to true,
+ ),
) : AutoCloseable {
private val logger = KotlinLogging.logger {}
private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher())
@@ -116,31 +124,35 @@ public class ComputeMetricReader(
try {
val now = this.clock.instant()
- for (host in this.service.hosts) {
- val reader =
- this.hostTableReaders.computeIfAbsent(host) {
- HostTableReaderImpl(
- it,
- startTime,
- )
- }
- reader.record(now)
- this.monitor.record(reader.copy())
- reader.reset()
+ if (toMonitor[OutputFiles.HOST] == true) {
+ for (host in this.service.hosts) {
+ val reader =
+ this.hostTableReaders.computeIfAbsent(host) {
+ HostTableReaderImpl(
+ it,
+ startTime,
+ )
+ }
+ reader.record(now)
+ this.monitor.record(reader.copy())
+ reader.reset()
+ }
}
- for (task in this.service.tasks) {
- val reader =
- this.taskTableReaders.computeIfAbsent(task) {
- TaskTableReaderImpl(
- service,
- it,
- startTime,
- )
- }
- reader.record(now)
- this.monitor.record(reader.copy())
- reader.reset()
+ if (toMonitor[OutputFiles.TASK] == true) {
+ for (task in this.service.tasks) {
+ val reader =
+ this.taskTableReaders.computeIfAbsent(task) {
+ TaskTableReaderImpl(
+ service,
+ it,
+ startTime,
+ )
+ }
+ reader.record(now)
+ this.monitor.record(reader.copy())
+ reader.reset()
+ }
}
for (task in this.service.tasksToRemove) {
@@ -148,36 +160,42 @@ public class ComputeMetricReader(
}
this.service.clearTasksToRemove()
- for (simPowerSource in this.service.powerSources) {
- val reader =
- this.powerSourceTableReaders.computeIfAbsent(simPowerSource) {
- PowerSourceTableReaderImpl(
- it,
- startTime,
- )
- }
-
- reader.record(now)
- this.monitor.record(reader.copy())
- reader.reset()
+ if (toMonitor[OutputFiles.POWER_SOURCE] == true) {
+ for (simPowerSource in this.service.powerSources) {
+ val reader =
+ this.powerSourceTableReaders.computeIfAbsent(simPowerSource) {
+ PowerSourceTableReaderImpl(
+ it,
+ startTime,
+ )
+ }
+
+ reader.record(now)
+ this.monitor.record(reader.copy())
+ reader.reset()
+ }
}
- for (simBattery in this.service.batteries) {
- val reader =
- this.batteryTableReaders.computeIfAbsent(simBattery) {
- BatteryTableReaderImpl(
- it,
- startTime,
- )
- }
-
- reader.record(now)
- this.monitor.record(reader.copy())
- reader.reset()
+ if (toMonitor[OutputFiles.BATTERY] == true) {
+ for (simBattery in this.service.batteries) {
+ val reader =
+ this.batteryTableReaders.computeIfAbsent(simBattery) {
+ BatteryTableReaderImpl(
+ it,
+ startTime,
+ )
+ }
+
+ reader.record(now)
+ this.monitor.record(reader.copy())
+ reader.reset()
+ }
}
- this.serviceTableReader.record(now)
- monitor.record(this.serviceTableReader.copy())
+ if (toMonitor[OutputFiles.SERVICE] == true) {
+ this.serviceTableReader.record(now)
+ monitor.record(this.serviceTableReader.copy())
+ }
if (loggCounter >= 100) {
var loggString = "\n\t\t\t\t\tMetrics after ${now.toEpochMilli() / 1000 / 60 / 60} hours:\n"
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/OutputFiles.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/OutputFiles.kt
new file mode 100644
index 00000000..6b747a94
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/OutputFiles.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.telemetry
+
+import kotlinx.serialization.SerialName
+import kotlinx.serialization.Serializable
+
+@Serializable
+public enum class OutputFiles {
+ @SerialName("host")
+ HOST,
+
+ @SerialName("task")
+ TASK,
+
+ @SerialName("powerSource")
+ POWER_SOURCE,
+
+ @SerialName("battery")
+ BATTERY,
+
+ @SerialName("service")
+ SERVICE,
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt
index 7d2b9363..a626c41b 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt
@@ -23,6 +23,7 @@
package org.opendc.compute.simulator.telemetry.parquet
import org.opendc.compute.simulator.telemetry.ComputeMonitor
+import org.opendc.compute.simulator.telemetry.OutputFiles
import org.opendc.compute.simulator.telemetry.table.battery.BatteryTableReader
import org.opendc.compute.simulator.telemetry.table.host.HostTableReader
import org.opendc.compute.simulator.telemetry.table.powerSource.PowerSourceTableReader
@@ -37,38 +38,38 @@ import java.io.File
* A [ComputeMonitor] that logs the events to a Parquet file.
*/
public class ParquetComputeMonitor(
- private val hostExporter: Exporter<HostTableReader>,
- private val taskExporter: Exporter<TaskTableReader>,
- private val powerSourceExporter: Exporter<PowerSourceTableReader>,
- private val batteryExporter: Exporter<BatteryTableReader>,
- private val serviceExporter: Exporter<ServiceTableReader>,
+ private val hostExporter: Exporter<HostTableReader>?,
+ private val taskExporter: Exporter<TaskTableReader>?,
+ private val powerSourceExporter: Exporter<PowerSourceTableReader>?,
+ private val batteryExporter: Exporter<BatteryTableReader>?,
+ private val serviceExporter: Exporter<ServiceTableReader>?,
) : ComputeMonitor, AutoCloseable {
override fun record(reader: HostTableReader) {
- hostExporter.write(reader)
+ hostExporter?.write(reader)
}
override fun record(reader: TaskTableReader) {
- taskExporter.write(reader)
+ taskExporter?.write(reader)
}
override fun record(reader: PowerSourceTableReader) {
- powerSourceExporter.write(reader)
+ powerSourceExporter?.write(reader)
}
override fun record(reader: BatteryTableReader) {
- batteryExporter.write(reader)
+ batteryExporter?.write(reader)
}
override fun record(reader: ServiceTableReader) {
- serviceExporter.write(reader)
+ serviceExporter?.write(reader)
}
override fun close() {
- hostExporter.close()
- taskExporter.close()
- powerSourceExporter.close()
- batteryExporter.close()
- serviceExporter.close()
+ hostExporter?.close()
+ taskExporter?.close()
+ powerSourceExporter?.close()
+ batteryExporter?.close()
+ serviceExporter?.close()
}
public companion object {
@@ -83,12 +84,14 @@ public class ParquetComputeMonitor(
base: File,
partition: String,
bufferSize: Int,
+ filesToExport: Map<OutputFiles, Boolean>,
computeExportConfig: ComputeExportConfig,
): ParquetComputeMonitor =
invoke(
base = base,
partition = partition,
bufferSize = bufferSize,
+ filesToExport = filesToExport,
hostExportColumns = computeExportConfig.hostExportColumns,
taskExportColumns = computeExportConfig.taskExportColumns,
powerSourceExportColumns = computeExportConfig.powerSourceExportColumns,
@@ -109,6 +112,7 @@ public class ParquetComputeMonitor(
base: File,
partition: String,
bufferSize: Int,
+ filesToExport: Map<OutputFiles, Boolean>,
hostExportColumns: Collection<ExportColumn<HostTableReader>>? = null,
taskExportColumns: Collection<ExportColumn<TaskTableReader>>? = null,
powerSourceExportColumns: Collection<ExportColumn<PowerSourceTableReader>>? = null,
@@ -118,37 +122,72 @@ public class ParquetComputeMonitor(
// Loads the fields in case they need to be retrieved if optional params are omitted.
ComputeExportConfig.loadDfltColumns()
- return ParquetComputeMonitor(
- hostExporter =
+ val hostExporter =
+ if (filesToExport[OutputFiles.HOST] == true) {
Exporter(
outputFile = File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() },
columns = hostExportColumns ?: Exportable.getAllLoadedColumns(),
bufferSize = bufferSize,
- ),
- taskExporter =
+ )
+ } else {
+ null
+ }
+
+ val taskExporter =
+ if (filesToExport[OutputFiles.TASK] == true) {
Exporter(
outputFile = File(base, "$partition/task.parquet").also { it.parentFile.mkdirs() },
columns = taskExportColumns ?: Exportable.getAllLoadedColumns(),
bufferSize = bufferSize,
- ),
- powerSourceExporter =
+ )
+ } else {
+ null
+ }
+
+ val powerSourceExporter =
+ if (filesToExport[OutputFiles.POWER_SOURCE] == true) {
Exporter(
outputFile = File(base, "$partition/powerSource.parquet").also { it.parentFile.mkdirs() },
columns = powerSourceExportColumns ?: Exportable.getAllLoadedColumns(),
bufferSize = bufferSize,
- ),
- batteryExporter =
+ )
+ } else {
+ null
+ }
+
+ val batteryExporter =
+ if (filesToExport[OutputFiles.BATTERY] == true) {
Exporter(
outputFile = File(base, "$partition/battery.parquet").also { it.parentFile.mkdirs() },
columns = batteryExportColumns ?: Exportable.getAllLoadedColumns(),
bufferSize = bufferSize,
- ),
- serviceExporter =
+ )
+ } else {
+ null
+ }
+
+ val serviceExporter =
+ if (filesToExport[OutputFiles.SERVICE] == true) {
Exporter(
outputFile = File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() },
columns = serviceExportColumns ?: Exportable.getAllLoadedColumns(),
bufferSize = bufferSize,
- ),
+ )
+ } else {
+ null
+ }
+
+ return ParquetComputeMonitor(
+ hostExporter =
+ hostExporter,
+ taskExporter =
+ taskExporter,
+ powerSourceExporter =
+ powerSourceExporter,
+ batteryExporter =
+ batteryExporter,
+ serviceExporter =
+ serviceExporter,
)
}
}