diff options
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src/main')
5 files changed, 44 insertions, 144 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt index 799de60f..5f8002e2 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -89,11 +89,10 @@ public class ExperimentMetricExporter( hostMetric.interferedBurst, hostMetric.cpuUsage, hostMetric.cpuDemand, + hostMetric.powerDraw, hostMetric.numberOfDeployedImages, host ) - - monitor.reportPowerConsumption(host, hostMetric.powerDraw) } } @@ -104,19 +103,9 @@ public class ExperimentMetricExporter( val hostMetric = hostMetrics[uid] if (hostMetric != null) { - block(hostMetric, point.sum) - } - } - } - - private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { - val points = data?.doubleSumData?.points ?: emptyList() - for (point in points) { - val uid = point.labels["host"] - val hostMetric = hostMetrics[uid] - - if (hostMetric != null) { - block(hostMetric, point.value) + // Take the average of the summary + val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2 + block(hostMetric, avg) } } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 5e75c890..68631dee 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -42,11 +42,6 @@ public interface ExperimentMonitor : AutoCloseable { public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {} /** - * Report the power consumption of a host. - */ - public fun reportPowerConsumption(host: Host, draw: Double) {} - - /** * This method is invoked for a host for each slice that is finishes. */ public fun reportHostSlice( @@ -57,9 +52,9 @@ public interface ExperimentMonitor : AutoCloseable { interferedBurst: Long, cpuUsage: Double, cpuDemand: Double, + powerDraw: Double, numberOfDeployedImages: Int, - host: Host, - duration: Long = 5 * 60 * 1000L + host: Host ) { } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index 0e675d87..983b4cff 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -50,52 +50,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: File(base, "provisioner-metrics/$partition/data.parquet"), bufferSize ) - private val currentHostEvent = mutableMapOf<Host, HostEvent>() - private var startTime = -1L - override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) { - if (startTime < 0) { - startTime = time - - // Update timestamp of initial event - currentHostEvent.replaceAll { _, v -> v.copy(timestamp = startTime) } - } - } + override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} override fun reportHostStateChange(time: Long, host: Host, newState: HostState) { logger.debug { "Host ${host.uid} changed state $newState [$time]" } - - val previousEvent = currentHostEvent[host] - - val roundedTime = previousEvent?.let { - val duration = time - it.timestamp - val k = 5 * 60 * 1000L // 5 min in ms - val rem = duration % k - - if (rem == 0L) { - time - } else { - it.timestamp + duration + k - rem - } - } ?: time - - reportHostSlice( - roundedTime, - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - host - ) - } - - private val lastPowerConsumption = mutableMapOf<Host, Double>() - - override fun reportPowerConsumption(host: Host, draw: Double) { - lastPowerConsumption[host] = draw } override fun reportHostSlice( @@ -106,69 +65,26 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst: Long, cpuUsage: Double, cpuDemand: Double, + powerDraw: Double, numberOfDeployedImages: Int, - host: Host, - duration: Long + host: Host ) { - val previousEvent = currentHostEvent[host] - when { - previousEvent == null -> { - val event = HostEvent( - time, - 5 * 60 * 1000L, - host, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[host] ?: 200.0, - host.model.cpuCount - ) - - currentHostEvent[host] = event - } - previousEvent.timestamp == time -> { - val event = HostEvent( - time, - previousEvent.duration, - host, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[host] ?: 200.0, - host.model.cpuCount - ) - - currentHostEvent[host] = event - } - else -> { - hostWriter.write(previousEvent) - - val event = HostEvent( - time, - time - previousEvent.timestamp, - host, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[host] ?: 200.0, - host.model.cpuCount - ) - - currentHostEvent[host] = event - } - } + hostWriter.write( + HostEvent( + time, + 5 * 60 * 1000L, + host, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + powerDraw, + host.model.cpuCount + ) + ) } override fun reportProvisionerMetrics( @@ -196,12 +112,6 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: } override fun close() { - // Flush remaining events - for ((_, event) in currentHostEvent) { - hostWriter.write(event) - } - currentHostEvent.clear() - hostWriter.close() provisionerWriter.close() } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt index 4a3e7963..c8fe1cb2 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt @@ -52,7 +52,7 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) : record.put("interfered_burst", event.interferedBurst) record.put("cpu_usage", event.cpuUsage) record.put("cpu_demand", event.cpuDemand) - record.put("power_draw", event.powerDraw * (1.0 / 12)) + record.put("power_draw", event.powerDraw) record.put("cores", event.cores) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt index 9ab69572..c5294b55 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt @@ -57,8 +57,8 @@ private val logger = KotlinLogging.logger {} @OptIn(ExperimentalStdlibApi::class) public class Sc20StreamingParquetTraceReader( traceFile: File, - performanceInterferenceModel: PerformanceInterferenceModel, - selectedVms: List<String>, + performanceInterferenceModel: PerformanceInterferenceModel? = null, + selectedVms: List<String> = emptyList(), random: Random ) : TraceReader<SimWorkload> { /** @@ -229,20 +229,26 @@ public class Sc20StreamingParquetTraceReader( buffers.remove(id) } val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), - Random(random.nextInt()) - ) + if (performanceInterferenceModel != null) + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), + Random(random.nextInt()) + ) + else + null val workload = SimTraceWorkload(fragments) + val meta = mapOf( + "cores" to maxCores, + "required-memory" to requiredMemory, + "workload" to workload + ) TraceEntry( uid, id, submissionTime, workload, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) + if (performanceInterferenceModel != null) + meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems as Any) + else + meta ) } .sortedBy { it.start } |
