diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-07 21:01:48 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-07 21:01:48 +0200 |
| commit | 9d60d8d5d0fddf7c90c098be4d50681cffea3022 (patch) | |
| tree | 014729b41730b4316824b64e901518b4036aa5bd | |
| parent | e2e7e1abaf70d7e49e2e4af04648796f01ba6492 (diff) | |
style: Rename monitor to reporter
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt | 24 | ||||
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt) | 12 | ||||
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt) | 17 | ||||
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt | 8 |
4 files changed, 35 insertions, 26 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt index b1964197..c74189c2 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt @@ -184,19 +184,19 @@ suspend fun createProvisioner( * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) { +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) { val domain = simulationContext.domain val hypervisors = scheduler.drivers() // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. - monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) hypervisor.server.events .onEach { event -> when (event) { is ServerEvent.StateChanged -> { - monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) } } } @@ -204,7 +204,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc2 hypervisor.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( + is HypervisorEvent.SliceFinished -> reporter.reportHostSlice( simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, @@ -228,7 +228,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc2 /** * Process the trace. */ -suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: Sc20Monitor, vmPlacements: Map<String, String> = emptyMap()) { +suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: Sc20Reporter, vmPlacements: Map<String, String> = emptyMap()) { val domain = simulationContext.domain try { @@ -266,7 +266,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP server.events .onEach { if (it is ServerEvent.StateChanged) { - monitor.onVmStateChanged(it.server) + reporter.reportVmStateChange(it.server) } delay(1) @@ -299,7 +299,7 @@ fun main(args: Array<String>) { println("allocation-policy: ${cli.allocationPolicy}") val start = System.currentTimeMillis() - val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile) + val reporter: Sc20Reporter = Sc20ParquetReporter(cli.outputFile) val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider("test") @@ -316,7 +316,7 @@ fun main(args: Array<String>) { Sc20PerformanceInterferenceReader(performanceInterferenceStream) .construct() } catch (e: Throwable) { - monitor.close() + reporter.close() throw e } val vmPlacements = if (cli.vmPlacementFile == null) { @@ -328,7 +328,7 @@ fun main(args: Array<String>) { val traceReader = try { createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed) } catch (e: Throwable) { - monitor.close() + reporter.close() throw e } val allocationPolicy = when (cli.allocationPolicy) { @@ -355,8 +355,8 @@ fun main(args: Array<String>) { null } - attachMonitor(scheduler, monitor) - processTrace(traceReader, scheduler, chan, monitor, vmPlacements) + attachMonitor(scheduler, reporter) + processTrace(traceReader, scheduler, chan, reporter, vmPlacements) println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") @@ -371,5 +371,5 @@ fun main(args: Array<String>) { } // Explicitly close the monitor to flush its buffer - monitor.close() + reporter.close() } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt index 5e554196..2b653b69 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt @@ -14,9 +14,9 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName import java.util.concurrent.ArrayBlockingQueue import kotlin.concurrent.thread -class Sc20ParquetMonitor( +class Sc20ParquetReporter( destination: String -) : Sc20Monitor { +) : Sc20Reporter { private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() private val schema = SchemaBuilder .record("slice") @@ -60,9 +60,9 @@ class Sc20ParquetMonitor( } } - override suspend fun onVmStateChanged(server: Server) {} + override suspend fun reportVmStateChange(server: Server) {} - override suspend fun serverStateChanged( + override suspend fun reportHostStateChange( driver: VirtDriver, server: Server, submittedVms: Long, @@ -73,7 +73,7 @@ class Sc20ParquetMonitor( val lastServerState = lastServerStates[server] if (server.state == ServerState.SHUTOFF && lastServerState != null) { val duration = simulationContext.clock.millis() - lastServerState.second - onSliceFinish( + reportHostSlice( simulationContext.clock.millis(), 0, 0, @@ -96,7 +96,7 @@ class Sc20ParquetMonitor( lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) } - override suspend fun onSliceFinish( + override suspend fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt index 4b8b80a8..84500417 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt @@ -28,10 +28,16 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.virt.driver.VirtDriver import java.io.Closeable -interface Sc20Monitor : Closeable { - suspend fun onVmStateChanged(server: Server) {} +interface Sc20Reporter : Closeable { + /** + * This method is invoked when the state of a VM changes. + */ + suspend fun reportVmStateChange(server: Server) {} - suspend fun serverStateChanged( + /** + * This method is invoked when the state of a host changes. + */ + suspend fun reportHostStateChange( driver: VirtDriver, server: Server, submittedVms: Long, @@ -40,7 +46,10 @@ interface Sc20Monitor : Closeable { finishedVms: Long ) {} - suspend fun onSliceFinish( + /** + * This method is invoked for a host for each slice that is finishes. + */ + suspend fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 2bf6bcf3..239d018a 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -63,7 +63,7 @@ class Sc20IntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var monitor: TestSc20Monitor + private lateinit var monitor: TestSc20Reporter /** * Setup the experimental environment. @@ -73,7 +73,7 @@ class Sc20IntegrationTest { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() simulationEngine = provider("test") root = simulationEngine.newDomain("root") - monitor = TestSc20Monitor() + monitor = TestSc20Reporter() } /** @@ -151,13 +151,13 @@ class Sc20IntegrationTest { return Sc20ClusterEnvironmentReader(stream) } - class TestSc20Monitor : Sc20Monitor { + class TestSc20Reporter : Sc20Reporter { var totalRequestedBurst = 0L var totalGrantedBurst = 0L var totalOvercommissionedBurst = 0L var totalInterferedBurst = 0L - override suspend fun onSliceFinish( + override suspend fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, |
