diff options
Diffstat (limited to 'opendc-compute/opendc-compute-simulator')
5 files changed, 196 insertions, 76 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt index 29e9d541..cec664b6 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt @@ -25,6 +25,7 @@ package org.opendc.compute.simulator.provisioner import org.opendc.compute.simulator.service.ComputeService import org.opendc.compute.simulator.telemetry.ComputeMetricReader import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.OutputFiles import java.time.Duration /** @@ -36,6 +37,14 @@ public class ComputeMonitorProvisioningStep( private val monitor: ComputeMonitor, private val exportInterval: Duration, private val startTime: Duration = Duration.ofMillis(0), + private val filesToExport: Map<OutputFiles, Boolean> = + mapOf( + OutputFiles.HOST to true, + OutputFiles.TASK to true, + OutputFiles.SERVICE to true, + OutputFiles.POWER_SOURCE to true, + OutputFiles.BATTERY to true, + ), ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { val service = @@ -49,6 +58,7 @@ public class ComputeMonitorProvisioningStep( monitor, exportInterval, startTime, + filesToExport, ) return metricReader } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt index 7d9cae60..c72e8944 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt @@ -26,6 +26,7 @@ package org.opendc.compute.simulator.provisioner import org.opendc.compute.simulator.scheduler.ComputeScheduler import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.OutputFiles import org.opendc.compute.topology.specs.ClusterSpec import org.opendc.compute.topology.specs.HostSpec import java.time.Duration @@ -59,8 +60,16 @@ public fun registerComputeMonitor( monitor: ComputeMonitor, exportInterval: Duration = Duration.ofMinutes(5), startTime: Duration = Duration.ofMillis(0), + filesToExport: Map<OutputFiles, Boolean> = + mapOf( + OutputFiles.HOST to true, + OutputFiles.TASK to true, + OutputFiles.SERVICE to true, + OutputFiles.POWER_SOURCE to true, + OutputFiles.BATTERY to true, + ), ): ProvisioningStep { - return ComputeMonitorProvisioningStep(serviceDomain, monitor, exportInterval, startTime) + return ComputeMonitorProvisioningStep(serviceDomain, monitor, exportInterval, startTime, filesToExport) } /** diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt index 10bc889b..91748454 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt @@ -56,6 +56,14 @@ public class ComputeMetricReader( private val monitor: ComputeMonitor, private val exportInterval: Duration = Duration.ofMinutes(5), private val startTime: Duration = Duration.ofMillis(0), + private val toMonitor: Map<OutputFiles, Boolean> = + mapOf( + OutputFiles.HOST to true, + OutputFiles.TASK to true, + OutputFiles.POWER_SOURCE to true, + OutputFiles.BATTERY to true, + OutputFiles.SERVICE to true, + ), ) : AutoCloseable { private val logger = KotlinLogging.logger {} private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher()) @@ -116,31 +124,35 @@ public class ComputeMetricReader( try { val now = this.clock.instant() - for (host in this.service.hosts) { - val reader = - this.hostTableReaders.computeIfAbsent(host) { - HostTableReaderImpl( - it, - startTime, - ) - } - reader.record(now) - this.monitor.record(reader.copy()) - reader.reset() + if (toMonitor[OutputFiles.HOST] == true) { + for (host in this.service.hosts) { + val reader = + this.hostTableReaders.computeIfAbsent(host) { + HostTableReaderImpl( + it, + startTime, + ) + } + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } } - for (task in this.service.tasks) { - val reader = - this.taskTableReaders.computeIfAbsent(task) { - TaskTableReaderImpl( - service, - it, - startTime, - ) - } - reader.record(now) - this.monitor.record(reader.copy()) - reader.reset() + if (toMonitor[OutputFiles.TASK] == true) { + for (task in this.service.tasks) { + val reader = + this.taskTableReaders.computeIfAbsent(task) { + TaskTableReaderImpl( + service, + it, + startTime, + ) + } + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } } for (task in this.service.tasksToRemove) { @@ -148,36 +160,42 @@ public class ComputeMetricReader( } this.service.clearTasksToRemove() - for (simPowerSource in this.service.powerSources) { - val reader = - this.powerSourceTableReaders.computeIfAbsent(simPowerSource) { - PowerSourceTableReaderImpl( - it, - startTime, - ) - } - - reader.record(now) - this.monitor.record(reader.copy()) - reader.reset() + if (toMonitor[OutputFiles.POWER_SOURCE] == true) { + for (simPowerSource in this.service.powerSources) { + val reader = + this.powerSourceTableReaders.computeIfAbsent(simPowerSource) { + PowerSourceTableReaderImpl( + it, + startTime, + ) + } + + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } } - for (simBattery in this.service.batteries) { - val reader = - this.batteryTableReaders.computeIfAbsent(simBattery) { - BatteryTableReaderImpl( - it, - startTime, - ) - } - - reader.record(now) - this.monitor.record(reader.copy()) - reader.reset() + if (toMonitor[OutputFiles.BATTERY] == true) { + for (simBattery in this.service.batteries) { + val reader = + this.batteryTableReaders.computeIfAbsent(simBattery) { + BatteryTableReaderImpl( + it, + startTime, + ) + } + + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } } - this.serviceTableReader.record(now) - monitor.record(this.serviceTableReader.copy()) + if (toMonitor[OutputFiles.SERVICE] == true) { + this.serviceTableReader.record(now) + monitor.record(this.serviceTableReader.copy()) + } if (loggCounter >= 100) { var loggString = "\n\t\t\t\t\tMetrics after ${now.toEpochMilli() / 1000 / 60 / 60} hours:\n" diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/OutputFiles.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/OutputFiles.kt new file mode 100644 index 00000000..6b747a94 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/OutputFiles.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.telemetry + +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable + +@Serializable +public enum class OutputFiles { + @SerialName("host") + HOST, + + @SerialName("task") + TASK, + + @SerialName("powerSource") + POWER_SOURCE, + + @SerialName("battery") + BATTERY, + + @SerialName("service") + SERVICE, +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt index 7d2b9363..a626c41b 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt @@ -23,6 +23,7 @@ package org.opendc.compute.simulator.telemetry.parquet import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.OutputFiles import org.opendc.compute.simulator.telemetry.table.battery.BatteryTableReader import org.opendc.compute.simulator.telemetry.table.host.HostTableReader import org.opendc.compute.simulator.telemetry.table.powerSource.PowerSourceTableReader @@ -37,38 +38,38 @@ import java.io.File * A [ComputeMonitor] that logs the events to a Parquet file. */ public class ParquetComputeMonitor( - private val hostExporter: Exporter<HostTableReader>, - private val taskExporter: Exporter<TaskTableReader>, - private val powerSourceExporter: Exporter<PowerSourceTableReader>, - private val batteryExporter: Exporter<BatteryTableReader>, - private val serviceExporter: Exporter<ServiceTableReader>, + private val hostExporter: Exporter<HostTableReader>?, + private val taskExporter: Exporter<TaskTableReader>?, + private val powerSourceExporter: Exporter<PowerSourceTableReader>?, + private val batteryExporter: Exporter<BatteryTableReader>?, + private val serviceExporter: Exporter<ServiceTableReader>?, ) : ComputeMonitor, AutoCloseable { override fun record(reader: HostTableReader) { - hostExporter.write(reader) + hostExporter?.write(reader) } override fun record(reader: TaskTableReader) { - taskExporter.write(reader) + taskExporter?.write(reader) } override fun record(reader: PowerSourceTableReader) { - powerSourceExporter.write(reader) + powerSourceExporter?.write(reader) } override fun record(reader: BatteryTableReader) { - batteryExporter.write(reader) + batteryExporter?.write(reader) } override fun record(reader: ServiceTableReader) { - serviceExporter.write(reader) + serviceExporter?.write(reader) } override fun close() { - hostExporter.close() - taskExporter.close() - powerSourceExporter.close() - batteryExporter.close() - serviceExporter.close() + hostExporter?.close() + taskExporter?.close() + powerSourceExporter?.close() + batteryExporter?.close() + serviceExporter?.close() } public companion object { @@ -83,12 +84,14 @@ public class ParquetComputeMonitor( base: File, partition: String, bufferSize: Int, + filesToExport: Map<OutputFiles, Boolean>, computeExportConfig: ComputeExportConfig, ): ParquetComputeMonitor = invoke( base = base, partition = partition, bufferSize = bufferSize, + filesToExport = filesToExport, hostExportColumns = computeExportConfig.hostExportColumns, taskExportColumns = computeExportConfig.taskExportColumns, powerSourceExportColumns = computeExportConfig.powerSourceExportColumns, @@ -109,6 +112,7 @@ public class ParquetComputeMonitor( base: File, partition: String, bufferSize: Int, + filesToExport: Map<OutputFiles, Boolean>, hostExportColumns: Collection<ExportColumn<HostTableReader>>? = null, taskExportColumns: Collection<ExportColumn<TaskTableReader>>? = null, powerSourceExportColumns: Collection<ExportColumn<PowerSourceTableReader>>? = null, @@ -118,37 +122,72 @@ public class ParquetComputeMonitor( // Loads the fields in case they need to be retrieved if optional params are omitted. ComputeExportConfig.loadDfltColumns() - return ParquetComputeMonitor( - hostExporter = + val hostExporter = + if (filesToExport[OutputFiles.HOST] == true) { Exporter( outputFile = File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() }, columns = hostExportColumns ?: Exportable.getAllLoadedColumns(), bufferSize = bufferSize, - ), - taskExporter = + ) + } else { + null + } + + val taskExporter = + if (filesToExport[OutputFiles.TASK] == true) { Exporter( outputFile = File(base, "$partition/task.parquet").also { it.parentFile.mkdirs() }, columns = taskExportColumns ?: Exportable.getAllLoadedColumns(), bufferSize = bufferSize, - ), - powerSourceExporter = + ) + } else { + null + } + + val powerSourceExporter = + if (filesToExport[OutputFiles.POWER_SOURCE] == true) { Exporter( outputFile = File(base, "$partition/powerSource.parquet").also { it.parentFile.mkdirs() }, columns = powerSourceExportColumns ?: Exportable.getAllLoadedColumns(), bufferSize = bufferSize, - ), - batteryExporter = + ) + } else { + null + } + + val batteryExporter = + if (filesToExport[OutputFiles.BATTERY] == true) { Exporter( outputFile = File(base, "$partition/battery.parquet").also { it.parentFile.mkdirs() }, columns = batteryExportColumns ?: Exportable.getAllLoadedColumns(), bufferSize = bufferSize, - ), - serviceExporter = + ) + } else { + null + } + + val serviceExporter = + if (filesToExport[OutputFiles.SERVICE] == true) { Exporter( outputFile = File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() }, columns = serviceExportColumns ?: Exportable.getAllLoadedColumns(), bufferSize = bufferSize, - ), + ) + } else { + null + } + + return ParquetComputeMonitor( + hostExporter = + hostExporter, + taskExporter = + taskExporter, + powerSourceExporter = + powerSourceExporter, + batteryExporter = + batteryExporter, + serviceExporter = + serviceExporter, ) } } |
