summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc/opendc-experiments-sc20/build.gradle.kts2
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt)90
-rw-r--r--opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt183
-rw-r--r--opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt5
-rw-r--r--opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquetbin0 -> 2148 bytes
-rw-r--r--opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquetbin0 -> 1672463 bytes
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt11
7 files changed, 241 insertions, 50 deletions
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index 8611ffa7..ccfa3038 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -31,7 +31,7 @@ plugins {
}
application {
- mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt"
+ mainClassName = "com.atlarge.opendc.experiments.sc20.Sc20ExperimentKt"
applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M")
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
index 6f1e9aae..fc4b9058 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
@@ -58,7 +58,6 @@ import com.xenomachina.argparser.default
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
@@ -220,56 +219,57 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc2
/**
* Process the trace.
*/
-suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, vmPlacements: Map<String, String>, monitor: Sc20Monitor) {
+suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: Sc20Monitor, vmPlacements: Map<String, String> = emptyMap()) {
+ val domain = simulationContext.domain
+
try {
- coroutineScope {
- var submitted = 0L
- val finished = Channel<Unit>(Channel.CONFLATED)
- val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
-
- while (reader.hasNext()) {
- val (time, workload) = reader.next()
-
- if (vmPlacements.isNotEmpty()) {
- val vmId = workload.name.replace("VM Workload ", "")
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null) {
- println("Could not find placement data in VM placement file for VM $vmId")
- continue
- }
- val machineInCluster = hypervisors.ceiling(clusterName)?.let { it.contains(clusterName) } ?: false
- if (machineInCluster) {
- println("Ignored VM")
- continue
- }
+ var submitted = 0L
+ val finished = Channel<Unit>(Channel.CONFLATED)
+ val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
+
+ while (reader.hasNext()) {
+ val (time, workload) = reader.next()
+
+ if (vmPlacements.isNotEmpty()) {
+ val vmId = workload.name.replace("VM Workload ", "")
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null) {
+ println("Could not find placement data in VM placement file for VM $vmId")
+ continue
}
-
- submitted++
- delay(max(0, time - simulationContext.clock.millis()))
- launch {
- chan.send(Unit)
- val server = scheduler.deploy(
- workload.image.name, workload.image,
- Flavor(workload.image.maxCores, workload.image.requiredMemory)
- )
- // Monitor server events
- server.events
- .onEach {
- if (it is ServerEvent.StateChanged) {
- monitor.onVmStateChanged(it.server)
- }
-
- finished.send(Unit)
- }
- .collect()
+ val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false
+ if (machineInCluster) {
+ println("Ignored VM")
+ continue
}
}
- while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) {
- finished.receive()
+ submitted++
+ delay(max(0, time - simulationContext.clock.millis()))
+ domain.launch {
+ chan.send(Unit)
+ val server = scheduler.deploy(
+ workload.image.name, workload.image,
+ Flavor(workload.image.maxCores, workload.image.requiredMemory)
+ )
+ // Monitor server events
+ server.events
+ .onEach {
+ if (it is ServerEvent.StateChanged) {
+ monitor.onVmStateChanged(it.server)
+ }
+
+ delay(1)
+ finished.send(Unit)
+ }
+ .collect()
}
}
+
+ while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) {
+ finished.receive()
+ }
} finally {
reader.close()
}
@@ -347,7 +347,7 @@ fun main(args: Array<String>) {
}
attachMonitor(scheduler, monitor)
- processTrace(traceReader, scheduler, chan, vmPlacements, monitor)
+ processTrace(traceReader, scheduler, chan, monitor, vmPlacements)
println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
new file mode 100644
index 00000000..dd0931e4
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -0,0 +1,183 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.SimulationEngine
+import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
+import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import com.atlarge.opendc.format.trace.TraceReader
+import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import java.io.File
+import java.util.ServiceLoader
+
+/**
+ * An integration test suite for the SC20 experiments.
+ */
+class Sc20IntegrationTest {
+ /**
+ * The simulation engine to use.
+ */
+ private lateinit var simulationEngine: SimulationEngine
+
+ /**
+ * The root simulation domain to run in.
+ */
+ private lateinit var root: Domain
+
+ /**
+ * The monitor used to keep track of the metrics.
+ */
+ private lateinit var monitor: TestSc20Monitor
+
+ /**
+ * Setup the experimental environment.
+ */
+ @BeforeEach
+ fun setUp() {
+ val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
+ simulationEngine = provider("test")
+ root = simulationEngine.newDomain("root")
+ monitor = TestSc20Monitor()
+ }
+
+ /**
+ * Tear down the experimental environment.
+ */
+ @AfterEach
+ fun tearDown() = runBlocking {
+ simulationEngine.terminate()
+ }
+
+ @Test
+ fun smoke() {
+ val failures = false
+ val seed = 0
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
+ val traceReader = createTestTraceReader()
+ val environmentReader = createTestEnvironmentReader()
+ lateinit var scheduler: SimpleVirtProvisioningService
+
+ root.launch {
+ val res = createProvisioner(root, environmentReader, allocationPolicy)
+ val bareMetalProvisioner = res.first
+ scheduler = res.second
+
+ val failureDomain = if (failures) {
+ println("ENABLING failures")
+ createFailureDomain(seed, bareMetalProvisioner, chan)
+ } else {
+ null
+ }
+
+ attachMonitor(scheduler, monitor)
+ processTrace(traceReader, scheduler, chan, monitor)
+
+ println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
+ failureDomain?.cancel()
+ scheduler.terminate()
+ }
+
+ runSimulation()
+
+ // Note that these values have been verified beforehand
+ assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs")
+ assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run")
+ assertEquals(207379117949, monitor.totalRequestedBurst)
+ assertEquals(207378478631, monitor.totalGrantedBurst)
+ assertEquals(639360, monitor.totalOvercommissionedBurst)
+ assertEquals(0, monitor.totalInterferedBurst)
+ }
+
+ /**
+ * Run the simulation.
+ */
+ private fun runSimulation() = runBlocking {
+ simulationEngine.run()
+ }
+
+ /**
+ * Obtain the trace reader for the test.
+ */
+ private fun createTestTraceReader(): TraceReader<VmWorkload> {
+ val performanceInterferenceStream = object {}.javaClass.getResourceAsStream("/env/performance-interference.json")
+ val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream)
+ .construct()
+ return createTraceReader(File("src/test/resources/trace"), performanceInterferenceModel, emptyList(), 0)
+ }
+
+ /**
+ * Obtain the environment reader for the test.
+ */
+ private fun createTestEnvironmentReader(): EnvironmentReader {
+ val stream = object {}.javaClass.getResourceAsStream("/env/topology.txt")
+ return Sc20ClusterEnvironmentReader(stream)
+ }
+
+ class TestSc20Monitor : Sc20Monitor {
+ var totalRequestedBurst = 0L
+ var totalGrantedBurst = 0L
+ var totalOvercommissionedBurst = 0L
+ var totalInterferedBurst = 0L
+
+ override suspend fun onSliceFinish(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ numberOfDeployedImages: Int,
+ hostServer: Server,
+ submittedVms: Long,
+ queuedVms: Long,
+ runningVms: Long,
+ finishedVms: Long,
+ duration: Long
+ ) {
+ totalRequestedBurst += requestedBurst
+ totalGrantedBurst += grantedBurst
+ totalOvercommissionedBurst += overcommissionedBurst
+ totalInterferedBurst += interferedBurst
+ }
+ override fun close() {}
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt b/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt
new file mode 100644
index 00000000..6b347bff
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt
@@ -0,0 +1,5 @@
+ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
+A01;A01;32;3.2;2048;1;256;32
+B01;B01;48;2.93;1256;6;64;8
+C01;C01;32;3.2;2048;2;128;16
+
diff --git a/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet b/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet
new file mode 100644
index 00000000..ce7a812c
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet
Binary files differ
diff --git a/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet b/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet
new file mode 100644
index 00000000..1d7ce882
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet
Binary files differ
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index 2ef0db97..e34ee2dc 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -38,9 +38,9 @@ import com.atlarge.opendc.core.Platform
import com.atlarge.opendc.core.Zone
import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.format.environment.EnvironmentReader
-import java.io.BufferedReader
import java.io.File
-import java.io.FileReader
+import java.io.FileInputStream
+import java.io.InputStream
import java.util.Random
import java.util.UUID
@@ -50,8 +50,11 @@ import java.util.UUID
* @param environmentFile The file describing the physical cluster.
*/
class Sc20ClusterEnvironmentReader(
- private val environmentFile: File
+ private val input: InputStream
) : EnvironmentReader {
+
+ constructor(file: File) : this(FileInputStream(file))
+
@Suppress("BlockingMethodInNonBlockingContext")
override suspend fun construct(dom: Domain): Environment {
var clusterIdCol = 0
@@ -70,7 +73,7 @@ class Sc20ClusterEnvironmentReader(
val nodes = mutableListOf<SimpleBareMetalDriver>()
val random = Random(0)
- BufferedReader(FileReader(environmentFile)).use { reader ->
+ input.bufferedReader().use { reader ->
reader.lineSequence()
.filter { line ->
// Ignore comments in the file