From 880c0783d7e20d9b082227a5cea685bfd76e4920 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 14 May 2020 00:45:37 +0200 Subject: perf: Various performance improvements --- .../opendc/experiments/sc20/ExperimentHelpers.kt | 16 ++++++++-- .../opendc/experiments/sc20/ExperimentRunner.kt | 32 ++++++++++---------- .../atlarge/opendc/experiments/sc20/Portfolios.kt | 2 +- .../opendc/experiments/sc20/WorkloadSampler.kt | 11 +++++-- .../sc20/reporter/ExperimentReporter.kt | 12 ++++++-- .../sc20/reporter/ParquetExperimentReporter.kt | 35 ++++++++++++---------- .../sc20/reporter/PostgresExperimentReporter.kt | 34 ++++++++++++--------- .../sc20/trace/Sc20RawParquetTraceReader.kt | 3 +- 8 files changed, 89 insertions(+), 56 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt index 61b5759d..ac43b6ac 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt @@ -31,6 +31,7 @@ import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.metal.NODE_CLUSTER +import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver @@ -143,17 +144,19 @@ suspend fun createProvisioner( @OptIn(ExperimentalCoroutinesApi::class) suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: ExperimentReporter) { val domain = simulationContext.domain + val clock = simulationContext.clock val hypervisors = scheduler.drivers() // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. - reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + reporter.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) hypervisor.server.events .onEach { event -> + val time = clock.millis() when (event) { is ServerEvent.StateChanged -> { - reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + reporter.reportHostStateChange(time, hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) } } } @@ -179,6 +182,11 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex } } .launchIn(domain) + + val driver = hypervisor.server.services[BareMetalDriver.Key] + driver.powerDraw + .onEach { reporter.reportPowerConsumption(hypervisor.server, it) } + .launchIn(domain) } } @@ -222,8 +230,10 @@ suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtP // Monitor server events server.events .onEach { + val time = simulationContext.clock.millis() + if (it is ServerEvent.StateChanged) { - reporter.reportVmStateChange(it.server) + reporter.reportVmStateChange(time, it.server) } delay(1) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt index 5e16b5e6..9455cb9d 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt @@ -133,7 +133,7 @@ public class ExperimentRunner( performanceInterferenceModel: PerformanceInterferenceModel?, run: Run ): TraceReader { - val raw = rawTraceReaders.getOrPut(name) { Sc20RawParquetTraceReader(File(tracePath, name)) } + val raw = rawTraceReaders.getValue(name) return Sc20ParquetTraceReader( raw, performanceInterferenceModel, @@ -148,20 +148,6 @@ public class ExperimentRunner( return Sc20ClusterEnvironmentReader(File(environmentPath, "$name.txt")) } - /** - * Run the specified run. - */ - private fun run(run: Run) { - val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id) - val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run) - val environmentReader = createEnvironmentReader(run.scenario.topology.name) - try { - run.scenario(run, reporter, environmentReader, traceReader) - } finally { - reporter.close() - } - } - /** * Run the portfolios. */ @@ -179,6 +165,12 @@ public class ExperimentRunner( val mainDispatcher = coroutineContext[CoroutineDispatcher.Key]!! for (run in plan) { val scenarioId = scenarioIds[run.scenario]!! + + rawTraceReaders.computeIfAbsent(run.scenario.workload.name) { name -> + logger.info { "Loading trace $name" } + Sc20RawParquetTraceReader(File(tracePath, name)) + } + launch(dispatcher) { launch(mainDispatcher) { helper.startRun(scenarioId, run.id) @@ -189,7 +181,15 @@ public class ExperimentRunner( try { val duration = measureTimeMillis { - run(run) + val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id) + val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run) + val environmentReader = createEnvironmentReader(run.scenario.topology.name) + + try { + run.scenario(run, reporter, environmentReader, traceReader) + } finally { + reporter.close() + } } finished.incrementAndGet() diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt index f2ac84a1..963d47e9 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt @@ -72,7 +72,7 @@ object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") { // Workload("solvinity", 0.1), // Workload("solvinity", 0.25), // Workload("small-parquet", 0.5), - Workload("small-parquet", 1.0) + Workload("full-traces", 0.10) ) override val operationalPhenomena = listOf( diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt index 6089271e..bb3466ba 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt @@ -26,8 +26,11 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.format.trace.TraceEntry +import mu.KotlinLogging import kotlin.random.Random +private val logger = KotlinLogging.logger {} + /** * Sample the workload for the specified [run]. */ @@ -39,7 +42,8 @@ fun sampleWorkload(trace: List>, run: Run): List>, run: Run): List> { - if (run.scenario.workload.fraction >= 1) { + val fraction = run.scenario.workload.fraction + if (fraction >= 1) { return trace } @@ -50,7 +54,7 @@ fun sampleRegularWorkload(trace: List>, run: Run): List run.scenario.workload.fraction) { + if ((currentLoad + entryLoad) / totalLoad > fraction) { break } @@ -58,5 +62,8 @@ fun sampleRegularWorkload(trace: List>, run: Run): List() - lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) + override fun reportPowerConsumption(host: Server, draw: Double) { + lastPowerConsumption[host] = draw } - override suspend fun reportHostSlice( + override fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, @@ -139,11 +147,6 @@ class ExperimentParquetReporter(destination: File) : finishedVms: Long, duration: Long ) { - // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.services[BareMetalDriver.Key] - val usage = driver.usage.first() - val powerDraw = driver.powerDraw.first() - val record = GenericData.Record(schema) record.put("time", time) record.put("duration", duration) @@ -156,8 +159,8 @@ class ExperimentParquetReporter(destination: File) : record.put("image_count", numberOfDeployedImages) record.put("server", hostServer.uid) record.put("host_state", hostServer.state) - record.put("host_usage", usage) - record.put("power_draw", powerDraw) + record.put("host_usage", cpuUsage) + record.put("power_draw", lastPowerConsumption[hostServer] ?: 200.0) record.put("total_submitted_vms", submittedVms) record.put("total_queued_vms", queuedVms) record.put("total_running_vms", runningVms) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt index 5de3535d..a92278d8 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt @@ -27,9 +27,7 @@ package com.atlarge.opendc.experiments.sc20.reporter import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import kotlinx.coroutines.flow.first import mu.KotlinLogging private val logger = KotlinLogging.logger {} @@ -37,9 +35,10 @@ private val logger = KotlinLogging.logger {} class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: PostgresHostMetricsWriter) : ExperimentReporter { private val lastServerStates = mutableMapOf>() - override suspend fun reportVmStateChange(server: Server) {} + override fun reportVmStateChange(time: Long, server: Server) {} - override suspend fun reportHostStateChange( + override fun reportHostStateChange( + time: Long, driver: VirtDriver, server: Server, submittedVms: Long, @@ -48,10 +47,12 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P finishedVms: Long ) { val lastServerState = lastServerStates[server] + logger.debug("Host ${server.uid} changed state ${server.state} [$time]") + if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = simulationContext.clock.millis() - lastServerState.second + val duration = time - lastServerState.second reportHostSlice( - simulationContext.clock.millis(), + time, 0, 0, 0, @@ -66,14 +67,23 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P finishedVms, duration ) + + lastServerStates.remove(server) + lastPowerConsumption.remove(server) + } else { + lastServerStates[server] = Pair(server.state, time) } + } + - logger.debug("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]") + private val lastPowerConsumption = mutableMapOf() - lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) + override fun reportPowerConsumption(host: Server, draw: Double) { + lastPowerConsumption[host] = draw } - override suspend fun reportHostSlice( + + override fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, @@ -89,10 +99,6 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P finishedVms: Long, duration: Long ) { - // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.services[BareMetalDriver.Key] - val powerDraw = driver.powerDraw.first() - writer.write( scenario, run, HostMetrics( time, @@ -105,7 +111,7 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P interferedBurst, cpuUsage, cpuDemand, - powerDraw + lastPowerConsumption[hostServer] ?: 200.0 ) ) } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index f19c9275..485c2922 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -94,7 +94,6 @@ class Sc20RawParquetTraceReader(private val path: File) { var counter = 0 val entries = mutableListOf() - val loadCache = mutableListOf() return try { while (true) { @@ -106,6 +105,8 @@ class Sc20RawParquetTraceReader(private val path: File) { val requiredMemory = record["requiredMemory"] as Long val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) + logger.info { "VM $id" } + val vmFragments = fragments.getValue(id).asSequence() val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs val vmWorkload = VmWorkload( -- cgit v1.2.3