From 9d60d8d5d0fddf7c90c098be4d50681cffea3022 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 7 May 2020 21:01:48 +0200 Subject: style: Rename monitor to reporter --- .../opendc/experiments/sc20/Sc20Experiment.kt | 24 ++-- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 59 -------- .../opendc/experiments/sc20/Sc20ParquetMonitor.kt | 149 --------------------- .../opendc/experiments/sc20/Sc20ParquetReporter.kt | 149 +++++++++++++++++++++ .../opendc/experiments/sc20/Sc20Reporter.kt | 68 ++++++++++ .../opendc/experiments/sc20/Sc20IntegrationTest.kt | 8 +- 6 files changed, 233 insertions(+), 224 deletions(-) delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt index b1964197..c74189c2 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt @@ -184,19 +184,19 @@ suspend fun createProvisioner( * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) { +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) { val domain = simulationContext.domain val hypervisors = scheduler.drivers() // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. - monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) hypervisor.server.events .onEach { event -> when (event) { is ServerEvent.StateChanged -> { - monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) } } } @@ -204,7 +204,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc2 hypervisor.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( + is HypervisorEvent.SliceFinished -> reporter.reportHostSlice( simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, @@ -228,7 +228,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc2 /** * Process the trace. */ -suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, monitor: Sc20Monitor, vmPlacements: Map = emptyMap()) { +suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, reporter: Sc20Reporter, vmPlacements: Map = emptyMap()) { val domain = simulationContext.domain try { @@ -266,7 +266,7 @@ suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtP server.events .onEach { if (it is ServerEvent.StateChanged) { - monitor.onVmStateChanged(it.server) + reporter.reportVmStateChange(it.server) } delay(1) @@ -299,7 +299,7 @@ fun main(args: Array) { println("allocation-policy: ${cli.allocationPolicy}") val start = System.currentTimeMillis() - val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile) + val reporter: Sc20Reporter = Sc20ParquetReporter(cli.outputFile) val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider("test") @@ -316,7 +316,7 @@ fun main(args: Array) { Sc20PerformanceInterferenceReader(performanceInterferenceStream) .construct() } catch (e: Throwable) { - monitor.close() + reporter.close() throw e } val vmPlacements = if (cli.vmPlacementFile == null) { @@ -328,7 +328,7 @@ fun main(args: Array) { val traceReader = try { createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed) } catch (e: Throwable) { - monitor.close() + reporter.close() throw e } val allocationPolicy = when (cli.allocationPolicy) { @@ -355,8 +355,8 @@ fun main(args: Array) { null } - attachMonitor(scheduler, monitor) - processTrace(traceReader, scheduler, chan, monitor, vmPlacements) + attachMonitor(scheduler, reporter) + processTrace(traceReader, scheduler, chan, reporter, vmPlacements) println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") @@ -371,5 +371,5 @@ fun main(args: Array) { } // Explicitly close the monitor to flush its buffer - monitor.close() + reporter.close() } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt deleted file mode 100644 index 4b8b80a8..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import java.io.Closeable - -interface Sc20Monitor : Closeable { - suspend fun onVmStateChanged(server: Server) {} - - suspend fun serverStateChanged( - driver: VirtDriver, - server: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long - ) {} - - suspend fun onSliceFinish( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long, - duration: Long = 5 * 60 * 1000L - ) {} -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt deleted file mode 100644 index 5e554196..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt +++ /dev/null @@ -1,149 +0,0 @@ -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.metal.driver.BareMetalDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import kotlinx.coroutines.flow.first -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread - -class Sc20ParquetMonitor( - destination: String -) : Sc20Monitor { - private val lastServerStates = mutableMapOf>() - private val schema = SchemaBuilder - .record("slice") - .namespace("com.atlarge.opendc.experiments.sc20") - .fields() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("requestedBurst").type().longType().noDefault() - .name("grantedBurst").type().longType().noDefault() - .name("overcommissionedBurst").type().longType().noDefault() - .name("interferedBurst").type().longType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("cpuDemand").type().doubleType().noDefault() - .name("numberOfDeployedImages").type().intType().noDefault() - .name("server").type().stringType().noDefault() - .name("hostState").type().stringType().noDefault() - .name("hostUsage").type().doubleType().noDefault() - .name("powerDraw").type().doubleType().noDefault() - .name("totalSubmittedVms").type().longType().noDefault() - .name("totalQueuedVms").type().longType().noDefault() - .name("totalRunningVms").type().longType().noDefault() - .name("totalFinishedVms").type().longType().noDefault() - .endRecord() - private val writer = AvroParquetWriter.builder(Path(destination)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - private val queue = ArrayBlockingQueue(2048) - private val writerThread = thread(start = true, name = "sc20-writer") { - try { - while (true) { - val record = queue.take() - writer.write(record) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - writer.close() - } - } - - override suspend fun onVmStateChanged(server: Server) {} - - override suspend fun serverStateChanged( - driver: VirtDriver, - server: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long - ) { - val lastServerState = lastServerStates[server] - if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = simulationContext.clock.millis() - lastServerState.second - onSliceFinish( - simulationContext.clock.millis(), - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server, - submittedVms, - queuedVms, - runningVms, - finishedVms, - duration - ) - } - - println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}") - - lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) - } - - override suspend fun onSliceFinish( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long, - duration: Long - ) { - // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.services[BareMetalDriver.Key] - val usage = driver.usage.first() - val powerDraw = driver.powerDraw.first() - - val record = GenericData.Record(schema) - record.put("time", time) - record.put("duration", duration) - record.put("requestedBurst", requestedBurst) - record.put("grantedBurst", grantedBurst) - record.put("overcommissionedBurst", overcommissionedBurst) - record.put("interferedBurst", interferedBurst) - record.put("cpuUsage", cpuUsage) - record.put("cpuDemand", cpuDemand) - record.put("numberOfDeployedImages", numberOfDeployedImages) - record.put("server", hostServer.uid) - record.put("hostState", hostServer.state) - record.put("hostUsage", usage) - record.put("powerDraw", powerDraw) - record.put("totalSubmittedVms", submittedVms) - record.put("totalQueuedVms", queuedVms) - record.put("totalRunningVms", runningVms) - record.put("totalFinishedVms", finishedVms) - - queue.put(record) - } - - override fun close() { - // Busy loop to wait for writer thread to finish - while (queue.isNotEmpty()) { - Thread.sleep(500) - } - writerThread.interrupt() - } -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt new file mode 100644 index 00000000..2b653b69 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt @@ -0,0 +1,149 @@ +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import kotlinx.coroutines.flow.first +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread + +class Sc20ParquetReporter( + destination: String +) : Sc20Reporter { + private val lastServerStates = mutableMapOf>() + private val schema = SchemaBuilder + .record("slice") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("requestedBurst").type().longType().noDefault() + .name("grantedBurst").type().longType().noDefault() + .name("overcommissionedBurst").type().longType().noDefault() + .name("interferedBurst").type().longType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("cpuDemand").type().doubleType().noDefault() + .name("numberOfDeployedImages").type().intType().noDefault() + .name("server").type().stringType().noDefault() + .name("hostState").type().stringType().noDefault() + .name("hostUsage").type().doubleType().noDefault() + .name("powerDraw").type().doubleType().noDefault() + .name("totalSubmittedVms").type().longType().noDefault() + .name("totalQueuedVms").type().longType().noDefault() + .name("totalRunningVms").type().longType().noDefault() + .name("totalFinishedVms").type().longType().noDefault() + .endRecord() + private val writer = AvroParquetWriter.builder(Path(destination)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + private val queue = ArrayBlockingQueue(2048) + private val writerThread = thread(start = true, name = "sc20-writer") { + try { + while (true) { + val record = queue.take() + writer.write(record) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + writer.close() + } + } + + override suspend fun reportVmStateChange(server: Server) {} + + override suspend fun reportHostStateChange( + driver: VirtDriver, + server: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long + ) { + val lastServerState = lastServerStates[server] + if (server.state == ServerState.SHUTOFF && lastServerState != null) { + val duration = simulationContext.clock.millis() - lastServerState.second + reportHostSlice( + simulationContext.clock.millis(), + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server, + submittedVms, + queuedVms, + runningVms, + finishedVms, + duration + ) + } + + println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}") + + lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) + } + + override suspend fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, + duration: Long + ) { + // Assume for now that the host is not virtualized and measure the current power draw + val driver = hostServer.services[BareMetalDriver.Key] + val usage = driver.usage.first() + val powerDraw = driver.powerDraw.first() + + val record = GenericData.Record(schema) + record.put("time", time) + record.put("duration", duration) + record.put("requestedBurst", requestedBurst) + record.put("grantedBurst", grantedBurst) + record.put("overcommissionedBurst", overcommissionedBurst) + record.put("interferedBurst", interferedBurst) + record.put("cpuUsage", cpuUsage) + record.put("cpuDemand", cpuDemand) + record.put("numberOfDeployedImages", numberOfDeployedImages) + record.put("server", hostServer.uid) + record.put("hostState", hostServer.state) + record.put("hostUsage", usage) + record.put("powerDraw", powerDraw) + record.put("totalSubmittedVms", submittedVms) + record.put("totalQueuedVms", queuedVms) + record.put("totalRunningVms", runningVms) + record.put("totalFinishedVms", finishedVms) + + queue.put(record) + } + + override fun close() { + // Busy loop to wait for writer thread to finish + while (queue.isNotEmpty()) { + Thread.sleep(500) + } + writerThread.interrupt() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt new file mode 100644 index 00000000..84500417 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import java.io.Closeable + +interface Sc20Reporter : Closeable { + /** + * This method is invoked when the state of a VM changes. + */ + suspend fun reportVmStateChange(server: Server) {} + + /** + * This method is invoked when the state of a host changes. + */ + suspend fun reportHostStateChange( + driver: VirtDriver, + server: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long + ) {} + + /** + * This method is invoked for a host for each slice that is finishes. + */ + suspend fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, + duration: Long = 5 * 60 * 1000L + ) {} +} diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 2bf6bcf3..239d018a 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -63,7 +63,7 @@ class Sc20IntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var monitor: TestSc20Monitor + private lateinit var monitor: TestSc20Reporter /** * Setup the experimental environment. @@ -73,7 +73,7 @@ class Sc20IntegrationTest { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() simulationEngine = provider("test") root = simulationEngine.newDomain("root") - monitor = TestSc20Monitor() + monitor = TestSc20Reporter() } /** @@ -151,13 +151,13 @@ class Sc20IntegrationTest { return Sc20ClusterEnvironmentReader(stream) } - class TestSc20Monitor : Sc20Monitor { + class TestSc20Reporter : Sc20Reporter { var totalRequestedBurst = 0L var totalGrantedBurst = 0L var totalOvercommissionedBurst = 0L var totalInterferedBurst = 0L - override suspend fun onSliceFinish( + override suspend fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, -- cgit v1.2.3