diff options
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src')
6 files changed, 51 insertions, 151 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt index 799de60f..5f8002e2 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -89,11 +89,10 @@ public class ExperimentMetricExporter( hostMetric.interferedBurst, hostMetric.cpuUsage, hostMetric.cpuDemand, + hostMetric.powerDraw, hostMetric.numberOfDeployedImages, host ) - - monitor.reportPowerConsumption(host, hostMetric.powerDraw) } } @@ -104,19 +103,9 @@ public class ExperimentMetricExporter( val hostMetric = hostMetrics[uid] if (hostMetric != null) { - block(hostMetric, point.sum) - } - } - } - - private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { - val points = data?.doubleSumData?.points ?: emptyList() - for (point in points) { - val uid = point.labels["host"] - val hostMetric = hostMetrics[uid] - - if (hostMetric != null) { - block(hostMetric, point.value) + // Take the average of the summary + val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2 + block(hostMetric, avg) } } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 5e75c890..68631dee 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -42,11 +42,6 @@ public interface ExperimentMonitor : AutoCloseable { public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {} /** - * Report the power consumption of a host. - */ - public fun reportPowerConsumption(host: Host, draw: Double) {} - - /** * This method is invoked for a host for each slice that is finishes. */ public fun reportHostSlice( @@ -57,9 +52,9 @@ public interface ExperimentMonitor : AutoCloseable { interferedBurst: Long, cpuUsage: Double, cpuDemand: Double, + powerDraw: Double, numberOfDeployedImages: Int, - host: Host, - duration: Long = 5 * 60 * 1000L + host: Host ) { } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index 0e675d87..983b4cff 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -50,52 +50,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: File(base, "provisioner-metrics/$partition/data.parquet"), bufferSize ) - private val currentHostEvent = mutableMapOf<Host, HostEvent>() - private var startTime = -1L - override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) { - if (startTime < 0) { - startTime = time - - // Update timestamp of initial event - currentHostEvent.replaceAll { _, v -> v.copy(timestamp = startTime) } - } - } + override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} override fun reportHostStateChange(time: Long, host: Host, newState: HostState) { logger.debug { "Host ${host.uid} changed state $newState [$time]" } - - val previousEvent = currentHostEvent[host] - - val roundedTime = previousEvent?.let { - val duration = time - it.timestamp - val k = 5 * 60 * 1000L // 5 min in ms - val rem = duration % k - - if (rem == 0L) { - time - } else { - it.timestamp + duration + k - rem - } - } ?: time - - reportHostSlice( - roundedTime, - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - host - ) - } - - private val lastPowerConsumption = mutableMapOf<Host, Double>() - - override fun reportPowerConsumption(host: Host, draw: Double) { - lastPowerConsumption[host] = draw } override fun reportHostSlice( @@ -106,69 +65,26 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst: Long, cpuUsage: Double, cpuDemand: Double, + powerDraw: Double, numberOfDeployedImages: Int, - host: Host, - duration: Long + host: Host ) { - val previousEvent = currentHostEvent[host] - when { - previousEvent == null -> { - val event = HostEvent( - time, - 5 * 60 * 1000L, - host, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[host] ?: 200.0, - host.model.cpuCount - ) - - currentHostEvent[host] = event - } - previousEvent.timestamp == time -> { - val event = HostEvent( - time, - previousEvent.duration, - host, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[host] ?: 200.0, - host.model.cpuCount - ) - - currentHostEvent[host] = event - } - else -> { - hostWriter.write(previousEvent) - - val event = HostEvent( - time, - time - previousEvent.timestamp, - host, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[host] ?: 200.0, - host.model.cpuCount - ) - - currentHostEvent[host] = event - } - } + hostWriter.write( + HostEvent( + time, + 5 * 60 * 1000L, + host, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + powerDraw, + host.model.cpuCount + ) + ) } override fun reportProvisionerMetrics( @@ -196,12 +112,6 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: } override fun close() { - // Flush remaining events - for ((_, event) in currentHostEvent) { - hostWriter.write(event) - } - currentHostEvent.clear() - hostWriter.close() provisionerWriter.close() } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt index 4a3e7963..c8fe1cb2 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt @@ -52,7 +52,7 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) : record.put("interfered_burst", event.interferedBurst) record.put("cpu_usage", event.cpuUsage) record.put("cpu_demand", event.cpuDemand) - record.put("power_draw", event.powerDraw * (1.0 / 12)) + record.put("power_draw", event.powerDraw) record.put("cores", event.cores) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt index 9ab69572..c5294b55 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt @@ -57,8 +57,8 @@ private val logger = KotlinLogging.logger {} @OptIn(ExperimentalStdlibApi::class) public class Sc20StreamingParquetTraceReader( traceFile: File, - performanceInterferenceModel: PerformanceInterferenceModel, - selectedVms: List<String>, + performanceInterferenceModel: PerformanceInterferenceModel? = null, + selectedVms: List<String> = emptyList(), random: Random ) : TraceReader<SimWorkload> { /** @@ -229,20 +229,26 @@ public class Sc20StreamingParquetTraceReader( buffers.remove(id) } val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), - Random(random.nextInt()) - ) + if (performanceInterferenceModel != null) + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), + Random(random.nextInt()) + ) + else + null val workload = SimTraceWorkload(fragments) + val meta = mapOf( + "cores" to maxCores, + "required-memory" to requiredMemory, + "workload" to workload + ) TraceEntry( uid, id, submissionTime, workload, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) + if (performanceInterferenceModel != null) + meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems as Any) + else + meta ) } .sortedBy { it.start } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 0441cfed..d2e7473f 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -118,9 +118,9 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(434262255818, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(207388095207, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(204745144701, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(2642950497, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } @@ -156,9 +156,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(702636229989, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(172636987071, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(528959213229, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(96350072517, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(96330335057, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } @@ -197,9 +197,9 @@ class CapelinIntegrationTest { interferedBurst: Long, cpuUsage: Double, cpuDemand: Double, + powerDraw: Double, numberOfDeployedImages: Int, host: Host, - duration: Long ) { totalRequestedBurst += requestedBurst totalGrantedBurst += grantedBurst |
