diff options
Diffstat (limited to 'opendc')
8 files changed, 89 insertions, 56 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt index 61b5759d..ac43b6ac 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt @@ -31,6 +31,7 @@ import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.metal.NODE_CLUSTER +import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver @@ -143,17 +144,19 @@ suspend fun createProvisioner( @OptIn(ExperimentalCoroutinesApi::class) suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: ExperimentReporter) { val domain = simulationContext.domain + val clock = simulationContext.clock val hypervisors = scheduler.drivers() // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. - reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + reporter.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) hypervisor.server.events .onEach { event -> + val time = clock.millis() when (event) { is ServerEvent.StateChanged -> { - reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + reporter.reportHostStateChange(time, hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) } } } @@ -179,6 +182,11 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex } } .launchIn(domain) + + val driver = hypervisor.server.services[BareMetalDriver.Key] + driver.powerDraw + .onEach { reporter.reportPowerConsumption(hypervisor.server, it) } + .launchIn(domain) } } @@ -222,8 +230,10 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP // Monitor server events server.events .onEach { + val time = simulationContext.clock.millis() + if (it is ServerEvent.StateChanged) { - reporter.reportVmStateChange(it.server) + reporter.reportVmStateChange(time, it.server) } delay(1) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt index 5e16b5e6..9455cb9d 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt @@ -133,7 +133,7 @@ public class ExperimentRunner( performanceInterferenceModel: PerformanceInterferenceModel?, run: Run ): TraceReader<VmWorkload> { - val raw = rawTraceReaders.getOrPut(name) { Sc20RawParquetTraceReader(File(tracePath, name)) } + val raw = rawTraceReaders.getValue(name) return Sc20ParquetTraceReader( raw, performanceInterferenceModel, @@ -149,20 +149,6 @@ public class ExperimentRunner( } /** - * Run the specified run. - */ - private fun run(run: Run) { - val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id) - val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run) - val environmentReader = createEnvironmentReader(run.scenario.topology.name) - try { - run.scenario(run, reporter, environmentReader, traceReader) - } finally { - reporter.close() - } - } - - /** * Run the portfolios. */ @OptIn(ExperimentalStdlibApi::class) @@ -179,6 +165,12 @@ public class ExperimentRunner( val mainDispatcher = coroutineContext[CoroutineDispatcher.Key]!! for (run in plan) { val scenarioId = scenarioIds[run.scenario]!! + + rawTraceReaders.computeIfAbsent(run.scenario.workload.name) { name -> + logger.info { "Loading trace $name" } + Sc20RawParquetTraceReader(File(tracePath, name)) + } + launch(dispatcher) { launch(mainDispatcher) { helper.startRun(scenarioId, run.id) @@ -189,7 +181,15 @@ public class ExperimentRunner( try { val duration = measureTimeMillis { - run(run) + val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id) + val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run) + val environmentReader = createEnvironmentReader(run.scenario.topology.name) + + try { + run.scenario(run, reporter, environmentReader, traceReader) + } finally { + reporter.close() + } } finished.incrementAndGet() diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt index f2ac84a1..963d47e9 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt @@ -72,7 +72,7 @@ object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") { // Workload("solvinity", 0.1), // Workload("solvinity", 0.25), // Workload("small-parquet", 0.5), - Workload("small-parquet", 1.0) + Workload("full-traces", 0.10) ) override val operationalPhenomena = listOf( diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt index 6089271e..bb3466ba 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt @@ -26,8 +26,11 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.format.trace.TraceEntry +import mu.KotlinLogging import kotlin.random.Random +private val logger = KotlinLogging.logger {} + /** * Sample the workload for the specified [run]. */ @@ -39,7 +42,8 @@ fun sampleWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<TraceEnt * Sample a regular (non-HPC) workload. */ fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<TraceEntry<VmWorkload>> { - if (run.scenario.workload.fraction >= 1) { + val fraction = run.scenario.workload.fraction + if (fraction >= 1) { return trace } @@ -50,7 +54,7 @@ fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<T for (entry in shuffled) { val entryLoad = entry.workload.image.tags.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > run.scenario.workload.fraction) { + if ((currentLoad + entryLoad) / totalLoad > fraction) { break } @@ -58,5 +62,8 @@ fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<T res += entry } + logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt index 0403a3b5..1c752cb1 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt @@ -35,12 +35,13 @@ interface ExperimentReporter : Closeable { /** * This method is invoked when the state of a VM changes. */ - suspend fun reportVmStateChange(server: Server) {} + fun reportVmStateChange(time: Long, server: Server) {} /** * This method is invoked when the state of a host changes. */ - suspend fun reportHostStateChange( + fun reportHostStateChange( + time: Long, driver: VirtDriver, server: Server, submittedVms: Long, @@ -50,9 +51,14 @@ interface ExperimentReporter : Closeable { ) {} /** + * Report the power consumption of a host. + */ + fun reportPowerConsumption(host: Server, draw: Double) + + /** * This method is invoked for a host for each slice that is finishes. */ - suspend fun reportHostSlice( + fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt index 6b3351d4..58d384e7 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt @@ -27,9 +27,7 @@ package com.atlarge.opendc.experiments.sc20.reporter import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import kotlinx.coroutines.flow.first import mu.KotlinLogging import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData @@ -87,9 +85,10 @@ class ExperimentParquetReporter(destination: File) : } } - override suspend fun reportVmStateChange(server: Server) {} + override fun reportVmStateChange(time: Long, server: Server) {} - override suspend fun reportHostStateChange( + override fun reportHostStateChange( + time: Long, driver: VirtDriver, server: Server, submittedVms: Long, @@ -97,11 +96,13 @@ class ExperimentParquetReporter(destination: File) : runningVms: Long, finishedVms: Long ) { + logger.info("Host ${server.uid} changed state ${server.state} [$time]") + val lastServerState = lastServerStates[server] if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = simulationContext.clock.millis() - lastServerState.second + val duration = time - lastServerState.second reportHostSlice( - simulationContext.clock.millis(), + time, 0, 0, 0, @@ -116,14 +117,21 @@ class ExperimentParquetReporter(destination: File) : finishedVms, duration ) + + lastServerStates.remove(server) + lastPowerConsumption.remove(server) + } else { + lastServerStates[server] = Pair(server.state, time) } + } - logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]") + private val lastPowerConsumption = mutableMapOf<Server, Double>() - lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) + override fun reportPowerConsumption(host: Server, draw: Double) { + lastPowerConsumption[host] = draw } - override suspend fun reportHostSlice( + override fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, @@ -139,11 +147,6 @@ class ExperimentParquetReporter(destination: File) : finishedVms: Long, duration: Long ) { - // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.services[BareMetalDriver.Key] - val usage = driver.usage.first() - val powerDraw = driver.powerDraw.first() - val record = GenericData.Record(schema) record.put("time", time) record.put("duration", duration) @@ -156,8 +159,8 @@ class ExperimentParquetReporter(destination: File) : record.put("image_count", numberOfDeployedImages) record.put("server", hostServer.uid) record.put("host_state", hostServer.state) - record.put("host_usage", usage) - record.put("power_draw", powerDraw) + record.put("host_usage", cpuUsage) + record.put("power_draw", lastPowerConsumption[hostServer] ?: 200.0) record.put("total_submitted_vms", submittedVms) record.put("total_queued_vms", queuedVms) record.put("total_running_vms", runningVms) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt index 5de3535d..a92278d8 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt @@ -27,9 +27,7 @@ package com.atlarge.opendc.experiments.sc20.reporter import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import kotlinx.coroutines.flow.first import mu.KotlinLogging private val logger = KotlinLogging.logger {} @@ -37,9 +35,10 @@ private val logger = KotlinLogging.logger {} class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: PostgresHostMetricsWriter) : ExperimentReporter { private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() - override suspend fun reportVmStateChange(server: Server) {} + override fun reportVmStateChange(time: Long, server: Server) {} - override suspend fun reportHostStateChange( + override fun reportHostStateChange( + time: Long, driver: VirtDriver, server: Server, submittedVms: Long, @@ -48,10 +47,12 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P finishedVms: Long ) { val lastServerState = lastServerStates[server] + logger.debug("Host ${server.uid} changed state ${server.state} [$time]") + if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = simulationContext.clock.millis() - lastServerState.second + val duration = time - lastServerState.second reportHostSlice( - simulationContext.clock.millis(), + time, 0, 0, 0, @@ -66,14 +67,23 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P finishedVms, duration ) + + lastServerStates.remove(server) + lastPowerConsumption.remove(server) + } else { + lastServerStates[server] = Pair(server.state, time) } + } + - logger.debug("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]") + private val lastPowerConsumption = mutableMapOf<Server, Double>() - lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) + override fun reportPowerConsumption(host: Server, draw: Double) { + lastPowerConsumption[host] = draw } - override suspend fun reportHostSlice( + + override fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, @@ -89,10 +99,6 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P finishedVms: Long, duration: Long ) { - // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.services[BareMetalDriver.Key] - val powerDraw = driver.powerDraw.first() - writer.write( scenario, run, HostMetrics( time, @@ -105,7 +111,7 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P interferedBurst, cpuUsage, cpuDemand, - powerDraw + lastPowerConsumption[hostServer] ?: 200.0 ) ) } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index f19c9275..485c2922 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -94,7 +94,6 @@ class Sc20RawParquetTraceReader(private val path: File) { var counter = 0 val entries = mutableListOf<TraceEntryImpl>() - val loadCache = mutableListOf<LoadCacheEntry>() return try { while (true) { @@ -106,6 +105,8 @@ class Sc20RawParquetTraceReader(private val path: File) { val requiredMemory = record["requiredMemory"] as Long val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) + logger.info { "VM $id" } + val vmFragments = fragments.getValue(id).asSequence() val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs val vmWorkload = VmWorkload( |
