From f4fe224194c0bcabda2e17005077e76ea9e7098c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 7 May 2020 22:15:59 +0200 Subject: refactor: Migrate to Clikt for argument parsing This change migrates the experiment runner to Clikt for parsing command line arguments, since it provides functionality that we need for having multiple types of reporters (e.g. postgres and parquet), not offered by the current parser. --- opendc/opendc-experiments-sc20/build.gradle.kts | 3 +- .../opendc/experiments/sc20/ExperimentHelpers.kt | 235 +++++++++++ .../opendc/experiments/sc20/Sc20Experiment.kt | 442 +++++++-------------- .../opendc/experiments/sc20/Sc20ParquetReporter.kt | 7 +- 4 files changed, 380 insertions(+), 307 deletions(-) create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index 76ec7cc4..42ccb1b1 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -39,8 +39,7 @@ dependencies { api(project(":opendc:opendc-core")) implementation(project(":opendc:opendc-format")) implementation(kotlin("stdlib")) - implementation("com.xenomachina:kotlin-argparser:2.0.7") - implementation("org.slf4j:slf4j-api:${Library.SLF4J}") + implementation("com.github.ajalt:clikt:2.6.0") implementation("io.github.microutils:kotlin-logging:1.7.9") implementation("org.apache.parquet:parquet-avro:1.11.0") implementation("org.apache.hadoop:hadoop-client:3.2.1") { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt new file mode 100644 index 00000000..e37dea8b --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt @@ -0,0 +1,235 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.compute.metal.NODE_CLUSTER +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorEvent +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver +import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import com.atlarge.opendc.core.failure.CorrelatedFaultInjector +import com.atlarge.opendc.core.failure.FailureDomain +import com.atlarge.opendc.core.failure.FaultInjector +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import mu.KotlinLogging +import java.io.File +import java.util.TreeSet +import kotlin.math.ln +import kotlin.math.max +import kotlin.random.Random + +/** + * The logger for this experiment. + */ +private val logger = KotlinLogging.logger {} + +/** + * Construct the failure domain for the experiments. + */ +suspend fun createFailureDomain( + seed: Int, + failureInterval: Int, + bareMetalProvisioner: ProvisioningService, + chan: Channel +): Domain { + val root = simulationContext.domain + val domain = root.newDomain(name = "failures") + domain.launch { + chan.receive() + val random = Random(seed) + val injectors = mutableMapOf() + for (node in bareMetalProvisioner.nodes()) { + val cluster = node.metadata[NODE_CLUSTER] as String + val injector = + injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) } + injector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + return domain +} + +/** + * Obtain the [FaultInjector] to use for the experiments. + */ +fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): FaultInjector { + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return CorrelatedFaultInjector( + domain, + iatScale = ln(failureInterval.toDouble()), iatShape = 1.03, // Hours + sizeScale = 1.88, sizeShape = 1.25, + dScale = 9.51, dShape = 3.21, // Minutes + random = random + ) +} + +/** + * Create the trace reader from which the VM workloads are read. + */ +fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List, seed: Int): Sc20ParquetTraceReader { + return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) +} + +/** + * Construct the environment for a VM provisioner and return the provisioner instance. + */ +suspend fun createProvisioner( + root: Domain, + environmentReader: EnvironmentReader, + allocationPolicy: AllocationPolicy +): Pair = withContext(root.coroutineContext) { + val environment = environmentReader.use { it.construct(root) } + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] + + // Wait for the bare metal nodes to be spawned + delay(10) + + val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) + + // Wait for the hypervisors to be spawned + delay(10) + + bareMetalProvisioner to scheduler +} + +/** + * Attach the specified monitor to the VM provisioner. + */ +@OptIn(ExperimentalCoroutinesApi::class) +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) { + val domain = simulationContext.domain + val hypervisors = scheduler.drivers() + + // Monitor hypervisor events + for (hypervisor in hypervisors) { + // TODO Do not expose VirtDriver directly but use Hypervisor class. + reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + hypervisor.server.events + .onEach { event -> + when (event) { + is ServerEvent.StateChanged -> { + reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + } + } + } + .launchIn(domain) + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> reporter.reportHostSlice( + simulationContext.clock.millis(), + event.requestedBurst, + event.grantedBurst, + event.overcommissionedBurst, + event.interferedBurst, + event.cpuUsage, + event.cpuDemand, + event.numberOfDeployedImages, + event.hostServer, + scheduler.submittedVms, + scheduler.queuedVms, + scheduler.runningVms, + scheduler.finishedVms + ) + } + } + .launchIn(domain) + } +} + +/** + * Process the trace. + */ +suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, reporter: Sc20Reporter, vmPlacements: Map = emptyMap()) { + val domain = simulationContext.domain + + try { + var submitted = 0L + val finished = Channel(Channel.CONFLATED) + val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) + + while (reader.hasNext()) { + val (time, workload) = reader.next() + + if (vmPlacements.isNotEmpty()) { + val vmId = workload.name.replace("VM Workload ", "") + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null) { + logger.warn { "Could not find placement data in VM placement file for VM $vmId" } + continue + } + val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false + if (machineInCluster) { + logger.info { "Ignored VM $vmId" } + continue + } + } + + submitted++ + delay(max(0, time - simulationContext.clock.millis())) + domain.launch { + chan.send(Unit) + val server = scheduler.deploy( + workload.image.name, workload.image, + Flavor(workload.image.maxCores, workload.image.requiredMemory) + ) + // Monitor server events + server.events + .onEach { + if (it is ServerEvent.StateChanged) { + reporter.reportVmStateChange(it.server) + } + + delay(1) + finished.send(Unit) + } + .collect() + } + } + + while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) { + finished.receive() + } + } finally { + reader.close() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt index 2029d3e7..4264ad3f 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt @@ -24,85 +24,98 @@ package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.Domain import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.ServerEvent -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.compute.metal.NODE_CLUSTER -import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.compute.virt.HypervisorEvent -import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver -import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService -import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy -import com.atlarge.opendc.core.failure.CorrelatedFaultInjector -import com.atlarge.opendc.core.failure.FailureDomain -import com.atlarge.opendc.core.failure.FaultInjector -import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import com.atlarge.opendc.format.trace.TraceReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import com.xenomachina.argparser.ArgParser -import com.xenomachina.argparser.default -import kotlinx.coroutines.ExperimentalCoroutinesApi +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.groups.OptionGroup +import com.github.ajalt.clikt.parameters.groups.default +import com.github.ajalt.clikt.parameters.groups.groupChoice +import com.github.ajalt.clikt.parameters.groups.mutuallyExclusiveOptions +import com.github.ajalt.clikt.parameters.groups.required +import com.github.ajalt.clikt.parameters.options.convert +import com.github.ajalt.clikt.parameters.options.default +import com.github.ajalt.clikt.parameters.options.defaultLazy +import com.github.ajalt.clikt.parameters.options.flag +import com.github.ajalt.clikt.parameters.options.option +import com.github.ajalt.clikt.parameters.options.required +import com.github.ajalt.clikt.parameters.types.choice +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.int import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext import mu.KotlinLogging import java.io.File import java.io.FileReader +import java.io.InputStream import java.util.ServiceLoader -import java.util.TreeSet -import kotlin.math.ln -import kotlin.math.max import kotlin.random.Random +/** + * The logger for this experiment. + */ private val logger = KotlinLogging.logger {} -class ExperimentParameters(parser: ArgParser) { - val traceDirectory by parser.storing("path to the trace directory") - val environmentFile by parser.storing("path to the environment file") - val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null } - val vmPlacementFile by parser.storing("path to the VM placement file").default { null } - val outputFile by parser.storing("path to where the output should be stored") - .default { "data/results-${System.currentTimeMillis()}.parquet" } - val selectedVms by parser.storing("the VMs to run") { parseVMs(this) } - .default { emptyList() } - val selectedVmsFile by parser.storing("path to a file containing the VMs to run") { - parseVMs(FileReader(File(this)).readText()) - } - .default { emptyList() } - val seed by parser.storing("the random seed") { toInt() } +/** + * Represents the command for running the experiment. + */ +class ExperimentCommand : CliktCommand(name = "sc20-experiment") { + private val environment by option("--environment-file", help = "path to the environment file") + .file() + .required() + private val performanceInterferenceStream by option("--performance-interference-file", help = "path to the performance interference file") + .file() + .convert { it.inputStream() as InputStream } + .defaultLazy { ExperimentCommand::class.java.getResourceAsStream("/env/performance-interference.json") } + + private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file") + .file() + .convert { + Sc20VmPlacementReader(it.inputStream().buffered()).construct() + } + .default(emptyMap()) + + private val selectedVms by mutuallyExclusiveOptions( + option("--selected-vms", help = "the VMs to run").convert { parseVMs(it) }, + option("--selected-vms-file").file().convert { parseVMs(FileReader(it).readText()) } + ).default(emptyList()) + + private val seed by option(help = "the random seed") + .int() .default(0) - val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures") - val failureInterval by parser.storing("expected number of hours between failures") { toInt() } + private val failures by option("-x", "--failures", help = "enable (correlated) machine failures") + .flag() + private val failureInterval by option(help = "expected number of hours between failures") + .int() .default(24 * 7) // one week - val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem") - - fun getSelectedVmList(): List { - return if (selectedVms.isEmpty()) { - selectedVmsFile - } else { - selectedVms - } - } + private val allocationPolicy by option(help = "name of VM allocation policy to use") + .choice( + "mem", "mem-inv", + "core-mem", "core-mem-inv", + "active-servers", "active-servers-inv", + "provisioned-cores", "provisioned-cores-inv", + "random", "replay" + ) + .default("core-mem") + + private val trace by option("--trace-directory", help = "path to the trace directory") + .file(canBeFile = false) + .required() + + private val reporter by option().groupChoice( + "parquet" to Parquet() + ).required() private fun parseVMs(string: String): List { // Handle case where VM list contains a VM name with an (escaped) single-quote in it @@ -112,271 +125,98 @@ class ExperimentParameters(parser: ArgParser) { val vms: List = jacksonObjectMapper().readValue(sanitizedString) return vms } -} -/** - * Construct the failure domain for the experiments. - */ -suspend fun createFailureDomain( - seed: Int, - failureInterval: Int, - bareMetalProvisioner: ProvisioningService, - chan: Channel -): Domain { - val root = simulationContext.domain - val domain = root.newDomain(name = "failures") - domain.launch { - chan.receive() - val random = Random(seed) - val injectors = mutableMapOf() - for (node in bareMetalProvisioner.nodes()) { - val cluster = node.metadata[NODE_CLUSTER] as String - val injector = - injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) } - injector.enqueue(node.metadata["driver"] as FailureDomain) + override fun run() { + logger.info("seed: $seed") + logger.info("failures: $failures") + logger.info("allocation-policy: $allocationPolicy") + + val start = System.currentTimeMillis() + val reporter: Sc20Reporter = reporter.createReporter() + + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + val system = provider("test") + val root = system.newDomain("root") + + val chan = Channel(Channel.CONFLATED) + val allocationPolicy = when (this.allocationPolicy) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seed)) + "replay" -> ReplayAllocationPolicy(vmPlacements) + else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}") } - } - return domain -} -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): FaultInjector { - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return CorrelatedFaultInjector( - domain, - iatScale = ln(failureInterval.toDouble()), iatShape = 1.03, // Hours - sizeScale = 1.88, sizeShape = 1.25, - dScale = 9.51, dShape = 3.21, // Minutes - random = random - ) -} - -/** - * Create the trace reader from which the VM workloads are read. - */ -fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List, seed: Int): Sc20ParquetTraceReader { - return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) -} + val performanceInterferenceModel = try { + Sc20PerformanceInterferenceReader(performanceInterferenceStream).construct() + } catch (e: Throwable) { + reporter.close() + throw e + } + val environmentReader = Sc20ClusterEnvironmentReader(environment) + val traceReader = try { + createTraceReader(trace, performanceInterferenceModel, selectedVms, seed) + } catch (e: Throwable) { + reporter.close() + throw e + } -/** - * Construct the environment for a VM provisioner and return the provisioner instance. - */ -suspend fun createProvisioner( - root: Domain, - environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy -): Pair = withContext(root.coroutineContext) { - val environment = environmentReader.use { it.construct(root) } - val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) - // Wait for the bare metal nodes to be spawned - delay(10) + val failureDomain = if (failures) { + logger.info("ENABLING failures") + createFailureDomain(seed, failureInterval, bareMetalProvisioner, chan) + } else { + null + } - val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) + attachMonitor(scheduler, reporter) + processTrace(traceReader, scheduler, chan, reporter, vmPlacements) - // Wait for the hypervisors to be spawned - delay(10) + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") - bareMetalProvisioner to scheduler -} + failureDomain?.cancel() + scheduler.terminate() + logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds") + } -/** - * Attach the specified monitor to the VM provisioner. - */ -@OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) { - val domain = simulationContext.domain - val hypervisors = scheduler.drivers() + runBlocking { + system.run() + system.terminate() + } - // Monitor hypervisor events - for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - hypervisor.server.events - .onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - } - } - } - .launchIn(domain) - hypervisor.events - .onEach { event -> - when (event) { - is HypervisorEvent.SliceFinished -> reporter.reportHostSlice( - simulationContext.clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.hostServer, - scheduler.submittedVms, - scheduler.queuedVms, - scheduler.runningVms, - scheduler.finishedVms - ) - } - } - .launchIn(domain) + // Explicitly close the monitor to flush its buffer + reporter.close() } } -/** - * Process the trace. - */ -suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, reporter: Sc20Reporter, vmPlacements: Map = emptyMap()) { - val domain = simulationContext.domain - - try { - var submitted = 0L - val finished = Channel(Channel.CONFLATED) - val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) - - while (reader.hasNext()) { - val (time, workload) = reader.next() - - if (vmPlacements.isNotEmpty()) { - val vmId = workload.name.replace("VM Workload ", "") - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null) { - logger.warn { "Could not find placement data in VM placement file for VM $vmId" } - continue - } - val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false - if (machineInCluster) { - logger.info { "Ignored VM $vmId" } - continue - } - } - - submitted++ - delay(max(0, time - simulationContext.clock.millis())) - domain.launch { - chan.send(Unit) - val server = scheduler.deploy( - workload.image.name, workload.image, - Flavor(workload.image.maxCores, workload.image.requiredMemory) - ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - reporter.reportVmStateChange(it.server) - } +sealed class Reporter(name: String) : OptionGroup(name) { + /** + * Create the [Sc20Reporter] for this option. + */ + abstract fun createReporter(): Sc20Reporter +} - delay(1) - finished.send(Unit) - } - .collect() - } - } +class Parquet : Reporter("Options for reporting using Parquet") { + private val path by option(help = "path to where the output should be stored") + .file() + .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") } - while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) { - finished.receive() - } - } finally { - reader.close() - } + override fun createReporter(): Sc20Reporter = Sc20ParquetReporter(path) } /** * Main entry point of the experiment. */ -@OptIn(ExperimentalCoroutinesApi::class) -fun main(args: Array) { - val cli = ArgParser(args).parseInto(::ExperimentParameters) - logger.info("trace-directory: ${cli.traceDirectory}") - logger.info("environment-file: ${cli.environmentFile}") - logger.info("performance-interference-file: ${cli.performanceInterferenceFile}") - logger.info("selected-vms-file: ${cli.selectedVmsFile}") - logger.info("seed: ${cli.seed}") - logger.info("failures: ${cli.failures}") - logger.info("allocation-policy: ${cli.allocationPolicy}") - - val start = System.currentTimeMillis() - val reporter: Sc20Reporter = Sc20ParquetReporter(cli.outputFile) - - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") - - val chan = Channel(Channel.CONFLATED) - - val performanceInterferenceModel = try { - val performanceInterferenceStream = if (cli.performanceInterferenceFile != null) { - File(cli.performanceInterferenceFile!!).inputStream().buffered() - } else { - object {}.javaClass.getResourceAsStream("/env/performance-interference.json") - } - Sc20PerformanceInterferenceReader(performanceInterferenceStream) - .construct() - } catch (e: Throwable) { - reporter.close() - throw e - } - val vmPlacements = if (cli.vmPlacementFile == null) { - emptyMap() - } else { - Sc20VmPlacementReader(File(cli.vmPlacementFile!!).inputStream().buffered()).construct() - } - val environmentReader = Sc20ClusterEnvironmentReader(File(cli.environmentFile)) - val traceReader = try { - createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed) - } catch (e: Throwable) { - reporter.close() - throw e - } - val allocationPolicy = when (cli.allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(cli.seed)) - "replay" -> ReplayAllocationPolicy(vmPlacements) - else -> throw IllegalArgumentException("Unknown allocation policy: ${cli.allocationPolicy}") - } - - root.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) - - val failureDomain = if (cli.failures) { - logger.info("ENABLING failures") - createFailureDomain(cli.seed, cli.failureInterval, bareMetalProvisioner, chan) - } else { - null - } - - attachMonitor(scheduler, reporter) - processTrace(traceReader, scheduler, chan, reporter, vmPlacements) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds") - } - - runBlocking { - system.run() - system.terminate() - } - - // Explicitly close the monitor to flush its buffer - reporter.close() -} +fun main(args: Array) = ExperimentCommand().main(args) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt index fc3e98ae..eaac912a 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt @@ -12,14 +12,13 @@ import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.File import java.util.concurrent.ArrayBlockingQueue import kotlin.concurrent.thread private val logger = KotlinLogging.logger {} -class Sc20ParquetReporter( - destination: String -) : Sc20Reporter { +class Sc20ParquetReporter(destination: File) : Sc20Reporter { private val lastServerStates = mutableMapOf>() private val schema = SchemaBuilder .record("slice") @@ -43,7 +42,7 @@ class Sc20ParquetReporter( .name("totalRunningVms").type().longType().noDefault() .name("totalFinishedVms").type().longType().noDefault() .endRecord() - private val writer = AvroParquetWriter.builder(Path(destination)) + private val writer = AvroParquetWriter.builder(Path(destination.absolutePath)) .withSchema(schema) .withCompressionCodec(CompressionCodecName.SNAPPY) .withPageSize(4 * 1024 * 1024) // For compression -- cgit v1.2.3