| field | value | date |
|---|---|---|
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-05-06 11:54:21 +0200 |
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-05-06 11:54:21 +0200 |
| commit | 171d9948af5f14243da7aa140a231dbf9ae8df15 (patch) | |
| tree | c42f2a5b08923e91b0afad4bc686b298c165a1ea /opendc | |
| parent | 9d17c77a85d7d88287b06a521b0c6358f589ca9a (diff) | |
| parent | 48f6a6f2d42851bc2eeed5b6ef41145740c70286 (diff) | |
Merge branch 'test/integration-test' into '2.x'
Add integration tests for SC20 experiments
Closes #54
See merge request opendc/opendc-simulator!63
Diffstat (limited to 'opendc')
14 files changed, 754 insertions, 448 deletions
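
The merge splits the former monolithic TestExperiment into reusable building blocks (`createProvisioner`, `attachMonitor`, `processTrace`, and `createFailureDomain`) that are driven both by the `main` entry point in Sc20Experiment.kt and by the new integration test. The sketch below shows a minimal way these helpers compose, mirroring the smoke test added in this merge; the engine name and the input paths are placeholders, not values taken from the repository.

```kotlin
package com.atlarge.opendc.experiments.sc20

import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
import java.util.ServiceLoader

fun main() {
    // Boot a simulation engine and a root domain, as Sc20Experiment.kt does.
    val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
    val system = provider("smoke")
    val root = system.newDomain("root")
    val chan = Channel<Unit>(Channel.CONFLATED)

    // Placeholder inputs: point these at a real topology file and Parquet trace directory.
    val environmentReader = Sc20ClusterEnvironmentReader(File("env/topology.txt"))
    val interferenceModel = Sc20PerformanceInterferenceReader(
        object {}.javaClass.getResourceAsStream("/env/performance-interference.json")
    ).construct()
    val traceReader = createTraceReader(File("trace/"), interferenceModel, emptyList(), seed = 0)
    val monitor = Sc20ParquetMonitor("data/results.parquet")

    root.launch {
        // Build the bare-metal environment and the VM scheduler on top of it.
        val (_, scheduler) = createProvisioner(root, environmentReader, AvailableCoreMemoryAllocationPolicy())
        attachMonitor(scheduler, monitor)
        processTrace(traceReader, scheduler, chan, monitor)
        scheduler.terminate()
    }

    runBlocking {
        system.run()
        system.terminate()
    }

    // Flush the Parquet write buffer.
    monitor.close()
}
```
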
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 520f6dc5..2185b372 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -139,10 +139,13 @@ class SimpleVirtProvisioningService(
 
            if (selectedHv == null) {
                if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
                    unscheduledVms++
+                    incomingImages -= imageInstance
+                    println("[${clock.millis()}] CANNOT SPAWN ${imageInstance.image}")
+                    continue
+                } else {
+                    break
                }
-
-                break
            }
 
            try {
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index 8611ffa7..ccfa3038 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -31,7 +31,7 @@ plugins {
 }
 
 application {
-    mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt"
+    mainClassName = "com.atlarge.opendc.experiments.sc20.Sc20ExperimentKt"
     applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M")
 }
 
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
new file mode 100644
index 00000000..fc4b9058
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
@@ -0,0 +1,366 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
+import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.ServerEvent
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.compute.metal.NODE_CLUSTER
+import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.virt.HypervisorEvent
+import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
+import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
+import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy
+import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
+import com.atlarge.opendc.core.failure.FailureDomain
+import com.atlarge.opendc.core.failure.FaultInjector
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import com.atlarge.opendc.format.trace.TraceReader
+import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
+import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.module.kotlin.readValue
+import com.xenomachina.argparser.ArgParser
+import com.xenomachina.argparser.default
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
+import java.io.File
+import java.io.FileReader
+import java.lang.IllegalArgumentException
+import java.util.ServiceLoader
+import java.util.TreeSet
+import kotlin.math.max
+import kotlin.random.Random
+
+class ExperimentParameters(parser: ArgParser) {
+    val traceDirectory by parser.storing("path to the trace directory")
+    val environmentFile by parser.storing("path to the environment file")
+    val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null }
+    val vmPlacementFile by parser.storing("path to the VM placement file").default { null }
+    val outputFile by parser.storing("path to where the output should be stored")
+        .default { "data/results-${System.currentTimeMillis()}.parquet" }
+    val selectedVms by parser.storing("the VMs to run") { parseVMs(this) }
+        .default { emptyList() }
+    val selectedVmsFile by parser.storing("path to a file containing the VMs to run") {
+        parseVMs(FileReader(File(this)).readText())
+    }
+        .default { emptyList() }
+    val seed by parser.storing("the random seed") { toInt() }
+        .default(0)
+    val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures")
+    val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem")
+
+    fun getSelectedVmList(): List<String> {
+        return if (selectedVms.isEmpty()) {
+            selectedVmsFile
+        } else {
+            selectedVms
+        }
+    }
+
+    private fun parseVMs(string: String): List<String> {
+        // Handle case where VM list contains a VM name with an (escaped) single-quote in it
+        val sanitizedString = string.replace("\\'", "\\\\[")
+            .replace("'", "\"")
+            .replace("\\\\[", "'")
+        val vms: List<String> = jacksonObjectMapper().readValue(sanitizedString)
+        return vms
+    }
+}
+
+/**
+ * Construct the failure domain for the experiments.
+ */
+suspend fun createFailureDomain(seed: Int, bareMetalProvisioner: ProvisioningService, chan: Channel<Unit>): Domain {
+    val root = simulationContext.domain
+    val domain = root.newDomain(name = "failures")
+    domain.launch {
+        chan.receive()
+        val random = Random(seed)
+        val injectors = mutableMapOf<String, FaultInjector>()
+        for (node in bareMetalProvisioner.nodes()) {
+            val cluster = node.metadata[NODE_CLUSTER] as String
+            val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) }
+            injector.enqueue(node.metadata["driver"] as FailureDomain)
+        }
+    }
+    return domain
+}
+
+/**
+ * Obtain the [FaultInjector] to use for the experiments.
+ */
+fun createFaultInjector(domain: Domain, random: Random): FaultInjector {
+    // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+    // GRID'5000
+    return CorrelatedFaultInjector(domain,
+        iatScale = -1.39, iatShape = 1.03, // Hours
+        sizeScale = 1.88, sizeShape = 1.25,
+        dScale = 9.51, dShape = 3.21, // Minutes
+        random = random
+    )
+}
+
+/**
+ * Create the trace reader from which the VM workloads are read.
+ */
+fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader {
+    return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed))
+}
+
+/**
+ * Construct the environment for a VM provisioner and return the provisioner instance.
+ */
+suspend fun createProvisioner(
+    root: Domain,
+    environmentReader: EnvironmentReader,
+    allocationPolicy: AllocationPolicy
+): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) {
+    val environment = environmentReader.use { it.construct(root) }
+    val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key]
+
+    // Wait for the bare metal nodes to be spawned
+    delay(10)
+
+    val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner)
+
+    // Wait for the hypervisors to be spawned
+    delay(10)
+
+    bareMetalProvisioner to scheduler
+}
+
+/**
+ * Attach the specified monitor to the VM provisioner.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) {
+    val domain = simulationContext.domain
+    val hypervisors = scheduler.drivers()
+
+    // Monitor hypervisor events
+    for (hypervisor in hypervisors) {
+        // TODO Do not expose VirtDriver directly but use Hypervisor class.
+        monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+        hypervisor.server.events
+            .onEach { event ->
+                when (event) {
+                    is ServerEvent.StateChanged -> {
+                        monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+                    }
+                }
+            }
+            .launchIn(domain)
+        hypervisor.events
+            .onEach { event ->
+                when (event) {
+                    is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(
+                        simulationContext.clock.millis(),
+                        event.requestedBurst,
+                        event.grantedBurst,
+                        event.overcommissionedBurst,
+                        event.interferedBurst,
+                        event.cpuUsage,
+                        event.cpuDemand,
+                        event.numberOfDeployedImages,
+                        event.hostServer,
+                        scheduler.submittedVms,
+                        scheduler.queuedVms,
+                        scheduler.runningVms,
+                        scheduler.finishedVms
+                    )
+                }
+            }
+            .launchIn(domain)
+    }
+}
+
+/**
+ * Process the trace.
+ */
+suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: Sc20Monitor, vmPlacements: Map<String, String> = emptyMap()) {
+    val domain = simulationContext.domain
+
+    try {
+        var submitted = 0L
+        val finished = Channel<Unit>(Channel.CONFLATED)
+        val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
+
+        while (reader.hasNext()) {
+            val (time, workload) = reader.next()
+
+            if (vmPlacements.isNotEmpty()) {
+                val vmId = workload.name.replace("VM Workload ", "")
+                // Check if VM in topology
+                val clusterName = vmPlacements[vmId]
+                if (clusterName == null) {
+                    println("Could not find placement data in VM placement file for VM $vmId")
+                    continue
+                }
+                val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false
+                if (machineInCluster) {
+                    println("Ignored VM")
+                    continue
+                }
+            }
+
+            submitted++
+            delay(max(0, time - simulationContext.clock.millis()))
+            domain.launch {
+                chan.send(Unit)
+                val server = scheduler.deploy(
+                    workload.image.name, workload.image,
+                    Flavor(workload.image.maxCores, workload.image.requiredMemory)
+                )
+                // Monitor server events
+                server.events
+                    .onEach {
+                        if (it is ServerEvent.StateChanged) {
+                            monitor.onVmStateChanged(it.server)
+                        }
+
+                        delay(1)
+                        finished.send(Unit)
+                    }
+                    .collect()
+            }
+        }
+
+        while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) {
+            finished.receive()
+        }
+    } finally {
+        reader.close()
+    }
+}
+
+/**
+ * Main entry point of the experiment.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+fun main(args: Array<String>) {
+    val cli = ArgParser(args).parseInto(::ExperimentParameters)
+    println("trace-directory: ${cli.traceDirectory}")
+    println("environment-file: ${cli.environmentFile}")
+    println("performance-interference-file: ${cli.performanceInterferenceFile}")
+    println("selected-vms-file: ${cli.selectedVmsFile}")
+    println("seed: ${cli.seed}")
+    println("failures: ${cli.failures}")
+    println("allocation-policy: ${cli.allocationPolicy}")
+
+    val start = System.currentTimeMillis()
+    val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile)
+
+    val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
+    val system = provider("test")
+    val root = system.newDomain("root")
+
+    val chan = Channel<Unit>(Channel.CONFLATED)
+
+    val performanceInterferenceModel = try {
+        val performanceInterferenceStream = if (cli.performanceInterferenceFile != null) {
+            File(cli.performanceInterferenceFile!!).inputStream().buffered()
+        } else {
+            object {}.javaClass.getResourceAsStream("/env/performance-interference.json")
+        }
+        Sc20PerformanceInterferenceReader(performanceInterferenceStream)
+            .construct()
+    } catch (e: Throwable) {
+        monitor.close()
+        throw e
+    }
+    val vmPlacements = if (cli.vmPlacementFile == null) {
+        emptyMap()
+    } else {
+        Sc20VmPlacementReader(File(cli.vmPlacementFile!!).inputStream().buffered()).construct()
+    }
+    val environmentReader = Sc20ClusterEnvironmentReader(File(cli.environmentFile))
+    val traceReader = try {
+        createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed)
+    } catch (e: Throwable) {
+        monitor.close()
+        throw e
+    }
+    val allocationPolicy = when (cli.allocationPolicy) {
+        "mem" -> AvailableMemoryAllocationPolicy()
+        "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+        "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+        "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+        "active-servers" -> NumberOfActiveServersAllocationPolicy()
+        "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+        "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+        "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+        "random" -> RandomAllocationPolicy(Random(cli.seed))
+        "replay" -> ReplayAllocationPolicy(vmPlacements)
+        else -> throw IllegalArgumentException("Unknown allocation policy: ${cli.allocationPolicy}")
+    }
+
+    root.launch {
+        val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy)
+
+        val failureDomain = if (cli.failures) {
+            println("ENABLING failures")
+            createFailureDomain(cli.seed, bareMetalProvisioner, chan)
+        } else {
+            null
+        }
+
+        attachMonitor(scheduler, monitor)
+        processTrace(traceReader, scheduler, chan, monitor, vmPlacements)
+
+        println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
+        failureDomain?.cancel()
+        scheduler.terminate()
+        println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds")
+    }
+
+    runBlocking {
+        system.run()
+        system.terminate()
+    }
+
+    // Explicitly close the monitor to flush its buffer
+    monitor.close()
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index bac0de21..4b8b80a8 100644
---
a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -1,66 +1,34 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import kotlinx.coroutines.flow.first -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName import java.io.Closeable -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread - -class Sc20Monitor( - destination: String -) : Closeable { - private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() - private val schema = SchemaBuilder - .record("slice") - .namespace("com.atlarge.opendc.experiments.sc20") - .fields() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("requestedBurst").type().longType().noDefault() - .name("grantedBurst").type().longType().noDefault() - .name("overcommissionedBurst").type().longType().noDefault() - .name("interferedBurst").type().longType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("cpuDemand").type().doubleType().noDefault() - .name("numberOfDeployedImages").type().intType().noDefault() - .name("server").type().stringType().noDefault() - .name("hostState").type().stringType().noDefault() - .name("hostUsage").type().doubleType().noDefault() - .name("powerDraw").type().doubleType().noDefault() - .name("totalSubmittedVms").type().longType().noDefault() - .name("totalQueuedVms").type().longType().noDefault() - .name("totalRunningVms").type().longType().noDefault() - .name("totalFinishedVms").type().longType().noDefault() - .endRecord() - private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering 
(Page size) - .build() - private val queue = ArrayBlockingQueue<GenericData.Record>(2048) - private val writerThread = thread(start = true, name = "sc20-writer") { - try { - while (true) { - val record = queue.take() - writer.write(record) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - writer.close() - } - } +interface Sc20Monitor : Closeable { suspend fun onVmStateChanged(server: Server) {} suspend fun serverStateChanged( @@ -70,32 +38,7 @@ class Sc20Monitor( queuedVms: Long, runningVms: Long, finishedVms: Long - ) { - val lastServerState = lastServerStates[server] - if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = simulationContext.clock.millis() - lastServerState.second - onSliceFinish( - simulationContext.clock.millis(), - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server, - submittedVms, - queuedVms, - runningVms, - finishedVms, - duration - ) - } - - println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}") - - lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) - } + ) {} suspend fun onSliceFinish( time: Long, @@ -112,39 +55,5 @@ class Sc20Monitor( runningVms: Long, finishedVms: Long, duration: Long = 5 * 60 * 1000L - ) { - // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.services[BareMetalDriver.Key] - val usage = driver.usage.first() - val powerDraw = driver.powerDraw.first() - - val record = GenericData.Record(schema) - record.put("time", time) - record.put("duration", duration) - record.put("requestedBurst", requestedBurst) - record.put("grantedBurst", grantedBurst) - record.put("overcommissionedBurst", overcommissionedBurst) - record.put("interferedBurst", interferedBurst) - record.put("cpuUsage", cpuUsage) - record.put("cpuDemand", cpuDemand) - record.put("numberOfDeployedImages", numberOfDeployedImages) - record.put("server", hostServer.uid) - record.put("hostState", hostServer.state) - record.put("hostUsage", usage) - record.put("powerDraw", powerDraw) - record.put("totalSubmittedVms", submittedVms) - record.put("totalQueuedVms", queuedVms) - record.put("totalRunningVms", runningVms) - record.put("totalFinishedVms", finishedVms) - - queue.put(record) - } - - override fun close() { - // Busy loop to wait for writer thread to finish - while (queue.isNotEmpty()) { - Thread.sleep(500) - } - writerThread.interrupt() - } + ) {} } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt new file mode 100644 index 00000000..5e554196 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt @@ -0,0 +1,149 @@ +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import kotlinx.coroutines.flow.first +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread + +class 
Sc20ParquetMonitor( + destination: String +) : Sc20Monitor { + private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() + private val schema = SchemaBuilder + .record("slice") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("requestedBurst").type().longType().noDefault() + .name("grantedBurst").type().longType().noDefault() + .name("overcommissionedBurst").type().longType().noDefault() + .name("interferedBurst").type().longType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("cpuDemand").type().doubleType().noDefault() + .name("numberOfDeployedImages").type().intType().noDefault() + .name("server").type().stringType().noDefault() + .name("hostState").type().stringType().noDefault() + .name("hostUsage").type().doubleType().noDefault() + .name("powerDraw").type().doubleType().noDefault() + .name("totalSubmittedVms").type().longType().noDefault() + .name("totalQueuedVms").type().longType().noDefault() + .name("totalRunningVms").type().longType().noDefault() + .name("totalFinishedVms").type().longType().noDefault() + .endRecord() + private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + private val queue = ArrayBlockingQueue<GenericData.Record>(2048) + private val writerThread = thread(start = true, name = "sc20-writer") { + try { + while (true) { + val record = queue.take() + writer.write(record) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + writer.close() + } + } + + override suspend fun onVmStateChanged(server: Server) {} + + override suspend fun serverStateChanged( + driver: VirtDriver, + server: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long + ) { + val lastServerState = lastServerStates[server] + if (server.state == ServerState.SHUTOFF && lastServerState != null) { + val duration = simulationContext.clock.millis() - lastServerState.second + onSliceFinish( + simulationContext.clock.millis(), + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server, + submittedVms, + queuedVms, + runningVms, + finishedVms, + duration + ) + } + + println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}") + + lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) + } + + override suspend fun onSliceFinish( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, + duration: Long + ) { + // Assume for now that the host is not virtualized and measure the current power draw + val driver = hostServer.services[BareMetalDriver.Key] + val usage = driver.usage.first() + val powerDraw = driver.powerDraw.first() + + val record = GenericData.Record(schema) + record.put("time", time) + record.put("duration", duration) + record.put("requestedBurst", requestedBurst) + record.put("grantedBurst", grantedBurst) + record.put("overcommissionedBurst", overcommissionedBurst) + record.put("interferedBurst", interferedBurst) + record.put("cpuUsage", cpuUsage) + 
record.put("cpuDemand", cpuDemand) + record.put("numberOfDeployedImages", numberOfDeployedImages) + record.put("server", hostServer.uid) + record.put("hostState", hostServer.state) + record.put("hostUsage", usage) + record.put("powerDraw", powerDraw) + record.put("totalSubmittedVms", submittedVms) + record.put("totalQueuedVms", queuedVms) + record.put("totalRunningVms", runningVms) + record.put("totalFinishedVms", finishedVms) + + queue.put(record) + } + + override fun close() { + // Busy loop to wait for writer thread to finish + while (queue.isNotEmpty()) { + Thread.sleep(500) + } + writerThread.interrupt() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt deleted file mode 100644 index a2f609a5..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ /dev/null @@ -1,314 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.ServerEvent -import com.atlarge.opendc.compute.metal.NODE_CLUSTER -import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.compute.virt.HypervisorEvent -import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver -import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService -import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy -import com.atlarge.opendc.core.failure.CorrelatedFaultInjector -import com.atlarge.opendc.core.failure.FailureDomain -import com.atlarge.opendc.core.failure.FaultInjector -import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader -import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import com.xenomachina.argparser.ArgParser -import com.xenomachina.argparser.default -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import java.io.File -import java.io.FileReader -import java.util.ServiceLoader -import kotlin.math.max -import kotlin.random.Random - -class ExperimentParameters(parser: ArgParser) { - val traceDirectory by parser.storing("path to the trace directory") - val environmentFile by parser.storing("path to the environment file") - val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null } - val vmPlacementFile by parser.storing("path to the VM placement file").default { null } - val outputFile by parser.storing("path to where the output should be stored") - .default { "data/results-${System.currentTimeMillis()}.parquet" } - val selectedVms by parser.storing("the VMs to run") { parseVMs(this) } - .default { emptyList() } - val selectedVmsFile by parser.storing("path to a file containing the VMs to run") { - parseVMs(FileReader(File(this)).readText()) - } - .default { emptyList() } - val seed by parser.storing("the random seed") { toInt() } - .default(0) - val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures") - val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem") - - fun getSelectedVmList(): List<String> { - return if (selectedVms.isEmpty()) { - selectedVmsFile - } else { - selectedVms - } - } - - private fun parseVMs(string: String): List<String> { - // Handle case where VM list contains a VM 
name with an (escaped) single-quote in it - val sanitizedString = string.replace("\\'", "\\\\[") - .replace("'", "\"") - .replace("\\\\[", "'") - val vms: List<String> = jacksonObjectMapper().readValue(sanitizedString) - return vms - } -} - -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -fun createFaultInjector(domain: Domain, random: Random): FaultInjector { - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return CorrelatedFaultInjector(domain, - iatScale = -1.39, iatShape = 1.03, // Hours - sizeScale = 1.88, sizeShape = 1.25, - dScale = 9.51, dShape = 3.21, // Minutes - random = random - ) -} - -/** - * Main entry point of the experiment. - */ -@OptIn(ExperimentalCoroutinesApi::class) -fun main(args: Array<String>) { - ArgParser(args).parseInto(::ExperimentParameters).run { - println("trace-directory: $traceDirectory") - println("environment-file: $environmentFile") - println("performance-interference-file: $performanceInterferenceFile") - println("vm-placement-file: $vmPlacementFile") - println("selected-vms-file: $selectedVmsFile") - println("seed: $seed") - println("failures: $failures") - println("allocation-policy: $allocationPolicy") - - val start = System.currentTimeMillis() - val monitor = Sc20Monitor(outputFile) - - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") - val chan = Channel<Unit>(Channel.CONFLATED) - - val vmPlacements = if (vmPlacementFile == null) { - emptyMap() - } else { - Sc20VmPlacementReader(File(vmPlacementFile!!).inputStream().buffered()).construct() - } - - val allocationPolicies = mapOf( - "mem" to AvailableMemoryAllocationPolicy(), - "mem-inv" to AvailableMemoryAllocationPolicy(true), - "core-mem" to AvailableCoreMemoryAllocationPolicy(), - "core-mem-inv" to AvailableCoreMemoryAllocationPolicy(true), - "active-servers" to NumberOfActiveServersAllocationPolicy(), - "active-servers-inv" to NumberOfActiveServersAllocationPolicy(true), - "provisioned-cores" to ProvisionedCoresAllocationPolicy(), - "provisioned-cores-inv" to ProvisionedCoresAllocationPolicy(true), - "replay" to ReplayAllocationPolicy(vmPlacements), - "random" to RandomAllocationPolicy(Random(seed)) - ) - - if (allocationPolicy !in allocationPolicies) { - println("error: unknown allocation policy $allocationPolicy") - println("Available:") - allocationPolicies.keys.forEach { key -> println(key) } - } - - root.launch { - val environment = Sc20ClusterEnvironmentReader(File(environmentFile)) - .use { it.construct(root) } - - val performanceInterferenceStream = if (performanceInterferenceFile != null) { - File(performanceInterferenceFile!!).inputStream().buffered() - } else { - object {}.javaClass.getResourceAsStream("/env/performance-interference.json") - } - - val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) - .construct() - - println(simulationContext.clock.instant()) - - val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] - - // Wait for the bare metal nodes to be spawned - delay(10) - - val scheduler = SimpleVirtProvisioningService( - allocationPolicies.getValue(allocationPolicy), - simulationContext, - bareMetalProvisioner - ) - - // Wait for the hypervisors to be spawned - delay(10) - - val hypervisors = scheduler.drivers() - - // Monitor hypervisor events - for (hypervisor in hypervisors) { - // TODO Do not expose 
VirtDriver directly but use Hypervisor class. - monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - hypervisor.server.events - .onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - } - } - } - .launchIn(this) - hypervisor.events - .onEach { event -> - when (event) { - is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( - simulationContext.clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.hostServer, - scheduler.submittedVms, - scheduler.queuedVms, - scheduler.runningVms, - scheduler.finishedVms - ) - } - } - .launchIn(this) - } - - val failureDomain = if (failures) { - println("ENABLING failures") - val domain = root.newDomain(name = "failures") - domain.launch { - chan.receive() - val random = Random(seed) - val injectors = mutableMapOf<String, FaultInjector>() - for (node in bareMetalProvisioner.nodes()) { - val cluster = node.metadata[NODE_CLUSTER] as String - val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) } - injector.enqueue(node.metadata["driver"] as FailureDomain) - } - } - domain - } else { - null - } - - var submitted = 0L - val finished = Channel<Unit>(Channel.RENDEZVOUS) - val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) - while (reader.hasNext()) { - val (time, workload) = reader.next() - - if (vmPlacements.isNotEmpty()) { - val vmId = workload.name.replace("VM Workload ", "") - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null) { - println("Could not find placement data in VM placement file for VM $vmId") - continue - } - val machinesInCluster = - hypervisors.filter { (it as SimpleVirtDriver).server.name.contains(clusterName) } - if (machinesInCluster.isEmpty()) { - println("Ignored VM") - continue - } - } - - submitted++ - delay(max(0, time - simulationContext.clock.millis())) - launch { - chan.send(Unit) - val server = scheduler.deploy( - workload.image.name, workload.image, - Flavor(workload.image.maxCores, workload.image.requiredMemory) - ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - monitor.onVmStateChanged(it.server) - } - - finished.send(Unit) - } - .collect() - } - } - - while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) { - finished.receive() - } - - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - reader.close() - println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") - } - - runBlocking { - system.run() - system.terminate() - } - - // Explicitly close the monitor to flush its buffer - monitor.close() - } -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt index d005a157..c62f59f9 100644 --- 
a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
@@ -36,10 +36,14 @@ import kotlin.math.max
 import kotlin.math.min
 /**
- *
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ * A script to convert a trace in text format into a Parquet trace.
  */
-fun main() {
+fun main(args: Array<String>) {
+    if (args.size < 2) {
+        println("error: expected <INPUT> <OUTPUT>")
+        return
+    }
+
     val metaSchema = SchemaBuilder
         .record("meta")
         .namespace("com.atlarge.opendc.format.sc20")
         .fields()
@@ -69,8 +73,8 @@ fun main() {
     val provisionedMemoryCol = 20
     val traceInterval = 5 * 60 * 1000L
 
-    val dest = File("../traces/solvinity/small-parquet")
-    val traceDirectory = File("../traces/solvinity/small")
+    val dest = File(args[0])
+    val traceDirectory = File(args[1])
 
     val vms = traceDirectory.walk()
         .filterNot { it.isDirectory }
diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
new file mode 100644
index 00000000..dd0931e4
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -0,0 +1,183 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.SimulationEngine
+import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
+import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import com.atlarge.opendc.format.trace.TraceReader
+import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import java.io.File
+import java.util.ServiceLoader
+
+/**
+ * An integration test suite for the SC20 experiments.
+ */
+class Sc20IntegrationTest {
+    /**
+     * The simulation engine to use.
+     */
+    private lateinit var simulationEngine: SimulationEngine
+
+    /**
+     * The root simulation domain to run in.
+     */
+    private lateinit var root: Domain
+
+    /**
+     * The monitor used to keep track of the metrics.
+     */
+    private lateinit var monitor: TestSc20Monitor
+
+    /**
+     * Setup the experimental environment.
+     */
+    @BeforeEach
+    fun setUp() {
+        val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
+        simulationEngine = provider("test")
+        root = simulationEngine.newDomain("root")
+        monitor = TestSc20Monitor()
+    }
+
+    /**
+     * Tear down the experimental environment.
+     */
+    @AfterEach
+    fun tearDown() = runBlocking {
+        simulationEngine.terminate()
+    }
+
+    @Test
+    fun smoke() {
+        val failures = false
+        val seed = 0
+        val chan = Channel<Unit>(Channel.CONFLATED)
+        val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
+        val traceReader = createTestTraceReader()
+        val environmentReader = createTestEnvironmentReader()
+        lateinit var scheduler: SimpleVirtProvisioningService
+
+        root.launch {
+            val res = createProvisioner(root, environmentReader, allocationPolicy)
+            val bareMetalProvisioner = res.first
+            scheduler = res.second
+
+            val failureDomain = if (failures) {
+                println("ENABLING failures")
+                createFailureDomain(seed, bareMetalProvisioner, chan)
+            } else {
+                null
+            }
+
+            attachMonitor(scheduler, monitor)
+            processTrace(traceReader, scheduler, chan, monitor)
+
+            println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
+            failureDomain?.cancel()
+            scheduler.terminate()
+        }
+
+        runSimulation()
+
+        // Note that these values have been verified beforehand
+        assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs")
+        assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run")
+        assertEquals(207379117949, monitor.totalRequestedBurst)
+        assertEquals(207378478631, monitor.totalGrantedBurst)
+        assertEquals(639360, monitor.totalOvercommissionedBurst)
+        assertEquals(0, monitor.totalInterferedBurst)
+    }
+
+    /**
+     * Run the simulation.
+     */
+    private fun runSimulation() = runBlocking {
+        simulationEngine.run()
+    }
+
+    /**
+     * Obtain the trace reader for the test.
+     */
+    private fun createTestTraceReader(): TraceReader<VmWorkload> {
+        val performanceInterferenceStream = object {}.javaClass.getResourceAsStream("/env/performance-interference.json")
+        val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream)
+            .construct()
+        return createTraceReader(File("src/test/resources/trace"), performanceInterferenceModel, emptyList(), 0)
+    }
+
+    /**
+     * Obtain the environment reader for the test.
+     */
+    private fun createTestEnvironmentReader(): EnvironmentReader {
+        val stream = object {}.javaClass.getResourceAsStream("/env/topology.txt")
+        return Sc20ClusterEnvironmentReader(stream)
+    }
+
+    class TestSc20Monitor : Sc20Monitor {
+        var totalRequestedBurst = 0L
+        var totalGrantedBurst = 0L
+        var totalOvercommissionedBurst = 0L
+        var totalInterferedBurst = 0L
+
+        override suspend fun onSliceFinish(
+            time: Long,
+            requestedBurst: Long,
+            grantedBurst: Long,
+            overcommissionedBurst: Long,
+            interferedBurst: Long,
+            cpuUsage: Double,
+            cpuDemand: Double,
+            numberOfDeployedImages: Int,
+            hostServer: Server,
+            submittedVms: Long,
+            queuedVms: Long,
+            runningVms: Long,
+            finishedVms: Long,
+            duration: Long
+        ) {
+            totalRequestedBurst += requestedBurst
+            totalGrantedBurst += grantedBurst
+            totalOvercommissionedBurst += overcommissionedBurst
+            totalInterferedBurst += interferedBurst
+        }
+        override fun close() {}
+    }
+}
diff --git a/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt b/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt
new file mode 100644
index 00000000..6b347bff
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt
@@ -0,0 +1,5 @@
+ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
+A01;A01;32;3.2;2048;1;256;32
+B01;B01;48;2.93;1256;6;64;8
+C01;C01;32;3.2;2048;2;128;16
+
diff --git a/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet b/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet
Binary files differ
new file mode 100644
index 00000000..ce7a812c
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet
diff --git a/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet b/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet
Binary files differ
new file mode 100644
index 00000000..1d7ce882
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index 2ef0db97..e34ee2dc 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -38,9 +38,9 @@
 import com.atlarge.opendc.core.Platform
 import com.atlarge.opendc.core.Zone
 import com.atlarge.opendc.core.services.ServiceRegistry
 import com.atlarge.opendc.format.environment.EnvironmentReader
-import java.io.BufferedReader
 import java.io.File
-import java.io.FileReader
+import java.io.FileInputStream
+import java.io.InputStream
 import java.util.Random
 import java.util.UUID
@@ -50,8 +50,11 @@ import java.util.UUID
  * @param environmentFile The file describing the physical cluster.
  */
 class Sc20ClusterEnvironmentReader(
-    private val environmentFile: File
+    private val input: InputStream
 ) : EnvironmentReader {
+
+    constructor(file: File) : this(FileInputStream(file))
+
     @Suppress("BlockingMethodInNonBlockingContext")
     override suspend fun construct(dom: Domain): Environment {
         var clusterIdCol = 0
@@ -70,7 +73,7 @@ class Sc20ClusterEnvironmentReader(
         val nodes = mutableListOf<SimpleBareMetalDriver>()
         val random = Random(0)
 
-        BufferedReader(FileReader(environmentFile)).use { reader ->
+        input.bufferedReader().use { reader ->
             reader.lineSequence()
                 .filter { line ->
                     // Ignore comments in the file
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index fe1049d9..5220af9b 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -22,7 +22,7 @@
  * SOFTWARE.
  */
 
-package com.atlarge.opendc.format.trace.vm
+package com.atlarge.opendc.format.trace.bitbrains
 
 import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
 import com.atlarge.opendc.compute.core.image.VmImage
@@ -43,7 +43,7 @@ import java.util.UUID
  * @param traceDirectory The directory of the traces.
  * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
  */
-class VmTraceReader(
+class BitbrainsTraceReader(
     traceDirectory: File,
     performanceInterferenceModel: PerformanceInterferenceModel
 ) : TraceReader<VmWorkload> {
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
index 2e2159ba..c53cd569 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -117,7 +117,6 @@ class Sc20TraceReader(
             reader.lineSequence()
                 .chunked(128)
                 .forEach { lines ->
-                    // val res = ArrayList<FlopsHistoryFragment>(lines.size)
                     for (line in lines) {
                         // Ignore comments in the trace
                         if (line.startsWith("#") || line.isBlank()) {
@@ -149,7 +148,6 @@
                             fragment
                         }
                     }
-                    // yieldAll(res)
                 }
 
                 if (last != null) {
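
Since `Sc20Monitor` is now an interface whose callbacks default to no-ops (see the Sc20Monitor.kt hunk above), experiment variants can plug in lightweight monitors without touching the Parquet writer. Below is a hypothetical example that is not part of this merge: a console monitor that only accumulates burst totals and reports them when closed.

```kotlin
package com.atlarge.opendc.experiments.sc20

import com.atlarge.opendc.compute.core.Server

/**
 * A hypothetical console monitor: it overrides only onSliceFinish and close,
 * relying on the no-op defaults of Sc20Monitor for every other callback.
 */
class ConsoleSc20Monitor : Sc20Monitor {
    private var requested = 0L
    private var granted = 0L
    private var overcommissioned = 0L

    override suspend fun onSliceFinish(
        time: Long,
        requestedBurst: Long,
        grantedBurst: Long,
        overcommissionedBurst: Long,
        interferedBurst: Long,
        cpuUsage: Double,
        cpuDemand: Double,
        numberOfDeployedImages: Int,
        hostServer: Server,
        submittedVms: Long,
        queuedVms: Long,
        runningVms: Long,
        finishedVms: Long,
        duration: Long
    ) {
        // Accumulate the per-slice counters reported by the hypervisors.
        requested += requestedBurst
        granted += grantedBurst
        overcommissioned += overcommissionedBurst
    }

    override fun close() {
        println("requested=$requested granted=$granted overcommissioned=$overcommissioned")
    }
}
```

Such a monitor can be handed to `attachMonitor` and `processTrace` in place of `Sc20ParquetMonitor`, exactly as the integration test does with its `TestSc20Monitor`.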
