From 9d60d8d5d0fddf7c90c098be4d50681cffea3022 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 7 May 2020 21:01:48 +0200 Subject: style: Rename monitor to reporter --- .../opendc/experiments/sc20/Sc20Experiment.kt | 24 ++-- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 59 -------- .../opendc/experiments/sc20/Sc20ParquetMonitor.kt | 149 --------------------- .../opendc/experiments/sc20/Sc20ParquetReporter.kt | 149 +++++++++++++++++++++ .../opendc/experiments/sc20/Sc20Reporter.kt | 68 ++++++++++ .../opendc/experiments/sc20/Sc20IntegrationTest.kt | 8 +- 6 files changed, 233 insertions(+), 224 deletions(-) delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt index b1964197..c74189c2 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt @@ -184,19 +184,19 @@ suspend fun createProvisioner( * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) { +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) { val domain = simulationContext.domain val hypervisors = scheduler.drivers() // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. - monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) hypervisor.server.events .onEach { event -> when (event) { is ServerEvent.StateChanged -> { - monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) } } } @@ -204,7 +204,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc2 hypervisor.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( + is HypervisorEvent.SliceFinished -> reporter.reportHostSlice( simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, @@ -228,7 +228,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc2 /** * Process the trace. */ -suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, monitor: Sc20Monitor, vmPlacements: Map = emptyMap()) { +suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, reporter: Sc20Reporter, vmPlacements: Map = emptyMap()) { val domain = simulationContext.domain try { @@ -266,7 +266,7 @@ suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtP server.events .onEach { if (it is ServerEvent.StateChanged) { - monitor.onVmStateChanged(it.server) + reporter.reportVmStateChange(it.server) } delay(1) @@ -299,7 +299,7 @@ fun main(args: Array) { println("allocation-policy: ${cli.allocationPolicy}") val start = System.currentTimeMillis() - val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile) + val reporter: Sc20Reporter = Sc20ParquetReporter(cli.outputFile) val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider("test") @@ -316,7 +316,7 @@ fun main(args: Array) { Sc20PerformanceInterferenceReader(performanceInterferenceStream) .construct() } catch (e: Throwable) { - monitor.close() + reporter.close() throw e } val vmPlacements = if (cli.vmPlacementFile == null) { @@ -328,7 +328,7 @@ fun main(args: Array) { val traceReader = try { createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed) } catch (e: Throwable) { - monitor.close() + reporter.close() throw e } val allocationPolicy = when (cli.allocationPolicy) { @@ -355,8 +355,8 @@ fun main(args: Array) { null } - attachMonitor(scheduler, monitor) - processTrace(traceReader, scheduler, chan, monitor, vmPlacements) + attachMonitor(scheduler, reporter) + processTrace(traceReader, scheduler, chan, reporter, vmPlacements) println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") @@ -371,5 +371,5 @@ fun main(args: Array) { } // Explicitly close the monitor to flush its buffer - monitor.close() + reporter.close() } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt deleted file mode 100644 index 4b8b80a8..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import java.io.Closeable - -interface Sc20Monitor : Closeable { - suspend fun onVmStateChanged(server: Server) {} - - suspend fun serverStateChanged( - driver: VirtDriver, - server: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long - ) {} - - suspend fun onSliceFinish( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long, - duration: Long = 5 * 60 * 1000L - ) {} -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt deleted file mode 100644 index 5e554196..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt +++ /dev/null @@ -1,149 +0,0 @@ -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.metal.driver.BareMetalDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import kotlinx.coroutines.flow.first -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread - -class Sc20ParquetMonitor( - destination: String -) : Sc20Monitor { - private val lastServerStates = mutableMapOf>() - private val schema = SchemaBuilder - .record("slice") - .namespace("com.atlarge.opendc.experiments.sc20") - .fields() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("requestedBurst").type().longType().noDefault() - .name("grantedBurst").type().longType().noDefault() - .name("overcommissionedBurst").type().longType().noDefault() - .name("interferedBurst").type().longType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("cpuDemand").type().doubleType().noDefault() - .name("numberOfDeployedImages").type().intType().noDefault() - .name("server").type().stringType().noDefault() - .name("hostState").type().stringType().noDefault() - .name("hostUsage").type().doubleType().noDefault() - .name("powerDraw").type().doubleType().noDefault() - .name("totalSubmittedVms").type().longType().noDefault() - .name("totalQueuedVms").type().longType().noDefault() - .name("totalRunningVms").type().longType().noDefault() - .name("totalFinishedVms").type().longType().noDefault() - .endRecord() - private val writer = AvroParquetWriter.builder(Path(destination)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - private val queue = ArrayBlockingQueue(2048) - private val writerThread = thread(start = true, name = "sc20-writer") { - try { - while (true) { - val record = queue.take() - writer.write(record) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - writer.close() - } - } - - override suspend fun onVmStateChanged(server: Server) {} - - override suspend fun serverStateChanged( - driver: VirtDriver, - server: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long - ) { - val lastServerState = lastServerStates[server] - if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = simulationContext.clock.millis() - lastServerState.second - onSliceFinish( - simulationContext.clock.millis(), - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server, - submittedVms, - queuedVms, - runningVms, - finishedVms, - duration - ) - } - - println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}") - - lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) - } - - override suspend fun onSliceFinish( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long, - duration: Long - ) { - // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.services[BareMetalDriver.Key] - val usage = driver.usage.first() - val powerDraw = driver.powerDraw.first() - - val record = GenericData.Record(schema) - record.put("time", time) - record.put("duration", duration) - record.put("requestedBurst", requestedBurst) - record.put("grantedBurst", grantedBurst) - record.put("overcommissionedBurst", overcommissionedBurst) - record.put("interferedBurst", interferedBurst) - record.put("cpuUsage", cpuUsage) - record.put("cpuDemand", cpuDemand) - record.put("numberOfDeployedImages", numberOfDeployedImages) - record.put("server", hostServer.uid) - record.put("hostState", hostServer.state) - record.put("hostUsage", usage) - record.put("powerDraw", powerDraw) - record.put("totalSubmittedVms", submittedVms) - record.put("totalQueuedVms", queuedVms) - record.put("totalRunningVms", runningVms) - record.put("totalFinishedVms", finishedVms) - - queue.put(record) - } - - override fun close() { - // Busy loop to wait for writer thread to finish - while (queue.isNotEmpty()) { - Thread.sleep(500) - } - writerThread.interrupt() - } -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt new file mode 100644 index 00000000..2b653b69 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt @@ -0,0 +1,149 @@ +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import kotlinx.coroutines.flow.first +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread + +class Sc20ParquetReporter( + destination: String +) : Sc20Reporter { + private val lastServerStates = mutableMapOf>() + private val schema = SchemaBuilder + .record("slice") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("requestedBurst").type().longType().noDefault() + .name("grantedBurst").type().longType().noDefault() + .name("overcommissionedBurst").type().longType().noDefault() + .name("interferedBurst").type().longType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("cpuDemand").type().doubleType().noDefault() + .name("numberOfDeployedImages").type().intType().noDefault() + .name("server").type().stringType().noDefault() + .name("hostState").type().stringType().noDefault() + .name("hostUsage").type().doubleType().noDefault() + .name("powerDraw").type().doubleType().noDefault() + .name("totalSubmittedVms").type().longType().noDefault() + .name("totalQueuedVms").type().longType().noDefault() + .name("totalRunningVms").type().longType().noDefault() + .name("totalFinishedVms").type().longType().noDefault() + .endRecord() + private val writer = AvroParquetWriter.builder(Path(destination)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + private val queue = ArrayBlockingQueue(2048) + private val writerThread = thread(start = true, name = "sc20-writer") { + try { + while (true) { + val record = queue.take() + writer.write(record) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + writer.close() + } + } + + override suspend fun reportVmStateChange(server: Server) {} + + override suspend fun reportHostStateChange( + driver: VirtDriver, + server: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long + ) { + val lastServerState = lastServerStates[server] + if (server.state == ServerState.SHUTOFF && lastServerState != null) { + val duration = simulationContext.clock.millis() - lastServerState.second + reportHostSlice( + simulationContext.clock.millis(), + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server, + submittedVms, + queuedVms, + runningVms, + finishedVms, + duration + ) + } + + println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}") + + lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) + } + + override suspend fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, + duration: Long + ) { + // Assume for now that the host is not virtualized and measure the current power draw + val driver = hostServer.services[BareMetalDriver.Key] + val usage = driver.usage.first() + val powerDraw = driver.powerDraw.first() + + val record = GenericData.Record(schema) + record.put("time", time) + record.put("duration", duration) + record.put("requestedBurst", requestedBurst) + record.put("grantedBurst", grantedBurst) + record.put("overcommissionedBurst", overcommissionedBurst) + record.put("interferedBurst", interferedBurst) + record.put("cpuUsage", cpuUsage) + record.put("cpuDemand", cpuDemand) + record.put("numberOfDeployedImages", numberOfDeployedImages) + record.put("server", hostServer.uid) + record.put("hostState", hostServer.state) + record.put("hostUsage", usage) + record.put("powerDraw", powerDraw) + record.put("totalSubmittedVms", submittedVms) + record.put("totalQueuedVms", queuedVms) + record.put("totalRunningVms", runningVms) + record.put("totalFinishedVms", finishedVms) + + queue.put(record) + } + + override fun close() { + // Busy loop to wait for writer thread to finish + while (queue.isNotEmpty()) { + Thread.sleep(500) + } + writerThread.interrupt() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt new file mode 100644 index 00000000..84500417 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import java.io.Closeable + +interface Sc20Reporter : Closeable { + /** + * This method is invoked when the state of a VM changes. + */ + suspend fun reportVmStateChange(server: Server) {} + + /** + * This method is invoked when the state of a host changes. + */ + suspend fun reportHostStateChange( + driver: VirtDriver, + server: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long + ) {} + + /** + * This method is invoked for a host for each slice that is finishes. + */ + suspend fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, + duration: Long = 5 * 60 * 1000L + ) {} +} diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 2bf6bcf3..239d018a 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -63,7 +63,7 @@ class Sc20IntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var monitor: TestSc20Monitor + private lateinit var monitor: TestSc20Reporter /** * Setup the experimental environment. @@ -73,7 +73,7 @@ class Sc20IntegrationTest { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() simulationEngine = provider("test") root = simulationEngine.newDomain("root") - monitor = TestSc20Monitor() + monitor = TestSc20Reporter() } /** @@ -151,13 +151,13 @@ class Sc20IntegrationTest { return Sc20ClusterEnvironmentReader(stream) } - class TestSc20Monitor : Sc20Monitor { + class TestSc20Reporter : Sc20Reporter { var totalRequestedBurst = 0L var totalGrantedBurst = 0L var totalOvercommissionedBurst = 0L var totalInterferedBurst = 0L - override suspend fun onSliceFinish( + override suspend fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, -- cgit v1.2.3 From 6f12a5e3bc59f4ca386cf8a6b231c1464ea10169 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 7 May 2020 21:26:30 +0200 Subject: feat: Add support for more sophisticated logging --- opendc/opendc-compute/build.gradle.kts | 1 + .../virt/service/SimpleVirtProvisioningService.kt | 13 +++--- opendc/opendc-experiments-sc20/build.gradle.kts | 6 ++- .../opendc/experiments/sc20/Sc20Experiment.kt | 31 +++++++++------ .../opendc/experiments/sc20/Sc20ParquetReporter.kt | 5 ++- .../experiments/sc20/Sc20ParquetTraceReader.kt | 5 ++- .../src/main/resources/log4j2.xml | 46 ++++++++++++++++++++++ 7 files changed, 86 insertions(+), 21 deletions(-) create mode 100644 opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml diff --git a/opendc/opendc-compute/build.gradle.kts b/opendc/opendc-compute/build.gradle.kts index 09c904f2..7d43b064 100644 --- a/opendc/opendc-compute/build.gradle.kts +++ b/opendc/opendc-compute/build.gradle.kts @@ -33,6 +33,7 @@ dependencies { implementation(kotlin("stdlib")) api(project(":odcsim:odcsim-api")) api(project(":opendc:opendc-core")) + implementation("io.github.microutils:kotlin-logging:1.7.9") testRuntimeOnly(project(":odcsim:odcsim-engine-omega")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 2185b372..3603ae69 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -24,10 +24,13 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext +import mu.KotlinLogging import kotlin.coroutines.Continuation import kotlin.coroutines.resume import kotlin.math.max +private val logger = KotlinLogging.logger {} + @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( public override val allocationPolicy: AllocationPolicy, @@ -141,7 +144,7 @@ class SimpleVirtProvisioningService( unscheduledVms++ incomingImages -= imageInstance - println("[${clock.millis()}] CANNOT SPAWN ${imageInstance.image}") + logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]") continue } else { break @@ -149,7 +152,7 @@ class SimpleVirtProvisioningService( } try { - println("[${clock.millis()}] SPAWN ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}") + logger.info { "Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } incomingImages -= imageInstance // Speculatively update the hypervisor view information to prevent other images in the queue from @@ -174,7 +177,7 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { - println("[${clock.millis()}] FINISH ${event.server.uid} ${event.server.name} ${event.server.flavor}") + logger.info { "Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } runningVms-- finishedVms++ @@ -191,13 +194,13 @@ class SimpleVirtProvisioningService( } .launchIn(this) } catch (e: InsufficientMemoryOnServerException) { - println("Unable to deploy image due to insufficient memory") + logger.error("Failed to deploy VM", e) selectedHv.numberOfActiveServers-- selectedHv.provisionedCores -= imageInstance.flavor.cpuCount selectedHv.availableMemory += requiredMemory } catch (e: Throwable) { - e.printStackTrace() + logger.error("Failed to deploy VM", e) } } } diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index ccfa3038..76ec7cc4 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -40,12 +40,14 @@ dependencies { implementation(project(":opendc:opendc-format")) implementation(kotlin("stdlib")) implementation("com.xenomachina:kotlin-argparser:2.0.7") - api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") + implementation("org.slf4j:slf4j-api:${Library.SLF4J}") + implementation("io.github.microutils:kotlin-logging:1.7.9") implementation("org.apache.parquet:parquet-avro:1.11.0") implementation("org.apache.hadoop:hadoop-client:3.2.1") { exclude(group = "org.slf4j", module = "slf4j-log4j12") + exclude(group = "log4j") } - runtimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}") + runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") runtimeOnly(project(":odcsim:odcsim-engine-omega")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt index c74189c2..2029d3e7 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt @@ -65,6 +65,7 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext +import mu.KotlinLogging import java.io.File import java.io.FileReader import java.util.ServiceLoader @@ -73,6 +74,8 @@ import kotlin.math.ln import kotlin.math.max import kotlin.random.Random +private val logger = KotlinLogging.logger {} + class ExperimentParameters(parser: ArgParser) { val traceDirectory by parser.storing("path to the trace directory") val environmentFile by parser.storing("path to the environment file") @@ -244,12 +247,12 @@ suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtP // Check if VM in topology val clusterName = vmPlacements[vmId] if (clusterName == null) { - println("Could not find placement data in VM placement file for VM $vmId") + logger.warn { "Could not find placement data in VM placement file for VM $vmId" } continue } val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false if (machineInCluster) { - println("Ignored VM") + logger.info { "Ignored VM $vmId" } continue } } @@ -290,13 +293,13 @@ suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtP @OptIn(ExperimentalCoroutinesApi::class) fun main(args: Array) { val cli = ArgParser(args).parseInto(::ExperimentParameters) - println("trace-directory: ${cli.traceDirectory}") - println("environment-file: ${cli.environmentFile}") - println("performance-interference-file: ${cli.performanceInterferenceFile}") - println("selected-vms-file: ${cli.selectedVmsFile}") - println("seed: ${cli.seed}") - println("failures: ${cli.failures}") - println("allocation-policy: ${cli.allocationPolicy}") + logger.info("trace-directory: ${cli.traceDirectory}") + logger.info("environment-file: ${cli.environmentFile}") + logger.info("performance-interference-file: ${cli.performanceInterferenceFile}") + logger.info("selected-vms-file: ${cli.selectedVmsFile}") + logger.info("seed: ${cli.seed}") + logger.info("failures: ${cli.failures}") + logger.info("allocation-policy: ${cli.allocationPolicy}") val start = System.currentTimeMillis() val reporter: Sc20Reporter = Sc20ParquetReporter(cli.outputFile) @@ -349,7 +352,7 @@ fun main(args: Array) { val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) val failureDomain = if (cli.failures) { - println("ENABLING failures") + logger.info("ENABLING failures") createFailureDomain(cli.seed, cli.failureInterval, bareMetalProvisioner, chan) } else { null @@ -358,11 +361,15 @@ fun main(args: Array) { attachMonitor(scheduler, reporter) processTrace(traceReader, scheduler, chan, reporter, vmPlacements) - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") failureDomain?.cancel() scheduler.terminate() - println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") + logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds") } runBlocking { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt index 2b653b69..fc3e98ae 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt @@ -6,6 +6,7 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import kotlinx.coroutines.flow.first +import mu.KotlinLogging import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path @@ -14,6 +15,8 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName import java.util.concurrent.ArrayBlockingQueue import kotlin.concurrent.thread +private val logger = KotlinLogging.logger {} + class Sc20ParquetReporter( destination: String ) : Sc20Reporter { @@ -91,7 +94,7 @@ class Sc20ParquetReporter( ) } - println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}") + logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]") lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt index 0a7718e9..8ae1693c 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt @@ -32,6 +32,7 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader +import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader @@ -49,6 +50,8 @@ import java.util.concurrent.ArrayBlockingQueue import kotlin.concurrent.thread import kotlin.random.Random +private val logger = KotlinLogging.logger {} + /** * A [TraceReader] for the internal VM workload trace format. * @@ -190,7 +193,7 @@ class Sc20ParquetTraceReader( assert(uid !in takenIds) takenIds += uid - println(id) + logger.info("Processing VM $id") val internalBuffer = mutableListOf() val externalBuffer = mutableListOf() diff --git a/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml new file mode 100644 index 00000000..77a15e55 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml @@ -0,0 +1,46 @@ + + + + + + + + + + + + + + + + + + + + + + + + -- cgit v1.2.3 From f4fe224194c0bcabda2e17005077e76ea9e7098c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 7 May 2020 22:15:59 +0200 Subject: refactor: Migrate to Clikt for argument parsing This change migrates the experiment runner to Clikt for parsing command line arguments, since it provides functionality that we need for having multiple types of reporters (e.g. postgres and parquet), not offered by the current parser. --- opendc/opendc-experiments-sc20/build.gradle.kts | 3 +- .../opendc/experiments/sc20/ExperimentHelpers.kt | 235 +++++++++++ .../opendc/experiments/sc20/Sc20Experiment.kt | 442 +++++++-------------- .../opendc/experiments/sc20/Sc20ParquetReporter.kt | 7 +- 4 files changed, 380 insertions(+), 307 deletions(-) create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index 76ec7cc4..42ccb1b1 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -39,8 +39,7 @@ dependencies { api(project(":opendc:opendc-core")) implementation(project(":opendc:opendc-format")) implementation(kotlin("stdlib")) - implementation("com.xenomachina:kotlin-argparser:2.0.7") - implementation("org.slf4j:slf4j-api:${Library.SLF4J}") + implementation("com.github.ajalt:clikt:2.6.0") implementation("io.github.microutils:kotlin-logging:1.7.9") implementation("org.apache.parquet:parquet-avro:1.11.0") implementation("org.apache.hadoop:hadoop-client:3.2.1") { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt new file mode 100644 index 00000000..e37dea8b --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt @@ -0,0 +1,235 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.compute.metal.NODE_CLUSTER +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorEvent +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver +import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import com.atlarge.opendc.core.failure.CorrelatedFaultInjector +import com.atlarge.opendc.core.failure.FailureDomain +import com.atlarge.opendc.core.failure.FaultInjector +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import mu.KotlinLogging +import java.io.File +import java.util.TreeSet +import kotlin.math.ln +import kotlin.math.max +import kotlin.random.Random + +/** + * The logger for this experiment. + */ +private val logger = KotlinLogging.logger {} + +/** + * Construct the failure domain for the experiments. + */ +suspend fun createFailureDomain( + seed: Int, + failureInterval: Int, + bareMetalProvisioner: ProvisioningService, + chan: Channel +): Domain { + val root = simulationContext.domain + val domain = root.newDomain(name = "failures") + domain.launch { + chan.receive() + val random = Random(seed) + val injectors = mutableMapOf() + for (node in bareMetalProvisioner.nodes()) { + val cluster = node.metadata[NODE_CLUSTER] as String + val injector = + injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) } + injector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + return domain +} + +/** + * Obtain the [FaultInjector] to use for the experiments. + */ +fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): FaultInjector { + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return CorrelatedFaultInjector( + domain, + iatScale = ln(failureInterval.toDouble()), iatShape = 1.03, // Hours + sizeScale = 1.88, sizeShape = 1.25, + dScale = 9.51, dShape = 3.21, // Minutes + random = random + ) +} + +/** + * Create the trace reader from which the VM workloads are read. + */ +fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List, seed: Int): Sc20ParquetTraceReader { + return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) +} + +/** + * Construct the environment for a VM provisioner and return the provisioner instance. + */ +suspend fun createProvisioner( + root: Domain, + environmentReader: EnvironmentReader, + allocationPolicy: AllocationPolicy +): Pair = withContext(root.coroutineContext) { + val environment = environmentReader.use { it.construct(root) } + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] + + // Wait for the bare metal nodes to be spawned + delay(10) + + val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) + + // Wait for the hypervisors to be spawned + delay(10) + + bareMetalProvisioner to scheduler +} + +/** + * Attach the specified monitor to the VM provisioner. + */ +@OptIn(ExperimentalCoroutinesApi::class) +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) { + val domain = simulationContext.domain + val hypervisors = scheduler.drivers() + + // Monitor hypervisor events + for (hypervisor in hypervisors) { + // TODO Do not expose VirtDriver directly but use Hypervisor class. + reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + hypervisor.server.events + .onEach { event -> + when (event) { + is ServerEvent.StateChanged -> { + reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + } + } + } + .launchIn(domain) + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> reporter.reportHostSlice( + simulationContext.clock.millis(), + event.requestedBurst, + event.grantedBurst, + event.overcommissionedBurst, + event.interferedBurst, + event.cpuUsage, + event.cpuDemand, + event.numberOfDeployedImages, + event.hostServer, + scheduler.submittedVms, + scheduler.queuedVms, + scheduler.runningVms, + scheduler.finishedVms + ) + } + } + .launchIn(domain) + } +} + +/** + * Process the trace. + */ +suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, reporter: Sc20Reporter, vmPlacements: Map = emptyMap()) { + val domain = simulationContext.domain + + try { + var submitted = 0L + val finished = Channel(Channel.CONFLATED) + val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) + + while (reader.hasNext()) { + val (time, workload) = reader.next() + + if (vmPlacements.isNotEmpty()) { + val vmId = workload.name.replace("VM Workload ", "") + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null) { + logger.warn { "Could not find placement data in VM placement file for VM $vmId" } + continue + } + val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false + if (machineInCluster) { + logger.info { "Ignored VM $vmId" } + continue + } + } + + submitted++ + delay(max(0, time - simulationContext.clock.millis())) + domain.launch { + chan.send(Unit) + val server = scheduler.deploy( + workload.image.name, workload.image, + Flavor(workload.image.maxCores, workload.image.requiredMemory) + ) + // Monitor server events + server.events + .onEach { + if (it is ServerEvent.StateChanged) { + reporter.reportVmStateChange(it.server) + } + + delay(1) + finished.send(Unit) + } + .collect() + } + } + + while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) { + finished.receive() + } + } finally { + reader.close() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt index 2029d3e7..4264ad3f 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt @@ -24,85 +24,98 @@ package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.Domain import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.ServerEvent -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.compute.metal.NODE_CLUSTER -import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.compute.virt.HypervisorEvent -import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver -import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService -import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy -import com.atlarge.opendc.core.failure.CorrelatedFaultInjector -import com.atlarge.opendc.core.failure.FailureDomain -import com.atlarge.opendc.core.failure.FaultInjector -import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import com.atlarge.opendc.format.trace.TraceReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import com.xenomachina.argparser.ArgParser -import com.xenomachina.argparser.default -import kotlinx.coroutines.ExperimentalCoroutinesApi +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.groups.OptionGroup +import com.github.ajalt.clikt.parameters.groups.default +import com.github.ajalt.clikt.parameters.groups.groupChoice +import com.github.ajalt.clikt.parameters.groups.mutuallyExclusiveOptions +import com.github.ajalt.clikt.parameters.groups.required +import com.github.ajalt.clikt.parameters.options.convert +import com.github.ajalt.clikt.parameters.options.default +import com.github.ajalt.clikt.parameters.options.defaultLazy +import com.github.ajalt.clikt.parameters.options.flag +import com.github.ajalt.clikt.parameters.options.option +import com.github.ajalt.clikt.parameters.options.required +import com.github.ajalt.clikt.parameters.types.choice +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.int import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext import mu.KotlinLogging import java.io.File import java.io.FileReader +import java.io.InputStream import java.util.ServiceLoader -import java.util.TreeSet -import kotlin.math.ln -import kotlin.math.max import kotlin.random.Random +/** + * The logger for this experiment. + */ private val logger = KotlinLogging.logger {} -class ExperimentParameters(parser: ArgParser) { - val traceDirectory by parser.storing("path to the trace directory") - val environmentFile by parser.storing("path to the environment file") - val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null } - val vmPlacementFile by parser.storing("path to the VM placement file").default { null } - val outputFile by parser.storing("path to where the output should be stored") - .default { "data/results-${System.currentTimeMillis()}.parquet" } - val selectedVms by parser.storing("the VMs to run") { parseVMs(this) } - .default { emptyList() } - val selectedVmsFile by parser.storing("path to a file containing the VMs to run") { - parseVMs(FileReader(File(this)).readText()) - } - .default { emptyList() } - val seed by parser.storing("the random seed") { toInt() } +/** + * Represents the command for running the experiment. + */ +class ExperimentCommand : CliktCommand(name = "sc20-experiment") { + private val environment by option("--environment-file", help = "path to the environment file") + .file() + .required() + private val performanceInterferenceStream by option("--performance-interference-file", help = "path to the performance interference file") + .file() + .convert { it.inputStream() as InputStream } + .defaultLazy { ExperimentCommand::class.java.getResourceAsStream("/env/performance-interference.json") } + + private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file") + .file() + .convert { + Sc20VmPlacementReader(it.inputStream().buffered()).construct() + } + .default(emptyMap()) + + private val selectedVms by mutuallyExclusiveOptions( + option("--selected-vms", help = "the VMs to run").convert { parseVMs(it) }, + option("--selected-vms-file").file().convert { parseVMs(FileReader(it).readText()) } + ).default(emptyList()) + + private val seed by option(help = "the random seed") + .int() .default(0) - val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures") - val failureInterval by parser.storing("expected number of hours between failures") { toInt() } + private val failures by option("-x", "--failures", help = "enable (correlated) machine failures") + .flag() + private val failureInterval by option(help = "expected number of hours between failures") + .int() .default(24 * 7) // one week - val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem") - - fun getSelectedVmList(): List { - return if (selectedVms.isEmpty()) { - selectedVmsFile - } else { - selectedVms - } - } + private val allocationPolicy by option(help = "name of VM allocation policy to use") + .choice( + "mem", "mem-inv", + "core-mem", "core-mem-inv", + "active-servers", "active-servers-inv", + "provisioned-cores", "provisioned-cores-inv", + "random", "replay" + ) + .default("core-mem") + + private val trace by option("--trace-directory", help = "path to the trace directory") + .file(canBeFile = false) + .required() + + private val reporter by option().groupChoice( + "parquet" to Parquet() + ).required() private fun parseVMs(string: String): List { // Handle case where VM list contains a VM name with an (escaped) single-quote in it @@ -112,271 +125,98 @@ class ExperimentParameters(parser: ArgParser) { val vms: List = jacksonObjectMapper().readValue(sanitizedString) return vms } -} -/** - * Construct the failure domain for the experiments. - */ -suspend fun createFailureDomain( - seed: Int, - failureInterval: Int, - bareMetalProvisioner: ProvisioningService, - chan: Channel -): Domain { - val root = simulationContext.domain - val domain = root.newDomain(name = "failures") - domain.launch { - chan.receive() - val random = Random(seed) - val injectors = mutableMapOf() - for (node in bareMetalProvisioner.nodes()) { - val cluster = node.metadata[NODE_CLUSTER] as String - val injector = - injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) } - injector.enqueue(node.metadata["driver"] as FailureDomain) + override fun run() { + logger.info("seed: $seed") + logger.info("failures: $failures") + logger.info("allocation-policy: $allocationPolicy") + + val start = System.currentTimeMillis() + val reporter: Sc20Reporter = reporter.createReporter() + + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + val system = provider("test") + val root = system.newDomain("root") + + val chan = Channel(Channel.CONFLATED) + val allocationPolicy = when (this.allocationPolicy) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seed)) + "replay" -> ReplayAllocationPolicy(vmPlacements) + else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}") } - } - return domain -} -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): FaultInjector { - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return CorrelatedFaultInjector( - domain, - iatScale = ln(failureInterval.toDouble()), iatShape = 1.03, // Hours - sizeScale = 1.88, sizeShape = 1.25, - dScale = 9.51, dShape = 3.21, // Minutes - random = random - ) -} - -/** - * Create the trace reader from which the VM workloads are read. - */ -fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List, seed: Int): Sc20ParquetTraceReader { - return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) -} + val performanceInterferenceModel = try { + Sc20PerformanceInterferenceReader(performanceInterferenceStream).construct() + } catch (e: Throwable) { + reporter.close() + throw e + } + val environmentReader = Sc20ClusterEnvironmentReader(environment) + val traceReader = try { + createTraceReader(trace, performanceInterferenceModel, selectedVms, seed) + } catch (e: Throwable) { + reporter.close() + throw e + } -/** - * Construct the environment for a VM provisioner and return the provisioner instance. - */ -suspend fun createProvisioner( - root: Domain, - environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy -): Pair = withContext(root.coroutineContext) { - val environment = environmentReader.use { it.construct(root) } - val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) - // Wait for the bare metal nodes to be spawned - delay(10) + val failureDomain = if (failures) { + logger.info("ENABLING failures") + createFailureDomain(seed, failureInterval, bareMetalProvisioner, chan) + } else { + null + } - val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) + attachMonitor(scheduler, reporter) + processTrace(traceReader, scheduler, chan, reporter, vmPlacements) - // Wait for the hypervisors to be spawned - delay(10) + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") - bareMetalProvisioner to scheduler -} + failureDomain?.cancel() + scheduler.terminate() + logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds") + } -/** - * Attach the specified monitor to the VM provisioner. - */ -@OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) { - val domain = simulationContext.domain - val hypervisors = scheduler.drivers() + runBlocking { + system.run() + system.terminate() + } - // Monitor hypervisor events - for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - hypervisor.server.events - .onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - } - } - } - .launchIn(domain) - hypervisor.events - .onEach { event -> - when (event) { - is HypervisorEvent.SliceFinished -> reporter.reportHostSlice( - simulationContext.clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.hostServer, - scheduler.submittedVms, - scheduler.queuedVms, - scheduler.runningVms, - scheduler.finishedVms - ) - } - } - .launchIn(domain) + // Explicitly close the monitor to flush its buffer + reporter.close() } } -/** - * Process the trace. - */ -suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, reporter: Sc20Reporter, vmPlacements: Map = emptyMap()) { - val domain = simulationContext.domain - - try { - var submitted = 0L - val finished = Channel(Channel.CONFLATED) - val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) - - while (reader.hasNext()) { - val (time, workload) = reader.next() - - if (vmPlacements.isNotEmpty()) { - val vmId = workload.name.replace("VM Workload ", "") - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null) { - logger.warn { "Could not find placement data in VM placement file for VM $vmId" } - continue - } - val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false - if (machineInCluster) { - logger.info { "Ignored VM $vmId" } - continue - } - } - - submitted++ - delay(max(0, time - simulationContext.clock.millis())) - domain.launch { - chan.send(Unit) - val server = scheduler.deploy( - workload.image.name, workload.image, - Flavor(workload.image.maxCores, workload.image.requiredMemory) - ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - reporter.reportVmStateChange(it.server) - } +sealed class Reporter(name: String) : OptionGroup(name) { + /** + * Create the [Sc20Reporter] for this option. + */ + abstract fun createReporter(): Sc20Reporter +} - delay(1) - finished.send(Unit) - } - .collect() - } - } +class Parquet : Reporter("Options for reporting using Parquet") { + private val path by option(help = "path to where the output should be stored") + .file() + .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") } - while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) { - finished.receive() - } - } finally { - reader.close() - } + override fun createReporter(): Sc20Reporter = Sc20ParquetReporter(path) } /** * Main entry point of the experiment. */ -@OptIn(ExperimentalCoroutinesApi::class) -fun main(args: Array) { - val cli = ArgParser(args).parseInto(::ExperimentParameters) - logger.info("trace-directory: ${cli.traceDirectory}") - logger.info("environment-file: ${cli.environmentFile}") - logger.info("performance-interference-file: ${cli.performanceInterferenceFile}") - logger.info("selected-vms-file: ${cli.selectedVmsFile}") - logger.info("seed: ${cli.seed}") - logger.info("failures: ${cli.failures}") - logger.info("allocation-policy: ${cli.allocationPolicy}") - - val start = System.currentTimeMillis() - val reporter: Sc20Reporter = Sc20ParquetReporter(cli.outputFile) - - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") - - val chan = Channel(Channel.CONFLATED) - - val performanceInterferenceModel = try { - val performanceInterferenceStream = if (cli.performanceInterferenceFile != null) { - File(cli.performanceInterferenceFile!!).inputStream().buffered() - } else { - object {}.javaClass.getResourceAsStream("/env/performance-interference.json") - } - Sc20PerformanceInterferenceReader(performanceInterferenceStream) - .construct() - } catch (e: Throwable) { - reporter.close() - throw e - } - val vmPlacements = if (cli.vmPlacementFile == null) { - emptyMap() - } else { - Sc20VmPlacementReader(File(cli.vmPlacementFile!!).inputStream().buffered()).construct() - } - val environmentReader = Sc20ClusterEnvironmentReader(File(cli.environmentFile)) - val traceReader = try { - createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed) - } catch (e: Throwable) { - reporter.close() - throw e - } - val allocationPolicy = when (cli.allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(cli.seed)) - "replay" -> ReplayAllocationPolicy(vmPlacements) - else -> throw IllegalArgumentException("Unknown allocation policy: ${cli.allocationPolicy}") - } - - root.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) - - val failureDomain = if (cli.failures) { - logger.info("ENABLING failures") - createFailureDomain(cli.seed, cli.failureInterval, bareMetalProvisioner, chan) - } else { - null - } - - attachMonitor(scheduler, reporter) - processTrace(traceReader, scheduler, chan, reporter, vmPlacements) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds") - } - - runBlocking { - system.run() - system.terminate() - } - - // Explicitly close the monitor to flush its buffer - reporter.close() -} +fun main(args: Array) = ExperimentCommand().main(args) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt index fc3e98ae..eaac912a 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt @@ -12,14 +12,13 @@ import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.File import java.util.concurrent.ArrayBlockingQueue import kotlin.concurrent.thread private val logger = KotlinLogging.logger {} -class Sc20ParquetReporter( - destination: String -) : Sc20Reporter { +class Sc20ParquetReporter(destination: File) : Sc20Reporter { private val lastServerStates = mutableMapOf>() private val schema = SchemaBuilder .record("slice") @@ -43,7 +42,7 @@ class Sc20ParquetReporter( .name("totalRunningVms").type().longType().noDefault() .name("totalFinishedVms").type().longType().noDefault() .endRecord() - private val writer = AvroParquetWriter.builder(Path(destination)) + private val writer = AvroParquetWriter.builder(Path(destination.absolutePath)) .withSchema(schema) .withCompressionCodec(CompressionCodecName.SNAPPY) .withPageSize(4 * 1024 * 1024) // For compression -- cgit v1.2.3 From cbdbf818004040b60aa122dc6cb98ef635fa5ac1 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 7 May 2020 23:31:07 +0200 Subject: feat: Add initial version of Postgres reporter --- opendc/opendc-experiments-sc20/build.gradle.kts | 1 + opendc/opendc-experiments-sc20/schema.sql | 22 +++ .../opendc/experiments/sc20/Sc20Experiment.kt | 15 +- .../opendc/experiments/sc20/Sc20ParquetReporter.kt | 56 +++--- .../experiments/sc20/Sc20PostgresReporter.kt | 200 +++++++++++++++++++++ 5 files changed, 265 insertions(+), 29 deletions(-) create mode 100644 opendc/opendc-experiments-sc20/schema.sql create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index 42ccb1b1..6b6366a7 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -47,6 +47,7 @@ dependencies { exclude(group = "log4j") } runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") + runtimeOnly("org.postgresql:postgresql:42.2.12") runtimeOnly(project(":odcsim:odcsim-engine-omega")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") diff --git a/opendc/opendc-experiments-sc20/schema.sql b/opendc/opendc-experiments-sc20/schema.sql new file mode 100644 index 00000000..51990a75 --- /dev/null +++ b/opendc/opendc-experiments-sc20/schema.sql @@ -0,0 +1,22 @@ +DROP TABLE IF EXISTS host_reports; +CREATE TABLE host_reports ( + id BIGSERIAL PRIMARY KEY NOT NULL, + experiment_id BIGINT NOT NULL, + time BIGINT NOT NULL, + duration BIGINT NOT NULL, + requested_burst BIGINT NOT NULL, + granted_burst BIGINT NOT NULL, + overcommissioned_burst BIGINT NOT NULL, + interfered_burst BIGINT NOT NULL, + cpu_usage DOUBLE PRECISION NOT NULL, + cpu_demand DOUBLE PRECISION NOT NULL, + image_count INTEGER NOT NULL, + server TEXT NOT NULL, + host_state TEXT NOT NULL, + host_usage DOUBLE PRECISION NOT NULL, + power_draw DOUBLE PRECISION NOT NULL, + total_submitted_vms BIGINT NOT NULL, + total_queued_vms BIGINT NOT NULL, + total_running_vms BIGINT NOT NULL, + total_finished_vms BIGINT NOT NULL +); diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt index 4264ad3f..51448c9e 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt @@ -51,6 +51,7 @@ import com.github.ajalt.clikt.parameters.options.required import com.github.ajalt.clikt.parameters.types.choice import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.int +import com.github.ajalt.clikt.parameters.types.long import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch @@ -59,6 +60,7 @@ import mu.KotlinLogging import java.io.File import java.io.FileReader import java.io.InputStream +import java.sql.DriverManager import java.util.ServiceLoader import kotlin.random.Random @@ -114,7 +116,8 @@ class ExperimentCommand : CliktCommand(name = "sc20-experiment") { .required() private val reporter by option().groupChoice( - "parquet" to Parquet() + "parquet" to Parquet(), + "postgres" to Postgres() ).required() private fun parseVMs(string: String): List { @@ -216,6 +219,16 @@ class Parquet : Reporter("Options for reporting using Parquet") { override fun createReporter(): Sc20Reporter = Sc20ParquetReporter(path) } +class Postgres : Reporter("Options for reporting using PostgreSQL") { + private val url by option(help = "JDBC connection url").required() + private val experimentId by option(help = "Experiment ID").long().required() + + override fun createReporter(): Sc20Reporter { + val conn = DriverManager.getConnection(url) + return Sc20PostgresReporter(conn, experimentId) + } +} + /** * Main entry point of the experiment. */ diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt index eaac912a..f2139144 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt @@ -26,21 +26,21 @@ class Sc20ParquetReporter(destination: File) : Sc20Reporter { .fields() .name("time").type().longType().noDefault() .name("duration").type().longType().noDefault() - .name("requestedBurst").type().longType().noDefault() - .name("grantedBurst").type().longType().noDefault() - .name("overcommissionedBurst").type().longType().noDefault() - .name("interferedBurst").type().longType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("cpuDemand").type().doubleType().noDefault() - .name("numberOfDeployedImages").type().intType().noDefault() + .name("requested_burst").type().longType().noDefault() + .name("granted_burst").type().longType().noDefault() + .name("overcommissioned_burst").type().longType().noDefault() + .name("interfered_burst").type().longType().noDefault() + .name("cpu_usage").type().doubleType().noDefault() + .name("cpu_demand").type().doubleType().noDefault() + .name("image_count").type().intType().noDefault() .name("server").type().stringType().noDefault() - .name("hostState").type().stringType().noDefault() - .name("hostUsage").type().doubleType().noDefault() - .name("powerDraw").type().doubleType().noDefault() - .name("totalSubmittedVms").type().longType().noDefault() - .name("totalQueuedVms").type().longType().noDefault() - .name("totalRunningVms").type().longType().noDefault() - .name("totalFinishedVms").type().longType().noDefault() + .name("host_state").type().stringType().noDefault() + .name("host_usage").type().doubleType().noDefault() + .name("power_draw").type().doubleType().noDefault() + .name("total_submitted_vms").type().longType().noDefault() + .name("total_queued_vms").type().longType().noDefault() + .name("total_running_vms").type().longType().noDefault() + .name("total_finished_vms").type().longType().noDefault() .endRecord() private val writer = AvroParquetWriter.builder(Path(destination.absolutePath)) .withSchema(schema) @@ -122,21 +122,21 @@ class Sc20ParquetReporter(destination: File) : Sc20Reporter { val record = GenericData.Record(schema) record.put("time", time) record.put("duration", duration) - record.put("requestedBurst", requestedBurst) - record.put("grantedBurst", grantedBurst) - record.put("overcommissionedBurst", overcommissionedBurst) - record.put("interferedBurst", interferedBurst) - record.put("cpuUsage", cpuUsage) - record.put("cpuDemand", cpuDemand) - record.put("numberOfDeployedImages", numberOfDeployedImages) + record.put("requested_burst", requestedBurst) + record.put("granted_burst", grantedBurst) + record.put("overcommissioned_burst", overcommissionedBurst) + record.put("interfered_burst", interferedBurst) + record.put("cpu_usage", cpuUsage) + record.put("cpu_demand", cpuDemand) + record.put("image_count", numberOfDeployedImages) record.put("server", hostServer.uid) - record.put("hostState", hostServer.state) - record.put("hostUsage", usage) - record.put("powerDraw", powerDraw) - record.put("totalSubmittedVms", submittedVms) - record.put("totalQueuedVms", queuedVms) - record.put("totalRunningVms", runningVms) - record.put("totalFinishedVms", finishedVms) + record.put("host_state", hostServer.state) + record.put("host_usage", usage) + record.put("power_draw", powerDraw) + record.put("total_submitted_vms", submittedVms) + record.put("total_queued_vms", queuedVms) + record.put("total_running_vms", runningVms) + record.put("total_finished_vms", finishedVms) queue.put(record) } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt new file mode 100644 index 00000000..5c5e6ceb --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt @@ -0,0 +1,200 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import kotlinx.coroutines.flow.first +import mu.KotlinLogging +import java.sql.Connection +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.concurrent.thread + +private val logger = KotlinLogging.logger {} + +class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20Reporter { + private val lastServerStates = mutableMapOf>() + private val queue = ArrayBlockingQueue(2048) + private val stop = AtomicBoolean(false) + private val writerThread = thread(start = true, name = "sc20-writer") { + val stmt = try { + conn.prepareStatement( + """ + INSERT INTO host_reports (experiment_id, time, duration, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, image_count, server, host_state, host_usage, power_draw, total_submitted_vms, total_queued_vms, total_running_vms, total_finished_vms) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """.trimIndent() + ) + } catch (e: Throwable) { + conn.close() + throw e + } + + val batchSize = 4096 + var batch = 0 + + try { + while (!stop.get()) { + val record = queue.take() + stmt.setLong(1, experimentId) + stmt.setLong(2, record.time) + stmt.setLong(3, record.duration) + stmt.setLong(4, record.requestedBurst) + stmt.setLong(5, record.grantedBurst) + stmt.setLong(6, record.overcommissionedBurst) + stmt.setLong(7, record.interferedBurst) + stmt.setDouble(8, record.cpuUsage) + stmt.setDouble(9, record.cpuDemand) + stmt.setInt(10, record.numberOfDeployedImages) + stmt.setString(11, record.hostServer.uid.toString()) + stmt.setString(12, record.hostServer.state.name) + stmt.setDouble(13, record.hostUsage) + stmt.setDouble(14, record.powerDraw) + stmt.setLong(15, record.submittedVms) + stmt.setLong(16, record.queuedVms) + stmt.setLong(17, record.runningVms) + stmt.setLong(18, record.finishedVms) + stmt.addBatch() + batch++ + + if (batch > batchSize) { + stmt.executeBatch() + batch = 0 + } + } + } finally { + stmt.executeBatch() + stmt.close() + conn.close() + } + } + + override suspend fun reportVmStateChange(server: Server) {} + + override suspend fun reportHostStateChange( + driver: VirtDriver, + server: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long + ) { + val lastServerState = lastServerStates[server] + if (server.state == ServerState.SHUTOFF && lastServerState != null) { + val duration = simulationContext.clock.millis() - lastServerState.second + reportHostSlice( + simulationContext.clock.millis(), + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server, + submittedVms, + queuedVms, + runningVms, + finishedVms, + duration + ) + } + + logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]") + + lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) + } + + override suspend fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, + duration: Long + ) { + // Assume for now that the host is not virtualized and measure the current power draw + val driver = hostServer.services[BareMetalDriver.Key] + val usage = driver.usage.first() + val powerDraw = driver.powerDraw.first() + + queue.put( + Report( + time, + duration, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + numberOfDeployedImages, + hostServer, + usage, + powerDraw, + submittedVms, + queuedVms, + runningVms, + finishedVms + ) + ) + } + + override fun close() { + // Busy loop to wait for writer thread to finish + stop.set(true) + writerThread.join() + } + + data class Report( + val time: Long, + val duration: Long, + val requestedBurst: Long, + val grantedBurst: Long, + val overcommissionedBurst: Long, + val interferedBurst: Long, + val cpuUsage: Double, + val cpuDemand: Double, + val numberOfDeployedImages: Int, + val hostServer: Server, + val hostUsage: Double, + val powerDraw: Double, + val submittedVms: Long, + val queuedVms: Long, + val runningVms: Long, + val finishedVms: Long + ) +} -- cgit v1.2.3