diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-05 22:35:24 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-06 11:42:20 +0200 |
| commit | 48f6a6f2d42851bc2eeed5b6ef41145740c70286 (patch) | |
| tree | c42f2a5b08923e91b0afad4bc686b298c165a1ea /opendc/opendc-experiments-sc20/src | |
| parent | 2b9b1e9e030dccacf9aa549fc49b2e5e382750bf (diff) | |
test: Add initial integration test for SC20 experiments
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt) | 90 | ||||
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt | 183 | ||||
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt | 5 | ||||
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet | bin | 0 -> 2148 bytes | |||
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet | bin | 0 -> 1672463 bytes |
5 files changed, 233 insertions, 45 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt index 6f1e9aae..fc4b9058 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt @@ -58,7 +58,6 @@ import com.xenomachina.argparser.default import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn @@ -220,56 +219,57 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc2 /** * Process the trace. */ -suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, vmPlacements: Map<String, String>, monitor: Sc20Monitor) { +suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: Sc20Monitor, vmPlacements: Map<String, String> = emptyMap()) { + val domain = simulationContext.domain + try { - coroutineScope { - var submitted = 0L - val finished = Channel<Unit>(Channel.CONFLATED) - val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) - - while (reader.hasNext()) { - val (time, workload) = reader.next() - - if (vmPlacements.isNotEmpty()) { - val vmId = workload.name.replace("VM Workload ", "") - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null) { - println("Could not find placement data in VM placement file for VM $vmId") - continue - } - val machineInCluster = hypervisors.ceiling(clusterName)?.let { it.contains(clusterName) } ?: false - if (machineInCluster) { - println("Ignored VM") - continue - } + var submitted = 0L + val finished = Channel<Unit>(Channel.CONFLATED) + val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) + + while (reader.hasNext()) { + val (time, workload) = reader.next() + + if (vmPlacements.isNotEmpty()) { + val vmId = workload.name.replace("VM Workload ", "") + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null) { + println("Could not find placement data in VM placement file for VM $vmId") + continue } - - submitted++ - delay(max(0, time - simulationContext.clock.millis())) - launch { - chan.send(Unit) - val server = scheduler.deploy( - workload.image.name, workload.image, - Flavor(workload.image.maxCores, workload.image.requiredMemory) - ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - monitor.onVmStateChanged(it.server) - } - - finished.send(Unit) - } - .collect() + val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false + if (machineInCluster) { + println("Ignored VM") + continue } } - while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) { - finished.receive() + submitted++ + delay(max(0, time - simulationContext.clock.millis())) + domain.launch { + chan.send(Unit) + val server = scheduler.deploy( + workload.image.name, workload.image, + Flavor(workload.image.maxCores, workload.image.requiredMemory) + ) + // Monitor server events + server.events + .onEach { + if (it is ServerEvent.StateChanged) { + monitor.onVmStateChanged(it.server) + } + + delay(1) + finished.send(Unit) + } + .collect() } } + + while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) { + finished.receive() + } } finally { reader.close() } @@ -347,7 +347,7 @@ fun main(args: Array<String>) { } attachMonitor(scheduler, monitor) - processTrace(traceReader, scheduler, chan, vmPlacements, monitor) + processTrace(traceReader, scheduler, chan, monitor, vmPlacements) println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt new file mode 100644 index 00000000..dd0931e4 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -0,0 +1,183 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.SimulationEngine +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader +import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.io.File +import java.util.ServiceLoader + +/** + * An integration test suite for the SC20 experiments. + */ +class Sc20IntegrationTest { + /** + * The simulation engine to use. + */ + private lateinit var simulationEngine: SimulationEngine + + /** + * The root simulation domain to run in. + */ + private lateinit var root: Domain + + /** + * The monitor used to keep track of the metrics. + */ + private lateinit var monitor: TestSc20Monitor + + /** + * Setup the experimental environment. + */ + @BeforeEach + fun setUp() { + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + simulationEngine = provider("test") + root = simulationEngine.newDomain("root") + monitor = TestSc20Monitor() + } + + /** + * Tear down the experimental environment. + */ + @AfterEach + fun tearDown() = runBlocking { + simulationEngine.terminate() + } + + @Test + fun smoke() { + val failures = false + val seed = 0 + val chan = Channel<Unit>(Channel.CONFLATED) + val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val traceReader = createTestTraceReader() + val environmentReader = createTestEnvironmentReader() + lateinit var scheduler: SimpleVirtProvisioningService + + root.launch { + val res = createProvisioner(root, environmentReader, allocationPolicy) + val bareMetalProvisioner = res.first + scheduler = res.second + + val failureDomain = if (failures) { + println("ENABLING failures") + createFailureDomain(seed, bareMetalProvisioner, chan) + } else { + null + } + + attachMonitor(scheduler, monitor) + processTrace(traceReader, scheduler, chan, monitor) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + } + + runSimulation() + + // Note that these values have been verified beforehand + assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") + assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") + assertEquals(207379117949, monitor.totalRequestedBurst) + assertEquals(207378478631, monitor.totalGrantedBurst) + assertEquals(639360, monitor.totalOvercommissionedBurst) + assertEquals(0, monitor.totalInterferedBurst) + } + + /** + * Run the simulation. + */ + private fun runSimulation() = runBlocking { + simulationEngine.run() + } + + /** + * Obtain the trace reader for the test. + */ + private fun createTestTraceReader(): TraceReader<VmWorkload> { + val performanceInterferenceStream = object {}.javaClass.getResourceAsStream("/env/performance-interference.json") + val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) + .construct() + return createTraceReader(File("src/test/resources/trace"), performanceInterferenceModel, emptyList(), 0) + } + + /** + * Obtain the environment reader for the test. + */ + private fun createTestEnvironmentReader(): EnvironmentReader { + val stream = object {}.javaClass.getResourceAsStream("/env/topology.txt") + return Sc20ClusterEnvironmentReader(stream) + } + + class TestSc20Monitor : Sc20Monitor { + var totalRequestedBurst = 0L + var totalGrantedBurst = 0L + var totalOvercommissionedBurst = 0L + var totalInterferedBurst = 0L + + override suspend fun onSliceFinish( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, + duration: Long + ) { + totalRequestedBurst += requestedBurst + totalGrantedBurst += grantedBurst + totalOvercommissionedBurst += overcommissionedBurst + totalInterferedBurst += interferedBurst + } + override fun close() {} + } +} diff --git a/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt b/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt new file mode 100644 index 00000000..6b347bff --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt @@ -0,0 +1,5 @@ +ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost +A01;A01;32;3.2;2048;1;256;32 +B01;B01;48;2.93;1256;6;64;8 +C01;C01;32;3.2;2048;2;128;16 + diff --git a/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet b/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet Binary files differnew file mode 100644 index 00000000..ce7a812c --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet diff --git a/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet b/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet Binary files differnew file mode 100644 index 00000000..1d7ce882 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet |
