diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-20 15:19:20 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-20 15:19:20 +0200 |
| commit | 21eafd32c45495ab9e8ebbeffbdbe1d43ffe566b (patch) | |
| tree | 10b4d6053d1cd58e921f71ff7b0d6f0cf7bab75a /opendc/opendc-experiments-sc20/src | |
| parent | e0ccb42ecbaf98cef7c73df7564c669c2d464b0e (diff) | |
bug: Report failures of idle machines
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
2 files changed, 96 insertions, 42 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index ec61ac38..9d2b0247 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -242,10 +242,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP // Monitor server events server.events .onEach { - val time = simulationContext.clock.millis() - if (it is ServerEvent.StateChanged) { - monitor.reportVmStateChange(time, it.server) + monitor.reportVmStateChange(simulationContext.clock.millis(), it.server) } delay(1) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt index 138905a4..7f71eb3e 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.experiments.sc20.experiment.monitor import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent import com.atlarge.opendc.experiments.sc20.experiment.Run @@ -54,9 +53,17 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { File(run.parent.parent.parent.output, "provisioner-metrics/$partition/data.parquet"), run.parent.parent.parent.bufferSize ) - private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() + private val currentHostEvent = mutableMapOf<Server, HostEvent>() + private var startTime = -1L - override fun reportVmStateChange(time: Long, server: Server) {} + override fun reportVmStateChange(time: Long, server: Server) { + if (startTime < 0) { + startTime = time + + // Update timestamp of initial event + currentHostEvent.replaceAll { k, v -> v.copy(timestamp = startTime) } + } + } override fun reportHostStateChange( time: Long, @@ -65,27 +72,31 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { ) { logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } - val lastServerState = lastServerStates[server] - if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = time - lastServerState.second - reportHostSlice( - time, - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server, - duration - ) + val previousEvent = currentHostEvent[server] - lastServerStates.remove(server) - lastPowerConsumption.remove(server) - } else { - lastServerStates[server] = Pair(server.state, time) - } + val roundedTime = previousEvent?.let { + val duration = time - it.timestamp + val k = 5 * 60 * 1000L // 5 min in ms + val rem = duration % k + + if (rem == 0L) { + time + } else { + it.timestamp + duration + k - rem + } + } ?: time + + reportHostSlice( + roundedTime, + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server + ) } private val lastPowerConsumption = mutableMapOf<Server, Double>() @@ -106,23 +117,62 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { hostServer: Server, duration: Long ) { - lastServerStates[hostServer] = Pair(hostServer.state, time) + val previousEvent = currentHostEvent[hostServer] + when { + previousEvent == null -> { + val event = HostEvent( + time, + 5 * 60 * 1000L, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0 + ) - hostWriter.write( - HostEvent( - time, - duration, - hostServer, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0 - ) - ) + currentHostEvent[hostServer] = event + } + previousEvent.timestamp == time -> { + val event = HostEvent( + time, + previousEvent.duration, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0 + ) + + currentHostEvent[hostServer] = event + } + else -> { + hostWriter.write(previousEvent) + + val event = HostEvent( + time, + time - previousEvent.timestamp, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0 + ) + + currentHostEvent[hostServer] = event + } + } } override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { @@ -141,6 +191,12 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { } override fun close() { + // Flush remaining events + for ((_, event) in currentHostEvent) { + hostWriter.write(event) + } + currentHostEvent.clear() + hostWriter.close() provisionerWriter.close() } |
