From 48f6a6f2d42851bc2eeed5b6ef41145740c70286 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 5 May 2020 22:35:24 +0200 Subject: test: Add initial integration test for SC20 experiments --- opendc/opendc-experiments-sc20/build.gradle.kts | 2 +- .../opendc/experiments/sc20/Sc20Experiment.kt | 366 +++++++++++++++++++++ .../opendc/experiments/sc20/TestExperiment.kt | 366 --------------------- .../opendc/experiments/sc20/Sc20IntegrationTest.kt | 183 +++++++++++ .../src/test/resources/env/topology.txt | 5 + .../src/test/resources/trace/meta.parquet | Bin 0 -> 2148 bytes .../src/test/resources/trace/trace.parquet | Bin 0 -> 1672463 bytes .../sc20/Sc20ClusterEnvironmentReader.kt | 11 +- 8 files changed, 562 insertions(+), 371 deletions(-) create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt create mode 100644 opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt create mode 100644 opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt create mode 100644 opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet create mode 100644 opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index 8611ffa7..ccfa3038 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -31,7 +31,7 @@ plugins { } application { - mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt" + mainClassName = "com.atlarge.opendc.experiments.sc20.Sc20ExperimentKt" applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M") } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt new file mode 100644 index 00000000..fc4b9058 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt @@ -0,0 +1,366 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.compute.metal.NODE_CLUSTER +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorEvent +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver +import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy +import com.atlarge.opendc.core.failure.CorrelatedFaultInjector +import com.atlarge.opendc.core.failure.FailureDomain +import com.atlarge.opendc.core.failure.FaultInjector +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader +import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader +import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import com.xenomachina.argparser.ArgParser +import com.xenomachina.argparser.default +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import java.io.File +import java.io.FileReader +import java.lang.IllegalArgumentException +import java.util.ServiceLoader +import java.util.TreeSet +import kotlin.math.max +import kotlin.random.Random + +class ExperimentParameters(parser: ArgParser) { + val traceDirectory by parser.storing("path to the trace directory") + val environmentFile by parser.storing("path to the environment file") + val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null } + val vmPlacementFile by parser.storing("path to the VM placement file").default { null } + val outputFile by parser.storing("path to where the output should be stored") + .default { "data/results-${System.currentTimeMillis()}.parquet" } + val selectedVms by parser.storing("the VMs to run") { parseVMs(this) } + .default { emptyList() } + val selectedVmsFile by parser.storing("path to a file containing the VMs to run") { + parseVMs(FileReader(File(this)).readText()) + } + .default { emptyList() } + val seed by parser.storing("the random seed") { toInt() } + .default(0) + val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures") + val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem") + + fun getSelectedVmList(): List { + return if (selectedVms.isEmpty()) { + selectedVmsFile + } else { + selectedVms + } + } + + private fun parseVMs(string: String): List { + // Handle case where VM list contains a VM name with an (escaped) single-quote in it + val sanitizedString = string.replace("\\'", "\\\\[") + .replace("'", "\"") + .replace("\\\\[", "'") + val vms: List = jacksonObjectMapper().readValue(sanitizedString) + return vms + } +} + +/** + * Construct the failure domain for the experiments. + */ +suspend fun createFailureDomain(seed: Int, bareMetalProvisioner: ProvisioningService, chan: Channel): Domain { + val root = simulationContext.domain + val domain = root.newDomain(name = "failures") + domain.launch { + chan.receive() + val random = Random(seed) + val injectors = mutableMapOf() + for (node in bareMetalProvisioner.nodes()) { + val cluster = node.metadata[NODE_CLUSTER] as String + val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) } + injector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + return domain +} + +/** + * Obtain the [FaultInjector] to use for the experiments. + */ +fun createFaultInjector(domain: Domain, random: Random): FaultInjector { + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return CorrelatedFaultInjector(domain, + iatScale = -1.39, iatShape = 1.03, // Hours + sizeScale = 1.88, sizeShape = 1.25, + dScale = 9.51, dShape = 3.21, // Minutes + random = random + ) +} + +/** + * Create the trace reader from which the VM workloads are read. + */ +fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List, seed: Int): Sc20ParquetTraceReader { + return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) +} + +/** + * Construct the environment for a VM provisioner and return the provisioner instance. + */ +suspend fun createProvisioner( + root: Domain, + environmentReader: EnvironmentReader, + allocationPolicy: AllocationPolicy +): Pair = withContext(root.coroutineContext) { + val environment = environmentReader.use { it.construct(root) } + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] + + // Wait for the bare metal nodes to be spawned + delay(10) + + val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) + + // Wait for the hypervisors to be spawned + delay(10) + + bareMetalProvisioner to scheduler +} + +/** + * Attach the specified monitor to the VM provisioner. + */ +@OptIn(ExperimentalCoroutinesApi::class) +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) { + val domain = simulationContext.domain + val hypervisors = scheduler.drivers() + + // Monitor hypervisor events + for (hypervisor in hypervisors) { + // TODO Do not expose VirtDriver directly but use Hypervisor class. + monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + hypervisor.server.events + .onEach { event -> + when (event) { + is ServerEvent.StateChanged -> { + monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + } + } + } + .launchIn(domain) + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( + simulationContext.clock.millis(), + event.requestedBurst, + event.grantedBurst, + event.overcommissionedBurst, + event.interferedBurst, + event.cpuUsage, + event.cpuDemand, + event.numberOfDeployedImages, + event.hostServer, + scheduler.submittedVms, + scheduler.queuedVms, + scheduler.runningVms, + scheduler.finishedVms + ) + } + } + .launchIn(domain) + } +} + +/** + * Process the trace. + */ +suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, monitor: Sc20Monitor, vmPlacements: Map = emptyMap()) { + val domain = simulationContext.domain + + try { + var submitted = 0L + val finished = Channel(Channel.CONFLATED) + val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) + + while (reader.hasNext()) { + val (time, workload) = reader.next() + + if (vmPlacements.isNotEmpty()) { + val vmId = workload.name.replace("VM Workload ", "") + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null) { + println("Could not find placement data in VM placement file for VM $vmId") + continue + } + val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false + if (machineInCluster) { + println("Ignored VM") + continue + } + } + + submitted++ + delay(max(0, time - simulationContext.clock.millis())) + domain.launch { + chan.send(Unit) + val server = scheduler.deploy( + workload.image.name, workload.image, + Flavor(workload.image.maxCores, workload.image.requiredMemory) + ) + // Monitor server events + server.events + .onEach { + if (it is ServerEvent.StateChanged) { + monitor.onVmStateChanged(it.server) + } + + delay(1) + finished.send(Unit) + } + .collect() + } + } + + while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) { + finished.receive() + } + } finally { + reader.close() + } +} + +/** + * Main entry point of the experiment. + */ +@OptIn(ExperimentalCoroutinesApi::class) +fun main(args: Array) { + val cli = ArgParser(args).parseInto(::ExperimentParameters) + println("trace-directory: ${cli.traceDirectory}") + println("environment-file: ${cli.environmentFile}") + println("performance-interference-file: ${cli.performanceInterferenceFile}") + println("selected-vms-file: ${cli.selectedVmsFile}") + println("seed: ${cli.seed}") + println("failures: ${cli.failures}") + println("allocation-policy: ${cli.allocationPolicy}") + + val start = System.currentTimeMillis() + val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile) + + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + val system = provider("test") + val root = system.newDomain("root") + + val chan = Channel(Channel.CONFLATED) + + val performanceInterferenceModel = try { + val performanceInterferenceStream = if (cli.performanceInterferenceFile != null) { + File(cli.performanceInterferenceFile!!).inputStream().buffered() + } else { + object {}.javaClass.getResourceAsStream("/env/performance-interference.json") + } + Sc20PerformanceInterferenceReader(performanceInterferenceStream) + .construct() + } catch (e: Throwable) { + monitor.close() + throw e + } + val vmPlacements = if (cli.vmPlacementFile == null) { + emptyMap() + } else { + Sc20VmPlacementReader(File(cli.vmPlacementFile!!).inputStream().buffered()).construct() + } + val environmentReader = Sc20ClusterEnvironmentReader(File(cli.environmentFile)) + val traceReader = try { + createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed) + } catch (e: Throwable) { + monitor.close() + throw e + } + val allocationPolicy = when (cli.allocationPolicy) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(cli.seed)) + "replay" -> ReplayAllocationPolicy(vmPlacements) + else -> throw IllegalArgumentException("Unknown allocation policy: ${cli.allocationPolicy}") + } + + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) + + val failureDomain = if (cli.failures) { + println("ENABLING failures") + createFailureDomain(cli.seed, bareMetalProvisioner, chan) + } else { + null + } + + attachMonitor(scheduler, monitor) + processTrace(traceReader, scheduler, chan, monitor, vmPlacements) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") + } + + runBlocking { + system.run() + system.terminate() + } + + // Explicitly close the monitor to flush its buffer + monitor.close() +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt deleted file mode 100644 index 6f1e9aae..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ /dev/null @@ -1,366 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.ServerEvent -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.compute.metal.NODE_CLUSTER -import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.compute.virt.HypervisorEvent -import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver -import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService -import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy -import com.atlarge.opendc.core.failure.CorrelatedFaultInjector -import com.atlarge.opendc.core.failure.FailureDomain -import com.atlarge.opendc.core.failure.FaultInjector -import com.atlarge.opendc.format.environment.EnvironmentReader -import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import com.atlarge.opendc.format.trace.TraceReader -import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader -import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import com.xenomachina.argparser.ArgParser -import com.xenomachina.argparser.default -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext -import java.io.File -import java.io.FileReader -import java.lang.IllegalArgumentException -import java.util.ServiceLoader -import java.util.TreeSet -import kotlin.math.max -import kotlin.random.Random - -class ExperimentParameters(parser: ArgParser) { - val traceDirectory by parser.storing("path to the trace directory") - val environmentFile by parser.storing("path to the environment file") - val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null } - val vmPlacementFile by parser.storing("path to the VM placement file").default { null } - val outputFile by parser.storing("path to where the output should be stored") - .default { "data/results-${System.currentTimeMillis()}.parquet" } - val selectedVms by parser.storing("the VMs to run") { parseVMs(this) } - .default { emptyList() } - val selectedVmsFile by parser.storing("path to a file containing the VMs to run") { - parseVMs(FileReader(File(this)).readText()) - } - .default { emptyList() } - val seed by parser.storing("the random seed") { toInt() } - .default(0) - val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures") - val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem") - - fun getSelectedVmList(): List { - return if (selectedVms.isEmpty()) { - selectedVmsFile - } else { - selectedVms - } - } - - private fun parseVMs(string: String): List { - // Handle case where VM list contains a VM name with an (escaped) single-quote in it - val sanitizedString = string.replace("\\'", "\\\\[") - .replace("'", "\"") - .replace("\\\\[", "'") - val vms: List = jacksonObjectMapper().readValue(sanitizedString) - return vms - } -} - -/** - * Construct the failure domain for the experiments. - */ -suspend fun createFailureDomain(seed: Int, bareMetalProvisioner: ProvisioningService, chan: Channel): Domain { - val root = simulationContext.domain - val domain = root.newDomain(name = "failures") - domain.launch { - chan.receive() - val random = Random(seed) - val injectors = mutableMapOf() - for (node in bareMetalProvisioner.nodes()) { - val cluster = node.metadata[NODE_CLUSTER] as String - val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) } - injector.enqueue(node.metadata["driver"] as FailureDomain) - } - } - return domain -} - -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -fun createFaultInjector(domain: Domain, random: Random): FaultInjector { - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return CorrelatedFaultInjector(domain, - iatScale = -1.39, iatShape = 1.03, // Hours - sizeScale = 1.88, sizeShape = 1.25, - dScale = 9.51, dShape = 3.21, // Minutes - random = random - ) -} - -/** - * Create the trace reader from which the VM workloads are read. - */ -fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List, seed: Int): Sc20ParquetTraceReader { - return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) -} - -/** - * Construct the environment for a VM provisioner and return the provisioner instance. - */ -suspend fun createProvisioner( - root: Domain, - environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy -): Pair = withContext(root.coroutineContext) { - val environment = environmentReader.use { it.construct(root) } - val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] - - // Wait for the bare metal nodes to be spawned - delay(10) - - val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) - - // Wait for the hypervisors to be spawned - delay(10) - - bareMetalProvisioner to scheduler -} - -/** - * Attach the specified monitor to the VM provisioner. - */ -@OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) { - val domain = simulationContext.domain - val hypervisors = scheduler.drivers() - - // Monitor hypervisor events - for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - hypervisor.server.events - .onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - } - } - } - .launchIn(domain) - hypervisor.events - .onEach { event -> - when (event) { - is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( - simulationContext.clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.hostServer, - scheduler.submittedVms, - scheduler.queuedVms, - scheduler.runningVms, - scheduler.finishedVms - ) - } - } - .launchIn(domain) - } -} - -/** - * Process the trace. - */ -suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, vmPlacements: Map, monitor: Sc20Monitor) { - try { - coroutineScope { - var submitted = 0L - val finished = Channel(Channel.CONFLATED) - val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) - - while (reader.hasNext()) { - val (time, workload) = reader.next() - - if (vmPlacements.isNotEmpty()) { - val vmId = workload.name.replace("VM Workload ", "") - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null) { - println("Could not find placement data in VM placement file for VM $vmId") - continue - } - val machineInCluster = hypervisors.ceiling(clusterName)?.let { it.contains(clusterName) } ?: false - if (machineInCluster) { - println("Ignored VM") - continue - } - } - - submitted++ - delay(max(0, time - simulationContext.clock.millis())) - launch { - chan.send(Unit) - val server = scheduler.deploy( - workload.image.name, workload.image, - Flavor(workload.image.maxCores, workload.image.requiredMemory) - ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - monitor.onVmStateChanged(it.server) - } - - finished.send(Unit) - } - .collect() - } - } - - while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) { - finished.receive() - } - } - } finally { - reader.close() - } -} - -/** - * Main entry point of the experiment. - */ -@OptIn(ExperimentalCoroutinesApi::class) -fun main(args: Array) { - val cli = ArgParser(args).parseInto(::ExperimentParameters) - println("trace-directory: ${cli.traceDirectory}") - println("environment-file: ${cli.environmentFile}") - println("performance-interference-file: ${cli.performanceInterferenceFile}") - println("selected-vms-file: ${cli.selectedVmsFile}") - println("seed: ${cli.seed}") - println("failures: ${cli.failures}") - println("allocation-policy: ${cli.allocationPolicy}") - - val start = System.currentTimeMillis() - val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile) - - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") - - val chan = Channel(Channel.CONFLATED) - - val performanceInterferenceModel = try { - val performanceInterferenceStream = if (cli.performanceInterferenceFile != null) { - File(cli.performanceInterferenceFile!!).inputStream().buffered() - } else { - object {}.javaClass.getResourceAsStream("/env/performance-interference.json") - } - Sc20PerformanceInterferenceReader(performanceInterferenceStream) - .construct() - } catch (e: Throwable) { - monitor.close() - throw e - } - val vmPlacements = if (cli.vmPlacementFile == null) { - emptyMap() - } else { - Sc20VmPlacementReader(File(cli.vmPlacementFile!!).inputStream().buffered()).construct() - } - val environmentReader = Sc20ClusterEnvironmentReader(File(cli.environmentFile)) - val traceReader = try { - createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed) - } catch (e: Throwable) { - monitor.close() - throw e - } - val allocationPolicy = when (cli.allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(cli.seed)) - "replay" -> ReplayAllocationPolicy(vmPlacements) - else -> throw IllegalArgumentException("Unknown allocation policy: ${cli.allocationPolicy}") - } - - root.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) - - val failureDomain = if (cli.failures) { - println("ENABLING failures") - createFailureDomain(cli.seed, bareMetalProvisioner, chan) - } else { - null - } - - attachMonitor(scheduler, monitor) - processTrace(traceReader, scheduler, chan, vmPlacements, monitor) - - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") - } - - runBlocking { - system.run() - system.terminate() - } - - // Explicitly close the monitor to flush its buffer - monitor.close() -} diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt new file mode 100644 index 00000000..dd0931e4 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -0,0 +1,183 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.SimulationEngine +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader +import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.io.File +import java.util.ServiceLoader + +/** + * An integration test suite for the SC20 experiments. + */ +class Sc20IntegrationTest { + /** + * The simulation engine to use. + */ + private lateinit var simulationEngine: SimulationEngine + + /** + * The root simulation domain to run in. + */ + private lateinit var root: Domain + + /** + * The monitor used to keep track of the metrics. + */ + private lateinit var monitor: TestSc20Monitor + + /** + * Setup the experimental environment. + */ + @BeforeEach + fun setUp() { + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + simulationEngine = provider("test") + root = simulationEngine.newDomain("root") + monitor = TestSc20Monitor() + } + + /** + * Tear down the experimental environment. + */ + @AfterEach + fun tearDown() = runBlocking { + simulationEngine.terminate() + } + + @Test + fun smoke() { + val failures = false + val seed = 0 + val chan = Channel(Channel.CONFLATED) + val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val traceReader = createTestTraceReader() + val environmentReader = createTestEnvironmentReader() + lateinit var scheduler: SimpleVirtProvisioningService + + root.launch { + val res = createProvisioner(root, environmentReader, allocationPolicy) + val bareMetalProvisioner = res.first + scheduler = res.second + + val failureDomain = if (failures) { + println("ENABLING failures") + createFailureDomain(seed, bareMetalProvisioner, chan) + } else { + null + } + + attachMonitor(scheduler, monitor) + processTrace(traceReader, scheduler, chan, monitor) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + } + + runSimulation() + + // Note that these values have been verified beforehand + assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") + assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") + assertEquals(207379117949, monitor.totalRequestedBurst) + assertEquals(207378478631, monitor.totalGrantedBurst) + assertEquals(639360, monitor.totalOvercommissionedBurst) + assertEquals(0, monitor.totalInterferedBurst) + } + + /** + * Run the simulation. + */ + private fun runSimulation() = runBlocking { + simulationEngine.run() + } + + /** + * Obtain the trace reader for the test. + */ + private fun createTestTraceReader(): TraceReader { + val performanceInterferenceStream = object {}.javaClass.getResourceAsStream("/env/performance-interference.json") + val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) + .construct() + return createTraceReader(File("src/test/resources/trace"), performanceInterferenceModel, emptyList(), 0) + } + + /** + * Obtain the environment reader for the test. + */ + private fun createTestEnvironmentReader(): EnvironmentReader { + val stream = object {}.javaClass.getResourceAsStream("/env/topology.txt") + return Sc20ClusterEnvironmentReader(stream) + } + + class TestSc20Monitor : Sc20Monitor { + var totalRequestedBurst = 0L + var totalGrantedBurst = 0L + var totalOvercommissionedBurst = 0L + var totalInterferedBurst = 0L + + override suspend fun onSliceFinish( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, + duration: Long + ) { + totalRequestedBurst += requestedBurst + totalGrantedBurst += grantedBurst + totalOvercommissionedBurst += overcommissionedBurst + totalInterferedBurst += interferedBurst + } + override fun close() {} + } +} diff --git a/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt b/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt new file mode 100644 index 00000000..6b347bff --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt @@ -0,0 +1,5 @@ +ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost +A01;A01;32;3.2;2048;1;256;32 +B01;B01;48;2.93;1256;6;64;8 +C01;C01;32;3.2;2048;2;128;16 + diff --git a/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet b/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet new file mode 100644 index 00000000..ce7a812c Binary files /dev/null and b/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet differ diff --git a/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet b/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet new file mode 100644 index 00000000..1d7ce882 Binary files /dev/null and b/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet differ diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 2ef0db97..e34ee2dc 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -38,9 +38,9 @@ import com.atlarge.opendc.core.Platform import com.atlarge.opendc.core.Zone import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.format.environment.EnvironmentReader -import java.io.BufferedReader import java.io.File -import java.io.FileReader +import java.io.FileInputStream +import java.io.InputStream import java.util.Random import java.util.UUID @@ -50,8 +50,11 @@ import java.util.UUID * @param environmentFile The file describing the physical cluster. */ class Sc20ClusterEnvironmentReader( - private val environmentFile: File + private val input: InputStream ) : EnvironmentReader { + + constructor(file: File) : this(FileInputStream(file)) + @Suppress("BlockingMethodInNonBlockingContext") override suspend fun construct(dom: Domain): Environment { var clusterIdCol = 0 @@ -70,7 +73,7 @@ class Sc20ClusterEnvironmentReader( val nodes = mutableListOf() val random = Random(0) - BufferedReader(FileReader(environmentFile)).use { reader -> + input.bufferedReader().use { reader -> reader.lineSequence() .filter { line -> // Ignore comments in the file -- cgit v1.2.3