diff options
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-05-08 10:23:10 +0200 |
|---|---|---|
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-05-08 10:23:10 +0200 |
| commit | db934d9cebe1d48e148e54aca507e2c44a9bc946 (patch) | |
| tree | cffff3e75926084edd742e28d2cb4c84d8372604 | |
| parent | e2e7e1abaf70d7e49e2e4af04648796f01ba6492 (diff) | |
| parent | cbdbf818004040b60aa122dc6cb98ef635fa5ac1 (diff) | |
Merge branch 'feat/database-metrics' into '2.x'
Add database integration for experimental results
Closes #57
See merge request opendc/opendc-simulator!65
12 files changed, 731 insertions, 348 deletions
diff --git a/opendc/opendc-compute/build.gradle.kts b/opendc/opendc-compute/build.gradle.kts index 09c904f2..7d43b064 100644 --- a/opendc/opendc-compute/build.gradle.kts +++ b/opendc/opendc-compute/build.gradle.kts @@ -33,6 +33,7 @@ dependencies { implementation(kotlin("stdlib")) api(project(":odcsim:odcsim-api")) api(project(":opendc:opendc-core")) + implementation("io.github.microutils:kotlin-logging:1.7.9") testRuntimeOnly(project(":odcsim:odcsim-engine-omega")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 2185b372..3603ae69 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -24,10 +24,13 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext +import mu.KotlinLogging import kotlin.coroutines.Continuation import kotlin.coroutines.resume import kotlin.math.max +private val logger = KotlinLogging.logger {} + @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( public override val allocationPolicy: AllocationPolicy, @@ -141,7 +144,7 @@ class SimpleVirtProvisioningService( unscheduledVms++ incomingImages -= imageInstance - println("[${clock.millis()}] CANNOT SPAWN ${imageInstance.image}") + logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]") continue } else { break @@ -149,7 +152,7 @@ class SimpleVirtProvisioningService( } try { - println("[${clock.millis()}] SPAWN ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}") + logger.info { "Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } incomingImages -= imageInstance // Speculatively update the hypervisor view information to prevent other images in the queue from @@ -174,7 +177,7 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { - println("[${clock.millis()}] FINISH ${event.server.uid} ${event.server.name} ${event.server.flavor}") + logger.info { "Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } runningVms-- finishedVms++ @@ -191,13 +194,13 @@ class SimpleVirtProvisioningService( } .launchIn(this) } catch (e: InsufficientMemoryOnServerException) { - println("Unable to deploy image due to insufficient memory") + logger.error("Failed to deploy VM", e) selectedHv.numberOfActiveServers-- selectedHv.provisionedCores -= imageInstance.flavor.cpuCount selectedHv.availableMemory += requiredMemory } catch (e: Throwable) { - e.printStackTrace() + logger.error("Failed to deploy VM", e) } } } diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index ccfa3038..6b6366a7 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -39,13 +39,15 @@ dependencies { api(project(":opendc:opendc-core")) implementation(project(":opendc:opendc-format")) implementation(kotlin("stdlib")) - implementation("com.xenomachina:kotlin-argparser:2.0.7") - api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") + implementation("com.github.ajalt:clikt:2.6.0") + implementation("io.github.microutils:kotlin-logging:1.7.9") implementation("org.apache.parquet:parquet-avro:1.11.0") implementation("org.apache.hadoop:hadoop-client:3.2.1") { exclude(group = "org.slf4j", module = "slf4j-log4j12") + exclude(group = "log4j") } - runtimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}") + runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") + runtimeOnly("org.postgresql:postgresql:42.2.12") runtimeOnly(project(":odcsim:odcsim-engine-omega")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") diff --git a/opendc/opendc-experiments-sc20/schema.sql b/opendc/opendc-experiments-sc20/schema.sql new file mode 100644 index 00000000..51990a75 --- /dev/null +++ b/opendc/opendc-experiments-sc20/schema.sql @@ -0,0 +1,22 @@ +DROP TABLE IF EXISTS host_reports; +CREATE TABLE host_reports ( + id BIGSERIAL PRIMARY KEY NOT NULL, + experiment_id BIGINT NOT NULL, + time BIGINT NOT NULL, + duration BIGINT NOT NULL, + requested_burst BIGINT NOT NULL, + granted_burst BIGINT NOT NULL, + overcommissioned_burst BIGINT NOT NULL, + interfered_burst BIGINT NOT NULL, + cpu_usage DOUBLE PRECISION NOT NULL, + cpu_demand DOUBLE PRECISION NOT NULL, + image_count INTEGER NOT NULL, + server TEXT NOT NULL, + host_state TEXT NOT NULL, + host_usage DOUBLE PRECISION NOT NULL, + power_draw DOUBLE PRECISION NOT NULL, + total_submitted_vms BIGINT NOT NULL, + total_queued_vms BIGINT NOT NULL, + total_running_vms BIGINT NOT NULL, + total_finished_vms BIGINT NOT NULL +); diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt new file mode 100644 index 00000000..e37dea8b --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt @@ -0,0 +1,235 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.compute.metal.NODE_CLUSTER +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorEvent +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver +import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import com.atlarge.opendc.core.failure.CorrelatedFaultInjector +import com.atlarge.opendc.core.failure.FailureDomain +import com.atlarge.opendc.core.failure.FaultInjector +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import mu.KotlinLogging +import java.io.File +import java.util.TreeSet +import kotlin.math.ln +import kotlin.math.max +import kotlin.random.Random + +/** + * The logger for this experiment. + */ +private val logger = KotlinLogging.logger {} + +/** + * Construct the failure domain for the experiments. + */ +suspend fun createFailureDomain( + seed: Int, + failureInterval: Int, + bareMetalProvisioner: ProvisioningService, + chan: Channel<Unit> +): Domain { + val root = simulationContext.domain + val domain = root.newDomain(name = "failures") + domain.launch { + chan.receive() + val random = Random(seed) + val injectors = mutableMapOf<String, FaultInjector>() + for (node in bareMetalProvisioner.nodes()) { + val cluster = node.metadata[NODE_CLUSTER] as String + val injector = + injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) } + injector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + return domain +} + +/** + * Obtain the [FaultInjector] to use for the experiments. + */ +fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): FaultInjector { + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return CorrelatedFaultInjector( + domain, + iatScale = ln(failureInterval.toDouble()), iatShape = 1.03, // Hours + sizeScale = 1.88, sizeShape = 1.25, + dScale = 9.51, dShape = 3.21, // Minutes + random = random + ) +} + +/** + * Create the trace reader from which the VM workloads are read. + */ +fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader { + return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) +} + +/** + * Construct the environment for a VM provisioner and return the provisioner instance. + */ +suspend fun createProvisioner( + root: Domain, + environmentReader: EnvironmentReader, + allocationPolicy: AllocationPolicy +): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) { + val environment = environmentReader.use { it.construct(root) } + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] + + // Wait for the bare metal nodes to be spawned + delay(10) + + val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) + + // Wait for the hypervisors to be spawned + delay(10) + + bareMetalProvisioner to scheduler +} + +/** + * Attach the specified monitor to the VM provisioner. + */ +@OptIn(ExperimentalCoroutinesApi::class) +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) { + val domain = simulationContext.domain + val hypervisors = scheduler.drivers() + + // Monitor hypervisor events + for (hypervisor in hypervisors) { + // TODO Do not expose VirtDriver directly but use Hypervisor class. + reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + hypervisor.server.events + .onEach { event -> + when (event) { + is ServerEvent.StateChanged -> { + reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + } + } + } + .launchIn(domain) + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> reporter.reportHostSlice( + simulationContext.clock.millis(), + event.requestedBurst, + event.grantedBurst, + event.overcommissionedBurst, + event.interferedBurst, + event.cpuUsage, + event.cpuDemand, + event.numberOfDeployedImages, + event.hostServer, + scheduler.submittedVms, + scheduler.queuedVms, + scheduler.runningVms, + scheduler.finishedVms + ) + } + } + .launchIn(domain) + } +} + +/** + * Process the trace. + */ +suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: Sc20Reporter, vmPlacements: Map<String, String> = emptyMap()) { + val domain = simulationContext.domain + + try { + var submitted = 0L + val finished = Channel<Unit>(Channel.CONFLATED) + val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) + + while (reader.hasNext()) { + val (time, workload) = reader.next() + + if (vmPlacements.isNotEmpty()) { + val vmId = workload.name.replace("VM Workload ", "") + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null) { + logger.warn { "Could not find placement data in VM placement file for VM $vmId" } + continue + } + val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false + if (machineInCluster) { + logger.info { "Ignored VM $vmId" } + continue + } + } + + submitted++ + delay(max(0, time - simulationContext.clock.millis())) + domain.launch { + chan.send(Unit) + val server = scheduler.deploy( + workload.image.name, workload.image, + Flavor(workload.image.maxCores, workload.image.requiredMemory) + ) + // Monitor server events + server.events + .onEach { + if (it is ServerEvent.StateChanged) { + reporter.reportVmStateChange(it.server) + } + + delay(1) + finished.send(Unit) + } + .collect() + } + } + + while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) { + finished.receive() + } + } finally { + reader.close() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt index b1964197..51448c9e 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt @@ -24,82 +24,101 @@ package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.Domain import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.ServerEvent -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.compute.metal.NODE_CLUSTER -import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.compute.virt.HypervisorEvent -import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver -import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService -import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy -import com.atlarge.opendc.core.failure.CorrelatedFaultInjector -import com.atlarge.opendc.core.failure.FailureDomain -import com.atlarge.opendc.core.failure.FaultInjector -import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import com.atlarge.opendc.format.trace.TraceReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import com.xenomachina.argparser.ArgParser -import com.xenomachina.argparser.default -import kotlinx.coroutines.ExperimentalCoroutinesApi +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.groups.OptionGroup +import com.github.ajalt.clikt.parameters.groups.default +import com.github.ajalt.clikt.parameters.groups.groupChoice +import com.github.ajalt.clikt.parameters.groups.mutuallyExclusiveOptions +import com.github.ajalt.clikt.parameters.groups.required +import com.github.ajalt.clikt.parameters.options.convert +import com.github.ajalt.clikt.parameters.options.default +import com.github.ajalt.clikt.parameters.options.defaultLazy +import com.github.ajalt.clikt.parameters.options.flag +import com.github.ajalt.clikt.parameters.options.option +import com.github.ajalt.clikt.parameters.options.required +import com.github.ajalt.clikt.parameters.types.choice +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.int +import com.github.ajalt.clikt.parameters.types.long import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext +import mu.KotlinLogging import java.io.File import java.io.FileReader +import java.io.InputStream +import java.sql.DriverManager import java.util.ServiceLoader -import java.util.TreeSet -import kotlin.math.ln -import kotlin.math.max import kotlin.random.Random -class ExperimentParameters(parser: ArgParser) { - val traceDirectory by parser.storing("path to the trace directory") - val environmentFile by parser.storing("path to the environment file") - val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null } - val vmPlacementFile by parser.storing("path to the VM placement file").default { null } - val outputFile by parser.storing("path to where the output should be stored") - .default { "data/results-${System.currentTimeMillis()}.parquet" } - val selectedVms by parser.storing("the VMs to run") { parseVMs(this) } - .default { emptyList() } - val selectedVmsFile by parser.storing("path to a file containing the VMs to run") { - parseVMs(FileReader(File(this)).readText()) - } - .default { emptyList() } - val seed by parser.storing("the random seed") { toInt() } - .default(0) - val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures") - val failureInterval by parser.storing("expected number of hours between failures") { toInt() } - .default(24 * 7) // one week - val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem") +/** + * The logger for this experiment. + */ +private val logger = KotlinLogging.logger {} - fun getSelectedVmList(): List<String> { - return if (selectedVms.isEmpty()) { - selectedVmsFile - } else { - selectedVms +/** + * Represents the command for running the experiment. + */ +class ExperimentCommand : CliktCommand(name = "sc20-experiment") { + private val environment by option("--environment-file", help = "path to the environment file") + .file() + .required() + private val performanceInterferenceStream by option("--performance-interference-file", help = "path to the performance interference file") + .file() + .convert { it.inputStream() as InputStream } + .defaultLazy { ExperimentCommand::class.java.getResourceAsStream("/env/performance-interference.json") } + + private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file") + .file() + .convert { + Sc20VmPlacementReader(it.inputStream().buffered()).construct() } - } + .default(emptyMap()) + + private val selectedVms by mutuallyExclusiveOptions( + option("--selected-vms", help = "the VMs to run").convert { parseVMs(it) }, + option("--selected-vms-file").file().convert { parseVMs(FileReader(it).readText()) } + ).default(emptyList()) + + private val seed by option(help = "the random seed") + .int() + .default(0) + private val failures by option("-x", "--failures", help = "enable (correlated) machine failures") + .flag() + private val failureInterval by option(help = "expected number of hours between failures") + .int() + .default(24 * 7) // one week + private val allocationPolicy by option(help = "name of VM allocation policy to use") + .choice( + "mem", "mem-inv", + "core-mem", "core-mem-inv", + "active-servers", "active-servers-inv", + "provisioned-cores", "provisioned-cores-inv", + "random", "replay" + ) + .default("core-mem") + + private val trace by option("--trace-directory", help = "path to the trace directory") + .file(canBeFile = false) + .required() + + private val reporter by option().groupChoice( + "parquet" to Parquet(), + "postgres" to Postgres() + ).required() private fun parseVMs(string: String): List<String> { // Handle case where VM list contains a VM name with an (escaped) single-quote in it @@ -109,267 +128,108 @@ class ExperimentParameters(parser: ArgParser) { val vms: List<String> = jacksonObjectMapper().readValue(sanitizedString) return vms } -} -/** - * Construct the failure domain for the experiments. - */ -suspend fun createFailureDomain( - seed: Int, - failureInterval: Int, - bareMetalProvisioner: ProvisioningService, - chan: Channel<Unit> -): Domain { - val root = simulationContext.domain - val domain = root.newDomain(name = "failures") - domain.launch { - chan.receive() - val random = Random(seed) - val injectors = mutableMapOf<String, FaultInjector>() - for (node in bareMetalProvisioner.nodes()) { - val cluster = node.metadata[NODE_CLUSTER] as String - val injector = - injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) } - injector.enqueue(node.metadata["driver"] as FailureDomain) + override fun run() { + logger.info("seed: $seed") + logger.info("failures: $failures") + logger.info("allocation-policy: $allocationPolicy") + + val start = System.currentTimeMillis() + val reporter: Sc20Reporter = reporter.createReporter() + + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + val system = provider("test") + val root = system.newDomain("root") + + val chan = Channel<Unit>(Channel.CONFLATED) + val allocationPolicy = when (this.allocationPolicy) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seed)) + "replay" -> ReplayAllocationPolicy(vmPlacements) + else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}") } - } - return domain -} -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): FaultInjector { - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return CorrelatedFaultInjector( - domain, - iatScale = ln(failureInterval.toDouble()), iatShape = 1.03, // Hours - sizeScale = 1.88, sizeShape = 1.25, - dScale = 9.51, dShape = 3.21, // Minutes - random = random - ) -} - -/** - * Create the trace reader from which the VM workloads are read. - */ -fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader { - return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) -} + val performanceInterferenceModel = try { + Sc20PerformanceInterferenceReader(performanceInterferenceStream).construct() + } catch (e: Throwable) { + reporter.close() + throw e + } + val environmentReader = Sc20ClusterEnvironmentReader(environment) + val traceReader = try { + createTraceReader(trace, performanceInterferenceModel, selectedVms, seed) + } catch (e: Throwable) { + reporter.close() + throw e + } -/** - * Construct the environment for a VM provisioner and return the provisioner instance. - */ -suspend fun createProvisioner( - root: Domain, - environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy -): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) { - val environment = environmentReader.use { it.construct(root) } - val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) - // Wait for the bare metal nodes to be spawned - delay(10) + val failureDomain = if (failures) { + logger.info("ENABLING failures") + createFailureDomain(seed, failureInterval, bareMetalProvisioner, chan) + } else { + null + } - val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) + attachMonitor(scheduler, reporter) + processTrace(traceReader, scheduler, chan, reporter, vmPlacements) - // Wait for the hypervisors to be spawned - delay(10) + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") - bareMetalProvisioner to scheduler -} + failureDomain?.cancel() + scheduler.terminate() + logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds") + } -/** - * Attach the specified monitor to the VM provisioner. - */ -@OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) { - val domain = simulationContext.domain - val hypervisors = scheduler.drivers() + runBlocking { + system.run() + system.terminate() + } - // Monitor hypervisor events - for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - hypervisor.server.events - .onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - } - } - } - .launchIn(domain) - hypervisor.events - .onEach { event -> - when (event) { - is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( - simulationContext.clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.hostServer, - scheduler.submittedVms, - scheduler.queuedVms, - scheduler.runningVms, - scheduler.finishedVms - ) - } - } - .launchIn(domain) + // Explicitly close the monitor to flush its buffer + reporter.close() } } -/** - * Process the trace. - */ -suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: Sc20Monitor, vmPlacements: Map<String, String> = emptyMap()) { - val domain = simulationContext.domain - - try { - var submitted = 0L - val finished = Channel<Unit>(Channel.CONFLATED) - val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) - - while (reader.hasNext()) { - val (time, workload) = reader.next() +sealed class Reporter(name: String) : OptionGroup(name) { + /** + * Create the [Sc20Reporter] for this option. + */ + abstract fun createReporter(): Sc20Reporter +} - if (vmPlacements.isNotEmpty()) { - val vmId = workload.name.replace("VM Workload ", "") - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null) { - println("Could not find placement data in VM placement file for VM $vmId") - continue - } - val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false - if (machineInCluster) { - println("Ignored VM") - continue - } - } +class Parquet : Reporter("Options for reporting using Parquet") { + private val path by option(help = "path to where the output should be stored") + .file() + .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") } - submitted++ - delay(max(0, time - simulationContext.clock.millis())) - domain.launch { - chan.send(Unit) - val server = scheduler.deploy( - workload.image.name, workload.image, - Flavor(workload.image.maxCores, workload.image.requiredMemory) - ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - monitor.onVmStateChanged(it.server) - } + override fun createReporter(): Sc20Reporter = Sc20ParquetReporter(path) +} - delay(1) - finished.send(Unit) - } - .collect() - } - } +class Postgres : Reporter("Options for reporting using PostgreSQL") { + private val url by option(help = "JDBC connection url").required() + private val experimentId by option(help = "Experiment ID").long().required() - while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) { - finished.receive() - } - } finally { - reader.close() + override fun createReporter(): Sc20Reporter { + val conn = DriverManager.getConnection(url) + return Sc20PostgresReporter(conn, experimentId) } } /** * Main entry point of the experiment. */ -@OptIn(ExperimentalCoroutinesApi::class) -fun main(args: Array<String>) { - val cli = ArgParser(args).parseInto(::ExperimentParameters) - println("trace-directory: ${cli.traceDirectory}") - println("environment-file: ${cli.environmentFile}") - println("performance-interference-file: ${cli.performanceInterferenceFile}") - println("selected-vms-file: ${cli.selectedVmsFile}") - println("seed: ${cli.seed}") - println("failures: ${cli.failures}") - println("allocation-policy: ${cli.allocationPolicy}") - - val start = System.currentTimeMillis() - val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile) - - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") - - val chan = Channel<Unit>(Channel.CONFLATED) - - val performanceInterferenceModel = try { - val performanceInterferenceStream = if (cli.performanceInterferenceFile != null) { - File(cli.performanceInterferenceFile!!).inputStream().buffered() - } else { - object {}.javaClass.getResourceAsStream("/env/performance-interference.json") - } - Sc20PerformanceInterferenceReader(performanceInterferenceStream) - .construct() - } catch (e: Throwable) { - monitor.close() - throw e - } - val vmPlacements = if (cli.vmPlacementFile == null) { - emptyMap() - } else { - Sc20VmPlacementReader(File(cli.vmPlacementFile!!).inputStream().buffered()).construct() - } - val environmentReader = Sc20ClusterEnvironmentReader(File(cli.environmentFile)) - val traceReader = try { - createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed) - } catch (e: Throwable) { - monitor.close() - throw e - } - val allocationPolicy = when (cli.allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(cli.seed)) - "replay" -> ReplayAllocationPolicy(vmPlacements) - else -> throw IllegalArgumentException("Unknown allocation policy: ${cli.allocationPolicy}") - } - - root.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) - - val failureDomain = if (cli.failures) { - println("ENABLING failures") - createFailureDomain(cli.seed, cli.failureInterval, bareMetalProvisioner, chan) - } else { - null - } - - attachMonitor(scheduler, monitor) - processTrace(traceReader, scheduler, chan, monitor, vmPlacements) - - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") - } - - runBlocking { - system.run() - system.terminate() - } - - // Explicitly close the monitor to flush its buffer - monitor.close() -} +fun main(args: Array<String>) = ExperimentCommand().main(args) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt index 5e554196..f2139144 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt @@ -6,17 +6,19 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import kotlinx.coroutines.flow.first +import mu.KotlinLogging import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.File import java.util.concurrent.ArrayBlockingQueue import kotlin.concurrent.thread -class Sc20ParquetMonitor( - destination: String -) : Sc20Monitor { +private val logger = KotlinLogging.logger {} + +class Sc20ParquetReporter(destination: File) : Sc20Reporter { private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() private val schema = SchemaBuilder .record("slice") @@ -24,23 +26,23 @@ class Sc20ParquetMonitor( .fields() .name("time").type().longType().noDefault() .name("duration").type().longType().noDefault() - .name("requestedBurst").type().longType().noDefault() - .name("grantedBurst").type().longType().noDefault() - .name("overcommissionedBurst").type().longType().noDefault() - .name("interferedBurst").type().longType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("cpuDemand").type().doubleType().noDefault() - .name("numberOfDeployedImages").type().intType().noDefault() + .name("requested_burst").type().longType().noDefault() + .name("granted_burst").type().longType().noDefault() + .name("overcommissioned_burst").type().longType().noDefault() + .name("interfered_burst").type().longType().noDefault() + .name("cpu_usage").type().doubleType().noDefault() + .name("cpu_demand").type().doubleType().noDefault() + .name("image_count").type().intType().noDefault() .name("server").type().stringType().noDefault() - .name("hostState").type().stringType().noDefault() - .name("hostUsage").type().doubleType().noDefault() - .name("powerDraw").type().doubleType().noDefault() - .name("totalSubmittedVms").type().longType().noDefault() - .name("totalQueuedVms").type().longType().noDefault() - .name("totalRunningVms").type().longType().noDefault() - .name("totalFinishedVms").type().longType().noDefault() + .name("host_state").type().stringType().noDefault() + .name("host_usage").type().doubleType().noDefault() + .name("power_draw").type().doubleType().noDefault() + .name("total_submitted_vms").type().longType().noDefault() + .name("total_queued_vms").type().longType().noDefault() + .name("total_running_vms").type().longType().noDefault() + .name("total_finished_vms").type().longType().noDefault() .endRecord() - private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination)) + private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination.absolutePath)) .withSchema(schema) .withCompressionCodec(CompressionCodecName.SNAPPY) .withPageSize(4 * 1024 * 1024) // For compression @@ -60,9 +62,9 @@ class Sc20ParquetMonitor( } } - override suspend fun onVmStateChanged(server: Server) {} + override suspend fun reportVmStateChange(server: Server) {} - override suspend fun serverStateChanged( + override suspend fun reportHostStateChange( driver: VirtDriver, server: Server, submittedVms: Long, @@ -73,7 +75,7 @@ class Sc20ParquetMonitor( val lastServerState = lastServerStates[server] if (server.state == ServerState.SHUTOFF && lastServerState != null) { val duration = simulationContext.clock.millis() - lastServerState.second - onSliceFinish( + reportHostSlice( simulationContext.clock.millis(), 0, 0, @@ -91,12 +93,12 @@ class Sc20ParquetMonitor( ) } - println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}") + logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]") lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) } - override suspend fun onSliceFinish( + override suspend fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, @@ -120,21 +122,21 @@ class Sc20ParquetMonitor( val record = GenericData.Record(schema) record.put("time", time) record.put("duration", duration) - record.put("requestedBurst", requestedBurst) - record.put("grantedBurst", grantedBurst) - record.put("overcommissionedBurst", overcommissionedBurst) - record.put("interferedBurst", interferedBurst) - record.put("cpuUsage", cpuUsage) - record.put("cpuDemand", cpuDemand) - record.put("numberOfDeployedImages", numberOfDeployedImages) + record.put("requested_burst", requestedBurst) + record.put("granted_burst", grantedBurst) + record.put("overcommissioned_burst", overcommissionedBurst) + record.put("interfered_burst", interferedBurst) + record.put("cpu_usage", cpuUsage) + record.put("cpu_demand", cpuDemand) + record.put("image_count", numberOfDeployedImages) record.put("server", hostServer.uid) - record.put("hostState", hostServer.state) - record.put("hostUsage", usage) - record.put("powerDraw", powerDraw) - record.put("totalSubmittedVms", submittedVms) - record.put("totalQueuedVms", queuedVms) - record.put("totalRunningVms", runningVms) - record.put("totalFinishedVms", finishedVms) + record.put("host_state", hostServer.state) + record.put("host_usage", usage) + record.put("power_draw", powerDraw) + record.put("total_submitted_vms", submittedVms) + record.put("total_queued_vms", queuedVms) + record.put("total_running_vms", runningVms) + record.put("total_finished_vms", finishedVms) queue.put(record) } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt index 0a7718e9..8ae1693c 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt @@ -32,6 +32,7 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader +import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader @@ -49,6 +50,8 @@ import java.util.concurrent.ArrayBlockingQueue import kotlin.concurrent.thread import kotlin.random.Random +private val logger = KotlinLogging.logger {} + /** * A [TraceReader] for the internal VM workload trace format. * @@ -190,7 +193,7 @@ class Sc20ParquetTraceReader( assert(uid !in takenIds) takenIds += uid - println(id) + logger.info("Processing VM $id") val internalBuffer = mutableListOf<FlopsHistoryFragment>() val externalBuffer = mutableListOf<FlopsHistoryFragment>() diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt new file mode 100644 index 00000000..5c5e6ceb --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt @@ -0,0 +1,200 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import kotlinx.coroutines.flow.first +import mu.KotlinLogging +import java.sql.Connection +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.concurrent.thread + +private val logger = KotlinLogging.logger {} + +class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20Reporter { + private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() + private val queue = ArrayBlockingQueue<Report>(2048) + private val stop = AtomicBoolean(false) + private val writerThread = thread(start = true, name = "sc20-writer") { + val stmt = try { + conn.prepareStatement( + """ + INSERT INTO host_reports (experiment_id, time, duration, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, image_count, server, host_state, host_usage, power_draw, total_submitted_vms, total_queued_vms, total_running_vms, total_finished_vms) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """.trimIndent() + ) + } catch (e: Throwable) { + conn.close() + throw e + } + + val batchSize = 4096 + var batch = 0 + + try { + while (!stop.get()) { + val record = queue.take() + stmt.setLong(1, experimentId) + stmt.setLong(2, record.time) + stmt.setLong(3, record.duration) + stmt.setLong(4, record.requestedBurst) + stmt.setLong(5, record.grantedBurst) + stmt.setLong(6, record.overcommissionedBurst) + stmt.setLong(7, record.interferedBurst) + stmt.setDouble(8, record.cpuUsage) + stmt.setDouble(9, record.cpuDemand) + stmt.setInt(10, record.numberOfDeployedImages) + stmt.setString(11, record.hostServer.uid.toString()) + stmt.setString(12, record.hostServer.state.name) + stmt.setDouble(13, record.hostUsage) + stmt.setDouble(14, record.powerDraw) + stmt.setLong(15, record.submittedVms) + stmt.setLong(16, record.queuedVms) + stmt.setLong(17, record.runningVms) + stmt.setLong(18, record.finishedVms) + stmt.addBatch() + batch++ + + if (batch > batchSize) { + stmt.executeBatch() + batch = 0 + } + } + } finally { + stmt.executeBatch() + stmt.close() + conn.close() + } + } + + override suspend fun reportVmStateChange(server: Server) {} + + override suspend fun reportHostStateChange( + driver: VirtDriver, + server: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long + ) { + val lastServerState = lastServerStates[server] + if (server.state == ServerState.SHUTOFF && lastServerState != null) { + val duration = simulationContext.clock.millis() - lastServerState.second + reportHostSlice( + simulationContext.clock.millis(), + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server, + submittedVms, + queuedVms, + runningVms, + finishedVms, + duration + ) + } + + logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]") + + lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) + } + + override suspend fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, + duration: Long + ) { + // Assume for now that the host is not virtualized and measure the current power draw + val driver = hostServer.services[BareMetalDriver.Key] + val usage = driver.usage.first() + val powerDraw = driver.powerDraw.first() + + queue.put( + Report( + time, + duration, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + numberOfDeployedImages, + hostServer, + usage, + powerDraw, + submittedVms, + queuedVms, + runningVms, + finishedVms + ) + ) + } + + override fun close() { + // Busy loop to wait for writer thread to finish + stop.set(true) + writerThread.join() + } + + data class Report( + val time: Long, + val duration: Long, + val requestedBurst: Long, + val grantedBurst: Long, + val overcommissionedBurst: Long, + val interferedBurst: Long, + val cpuUsage: Double, + val cpuDemand: Double, + val numberOfDeployedImages: Int, + val hostServer: Server, + val hostUsage: Double, + val powerDraw: Double, + val submittedVms: Long, + val queuedVms: Long, + val runningVms: Long, + val finishedVms: Long + ) +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt index 4b8b80a8..84500417 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt @@ -28,10 +28,16 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.virt.driver.VirtDriver import java.io.Closeable -interface Sc20Monitor : Closeable { - suspend fun onVmStateChanged(server: Server) {} +interface Sc20Reporter : Closeable { + /** + * This method is invoked when the state of a VM changes. + */ + suspend fun reportVmStateChange(server: Server) {} - suspend fun serverStateChanged( + /** + * This method is invoked when the state of a host changes. + */ + suspend fun reportHostStateChange( driver: VirtDriver, server: Server, submittedVms: Long, @@ -40,7 +46,10 @@ interface Sc20Monitor : Closeable { finishedVms: Long ) {} - suspend fun onSliceFinish( + /** + * This method is invoked for a host for each slice that is finishes. + */ + suspend fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, diff --git a/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml new file mode 100644 index 00000000..77a15e55 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml @@ -0,0 +1,46 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ MIT License + ~ + ~ Copyright (c) 2020 atlarge-research + ~ + ~ Permission is hereby granted, free of charge, to any person obtaining a copy + ~ of this software and associated documentation files (the "Software"), to deal + ~ in the Software without restriction, including without limitation the rights + ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + ~ copies of the Software, and to permit persons to whom the Software is + ~ furnished to do so, subject to the following conditions: + ~ + ~ The above copyright notice and this permission notice shall be included in all + ~ copies or substantial portions of the Software. + ~ + ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + ~ SOFTWARE. + --> + +<Configuration status="WARN"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false" /> + </Console> + </Appenders> + <Loggers> + <Logger name="com.atlarge.odcsim" level="info" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> + <Logger name="com.atlarge.opendc" level="info" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> + <Logger name="org.apache.hadoop" level="warn" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> + <Root level="error"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration> diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 2bf6bcf3..239d018a 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -63,7 +63,7 @@ class Sc20IntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var monitor: TestSc20Monitor + private lateinit var monitor: TestSc20Reporter /** * Setup the experimental environment. @@ -73,7 +73,7 @@ class Sc20IntegrationTest { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() simulationEngine = provider("test") root = simulationEngine.newDomain("root") - monitor = TestSc20Monitor() + monitor = TestSc20Reporter() } /** @@ -151,13 +151,13 @@ class Sc20IntegrationTest { return Sc20ClusterEnvironmentReader(stream) } - class TestSc20Monitor : Sc20Monitor { + class TestSc20Reporter : Sc20Reporter { var totalRequestedBurst = 0L var totalGrantedBurst = 0L var totalOvercommissionedBurst = 0L var totalInterferedBurst = 0L - override suspend fun onSliceFinish( + override suspend fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, |
