summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-05 21:33:39 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-06 11:20:42 +0200
commit3ed277efba4cf96be00ba6e975d4da7fdbfaa671 (patch)
treecb0b78b259eb66fc9daf4ff9c071b17279147a54
parent4aa1ed9d20c7a87c6b5388ddf33b25769f91f20b (diff)
refactor: Modularize experiment code
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt145
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt149
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt324
3 files changed, 364 insertions, 254 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index bac0de21..4b8b80a8 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -1,66 +1,34 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
package com.atlarge.opendc.experiments.sc20
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import kotlinx.coroutines.flow.first
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
import java.io.Closeable
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
-
-class Sc20Monitor(
- destination: String
-) : Closeable {
- private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
- private val schema = SchemaBuilder
- .record("slice")
- .namespace("com.atlarge.opendc.experiments.sc20")
- .fields()
- .name("time").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("requestedBurst").type().longType().noDefault()
- .name("grantedBurst").type().longType().noDefault()
- .name("overcommissionedBurst").type().longType().noDefault()
- .name("interferedBurst").type().longType().noDefault()
- .name("cpuUsage").type().doubleType().noDefault()
- .name("cpuDemand").type().doubleType().noDefault()
- .name("numberOfDeployedImages").type().intType().noDefault()
- .name("server").type().stringType().noDefault()
- .name("hostState").type().stringType().noDefault()
- .name("hostUsage").type().doubleType().noDefault()
- .name("powerDraw").type().doubleType().noDefault()
- .name("totalSubmittedVms").type().longType().noDefault()
- .name("totalQueuedVms").type().longType().noDefault()
- .name("totalRunningVms").type().longType().noDefault()
- .name("totalFinishedVms").type().longType().noDefault()
- .endRecord()
- private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
- private val queue = ArrayBlockingQueue<GenericData.Record>(2048)
- private val writerThread = thread(start = true, name = "sc20-writer") {
- try {
- while (true) {
- val record = queue.take()
- writer.write(record)
- }
- } catch (e: InterruptedException) {
- // Do not rethrow this
- } finally {
- writer.close()
- }
- }
+interface Sc20Monitor : Closeable {
suspend fun onVmStateChanged(server: Server) {}
suspend fun serverStateChanged(
@@ -70,32 +38,7 @@ class Sc20Monitor(
queuedVms: Long,
runningVms: Long,
finishedVms: Long
- ) {
- val lastServerState = lastServerStates[server]
- if (server.state == ServerState.SHUTOFF && lastServerState != null) {
- val duration = simulationContext.clock.millis() - lastServerState.second
- onSliceFinish(
- simulationContext.clock.millis(),
- 0,
- 0,
- 0,
- 0,
- 0.0,
- 0.0,
- 0,
- server,
- submittedVms,
- queuedVms,
- runningVms,
- finishedVms,
- duration
- )
- }
-
- println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}")
-
- lastServerStates[server] = Pair(server.state, simulationContext.clock.millis())
- }
+ ) {}
suspend fun onSliceFinish(
time: Long,
@@ -112,39 +55,5 @@ class Sc20Monitor(
runningVms: Long,
finishedVms: Long,
duration: Long = 5 * 60 * 1000L
- ) {
- // Assume for now that the host is not virtualized and measure the current power draw
- val driver = hostServer.services[BareMetalDriver.Key]
- val usage = driver.usage.first()
- val powerDraw = driver.powerDraw.first()
-
- val record = GenericData.Record(schema)
- record.put("time", time)
- record.put("duration", duration)
- record.put("requestedBurst", requestedBurst)
- record.put("grantedBurst", grantedBurst)
- record.put("overcommissionedBurst", overcommissionedBurst)
- record.put("interferedBurst", interferedBurst)
- record.put("cpuUsage", cpuUsage)
- record.put("cpuDemand", cpuDemand)
- record.put("numberOfDeployedImages", numberOfDeployedImages)
- record.put("server", hostServer.uid)
- record.put("hostState", hostServer.state)
- record.put("hostUsage", usage)
- record.put("powerDraw", powerDraw)
- record.put("totalSubmittedVms", submittedVms)
- record.put("totalQueuedVms", queuedVms)
- record.put("totalRunningVms", runningVms)
- record.put("totalFinishedVms", finishedVms)
-
- queue.put(record)
- }
-
- override fun close() {
- // Busy loop to wait for writer thread to finish
- while (queue.isNotEmpty()) {
- Thread.sleep(500)
- }
- writerThread.interrupt()
- }
+ ) {}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt
new file mode 100644
index 00000000..5e554196
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt
@@ -0,0 +1,149 @@
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.odcsim.simulationContext
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerState
+import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import kotlinx.coroutines.flow.first
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
+
+class Sc20ParquetMonitor(
+ destination: String
+) : Sc20Monitor {
+ private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
+ private val schema = SchemaBuilder
+ .record("slice")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ .name("time").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("requestedBurst").type().longType().noDefault()
+ .name("grantedBurst").type().longType().noDefault()
+ .name("overcommissionedBurst").type().longType().noDefault()
+ .name("interferedBurst").type().longType().noDefault()
+ .name("cpuUsage").type().doubleType().noDefault()
+ .name("cpuDemand").type().doubleType().noDefault()
+ .name("numberOfDeployedImages").type().intType().noDefault()
+ .name("server").type().stringType().noDefault()
+ .name("hostState").type().stringType().noDefault()
+ .name("hostUsage").type().doubleType().noDefault()
+ .name("powerDraw").type().doubleType().noDefault()
+ .name("totalSubmittedVms").type().longType().noDefault()
+ .name("totalQueuedVms").type().longType().noDefault()
+ .name("totalRunningVms").type().longType().noDefault()
+ .name("totalFinishedVms").type().longType().noDefault()
+ .endRecord()
+ private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+ private val queue = ArrayBlockingQueue<GenericData.Record>(2048)
+ private val writerThread = thread(start = true, name = "sc20-writer") {
+ try {
+ while (true) {
+ val record = queue.take()
+ writer.write(record)
+ }
+ } catch (e: InterruptedException) {
+ // Do not rethrow this
+ } finally {
+ writer.close()
+ }
+ }
+
+ override suspend fun onVmStateChanged(server: Server) {}
+
+ override suspend fun serverStateChanged(
+ driver: VirtDriver,
+ server: Server,
+ submittedVms: Long,
+ queuedVms: Long,
+ runningVms: Long,
+ finishedVms: Long
+ ) {
+ val lastServerState = lastServerStates[server]
+ if (server.state == ServerState.SHUTOFF && lastServerState != null) {
+ val duration = simulationContext.clock.millis() - lastServerState.second
+ onSliceFinish(
+ simulationContext.clock.millis(),
+ 0,
+ 0,
+ 0,
+ 0,
+ 0.0,
+ 0.0,
+ 0,
+ server,
+ submittedVms,
+ queuedVms,
+ runningVms,
+ finishedVms,
+ duration
+ )
+ }
+
+ println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}")
+
+ lastServerStates[server] = Pair(server.state, simulationContext.clock.millis())
+ }
+
+ override suspend fun onSliceFinish(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ numberOfDeployedImages: Int,
+ hostServer: Server,
+ submittedVms: Long,
+ queuedVms: Long,
+ runningVms: Long,
+ finishedVms: Long,
+ duration: Long
+ ) {
+ // Assume for now that the host is not virtualized and measure the current power draw
+ val driver = hostServer.services[BareMetalDriver.Key]
+ val usage = driver.usage.first()
+ val powerDraw = driver.powerDraw.first()
+
+ val record = GenericData.Record(schema)
+ record.put("time", time)
+ record.put("duration", duration)
+ record.put("requestedBurst", requestedBurst)
+ record.put("grantedBurst", grantedBurst)
+ record.put("overcommissionedBurst", overcommissionedBurst)
+ record.put("interferedBurst", interferedBurst)
+ record.put("cpuUsage", cpuUsage)
+ record.put("cpuDemand", cpuDemand)
+ record.put("numberOfDeployedImages", numberOfDeployedImages)
+ record.put("server", hostServer.uid)
+ record.put("hostState", hostServer.state)
+ record.put("hostUsage", usage)
+ record.put("powerDraw", powerDraw)
+ record.put("totalSubmittedVms", submittedVms)
+ record.put("totalQueuedVms", queuedVms)
+ record.put("totalRunningVms", runningVms)
+ record.put("totalFinishedVms", finishedVms)
+
+ queue.put(record)
+ }
+
+ override fun close() {
+ // Busy loop to wait for writer thread to finish
+ while (queue.isNotEmpty()) {
+ Thread.sleep(500)
+ }
+ writerThread.interrupt()
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index a2f609a5..6f1e9aae 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -29,11 +29,14 @@ import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerEvent
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.compute.metal.NODE_CLUSTER
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
+import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
@@ -43,7 +46,9 @@ import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy
import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.failure.FaultInjector
+import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import com.atlarge.opendc.format.trace.TraceReader
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
@@ -53,15 +58,19 @@ import com.xenomachina.argparser.default
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
import java.io.File
import java.io.FileReader
+import java.lang.IllegalArgumentException
import java.util.ServiceLoader
+import java.util.TreeSet
import kotlin.math.max
import kotlin.random.Random
@@ -102,6 +111,25 @@ class ExperimentParameters(parser: ArgParser) {
}
/**
+ * Construct the failure domain for the experiments.
+ */
+suspend fun createFailureDomain(seed: Int, bareMetalProvisioner: ProvisioningService, chan: Channel<Unit>): Domain {
+ val root = simulationContext.domain
+ val domain = root.newDomain(name = "failures")
+ domain.launch {
+ chan.receive()
+ val random = Random(seed)
+ val injectors = mutableMapOf<String, FaultInjector>()
+ for (node in bareMetalProvisioner.nodes()) {
+ val cluster = node.metadata[NODE_CLUSTER] as String
+ val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) }
+ injector.enqueue(node.metadata["driver"] as FailureDomain)
+ }
+ }
+ return domain
+}
+
+/**
* Obtain the [FaultInjector] to use for the experiments.
*/
fun createFaultInjector(domain: Domain, random: Random): FaultInjector {
@@ -116,141 +144,89 @@ fun createFaultInjector(domain: Domain, random: Random): FaultInjector {
}
/**
- * Main entry point of the experiment.
+ * Create the trace reader from which the VM workloads are read.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
-fun main(args: Array<String>) {
- ArgParser(args).parseInto(::ExperimentParameters).run {
- println("trace-directory: $traceDirectory")
- println("environment-file: $environmentFile")
- println("performance-interference-file: $performanceInterferenceFile")
- println("vm-placement-file: $vmPlacementFile")
- println("selected-vms-file: $selectedVmsFile")
- println("seed: $seed")
- println("failures: $failures")
- println("allocation-policy: $allocationPolicy")
-
- val start = System.currentTimeMillis()
- val monitor = Sc20Monitor(outputFile)
-
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider("test")
- val root = system.newDomain("root")
- val chan = Channel<Unit>(Channel.CONFLATED)
-
- val vmPlacements = if (vmPlacementFile == null) {
- emptyMap()
- } else {
- Sc20VmPlacementReader(File(vmPlacementFile!!).inputStream().buffered()).construct()
- }
-
- val allocationPolicies = mapOf(
- "mem" to AvailableMemoryAllocationPolicy(),
- "mem-inv" to AvailableMemoryAllocationPolicy(true),
- "core-mem" to AvailableCoreMemoryAllocationPolicy(),
- "core-mem-inv" to AvailableCoreMemoryAllocationPolicy(true),
- "active-servers" to NumberOfActiveServersAllocationPolicy(),
- "active-servers-inv" to NumberOfActiveServersAllocationPolicy(true),
- "provisioned-cores" to ProvisionedCoresAllocationPolicy(),
- "provisioned-cores-inv" to ProvisionedCoresAllocationPolicy(true),
- "replay" to ReplayAllocationPolicy(vmPlacements),
- "random" to RandomAllocationPolicy(Random(seed))
- )
-
- if (allocationPolicy !in allocationPolicies) {
- println("error: unknown allocation policy $allocationPolicy")
- println("Available:")
- allocationPolicies.keys.forEach { key -> println(key) }
- }
-
- root.launch {
- val environment = Sc20ClusterEnvironmentReader(File(environmentFile))
- .use { it.construct(root) }
-
- val performanceInterferenceStream = if (performanceInterferenceFile != null) {
- File(performanceInterferenceFile!!).inputStream().buffered()
- } else {
- object {}.javaClass.getResourceAsStream("/env/performance-interference.json")
- }
-
- val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream)
- .construct()
-
- println(simulationContext.clock.instant())
+fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader {
+ return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed))
+}
- val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key]
+/**
+ * Construct the environment for a VM provisioner and return the provisioner instance.
+ */
+suspend fun createProvisioner(
+ root: Domain,
+ environmentReader: EnvironmentReader,
+ allocationPolicy: AllocationPolicy
+): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) {
+ val environment = environmentReader.use { it.construct(root) }
+ val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key]
- // Wait for the bare metal nodes to be spawned
- delay(10)
+ // Wait for the bare metal nodes to be spawned
+ delay(10)
- val scheduler = SimpleVirtProvisioningService(
- allocationPolicies.getValue(allocationPolicy),
- simulationContext,
- bareMetalProvisioner
- )
+ val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner)
- // Wait for the hypervisors to be spawned
- delay(10)
+ // Wait for the hypervisors to be spawned
+ delay(10)
- val hypervisors = scheduler.drivers()
+ bareMetalProvisioner to scheduler
+}
- // Monitor hypervisor events
- for (hypervisor in hypervisors) {
- // TODO Do not expose VirtDriver directly but use Hypervisor class.
- monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
- hypervisor.server.events
- .onEach { event ->
- when (event) {
- is ServerEvent.StateChanged -> {
- monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
- }
- }
- }
- .launchIn(this)
- hypervisor.events
- .onEach { event ->
- when (event) {
- is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(
- simulationContext.clock.millis(),
- event.requestedBurst,
- event.grantedBurst,
- event.overcommissionedBurst,
- event.interferedBurst,
- event.cpuUsage,
- event.cpuDemand,
- event.numberOfDeployedImages,
- event.hostServer,
- scheduler.submittedVms,
- scheduler.queuedVms,
- scheduler.runningVms,
- scheduler.finishedVms
- )
- }
+/**
+ * Attach the specified monitor to the VM provisioner.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) {
+ val domain = simulationContext.domain
+ val hypervisors = scheduler.drivers()
+
+ // Monitor hypervisor events
+ for (hypervisor in hypervisors) {
+ // TODO Do not expose VirtDriver directly but use Hypervisor class.
+ monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+ hypervisor.server.events
+ .onEach { event ->
+ when (event) {
+ is ServerEvent.StateChanged -> {
+ monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
}
- .launchIn(this)
+ }
}
-
- val failureDomain = if (failures) {
- println("ENABLING failures")
- val domain = root.newDomain(name = "failures")
- domain.launch {
- chan.receive()
- val random = Random(seed)
- val injectors = mutableMapOf<String, FaultInjector>()
- for (node in bareMetalProvisioner.nodes()) {
- val cluster = node.metadata[NODE_CLUSTER] as String
- val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) }
- injector.enqueue(node.metadata["driver"] as FailureDomain)
- }
+ .launchIn(domain)
+ hypervisor.events
+ .onEach { event ->
+ when (event) {
+ is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(
+ simulationContext.clock.millis(),
+ event.requestedBurst,
+ event.grantedBurst,
+ event.overcommissionedBurst,
+ event.interferedBurst,
+ event.cpuUsage,
+ event.cpuDemand,
+ event.numberOfDeployedImages,
+ event.hostServer,
+ scheduler.submittedVms,
+ scheduler.queuedVms,
+ scheduler.runningVms,
+ scheduler.finishedVms
+ )
}
- domain
- } else {
- null
}
+ .launchIn(domain)
+ }
+}
+/**
+ * Process the trace.
+ */
+suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, vmPlacements: Map<String, String>, monitor: Sc20Monitor) {
+ try {
+ coroutineScope {
var submitted = 0L
- val finished = Channel<Unit>(Channel.RENDEZVOUS)
- val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed))
+ val finished = Channel<Unit>(Channel.CONFLATED)
+ val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
+
while (reader.hasNext()) {
val (time, workload) = reader.next()
@@ -262,9 +238,8 @@ fun main(args: Array<String>) {
println("Could not find placement data in VM placement file for VM $vmId")
continue
}
- val machinesInCluster =
- hypervisors.filter { (it as SimpleVirtDriver).server.name.contains(clusterName) }
- if (machinesInCluster.isEmpty()) {
+ val machineInCluster = hypervisors.ceiling(clusterName)?.let { it.contains(clusterName) } ?: false
+ if (machineInCluster) {
println("Ignored VM")
continue
}
@@ -294,21 +269,98 @@ fun main(args: Array<String>) {
while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) {
finished.receive()
}
+ }
+ } finally {
+ reader.close()
+ }
+}
- println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
-
- failureDomain?.cancel()
- scheduler.terminate()
- reader.close()
- println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds")
+/**
+ * Main entry point of the experiment.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+fun main(args: Array<String>) {
+ val cli = ArgParser(args).parseInto(::ExperimentParameters)
+ println("trace-directory: ${cli.traceDirectory}")
+ println("environment-file: ${cli.environmentFile}")
+ println("performance-interference-file: ${cli.performanceInterferenceFile}")
+ println("selected-vms-file: ${cli.selectedVmsFile}")
+ println("seed: ${cli.seed}")
+ println("failures: ${cli.failures}")
+ println("allocation-policy: ${cli.allocationPolicy}")
+
+ val start = System.currentTimeMillis()
+ val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile)
+
+ val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
+ val system = provider("test")
+ val root = system.newDomain("root")
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+
+ val performanceInterferenceModel = try {
+ val performanceInterferenceStream = if (cli.performanceInterferenceFile != null) {
+ File(cli.performanceInterferenceFile!!).inputStream().buffered()
+ } else {
+ object {}.javaClass.getResourceAsStream("/env/performance-interference.json")
}
+ Sc20PerformanceInterferenceReader(performanceInterferenceStream)
+ .construct()
+ } catch (e: Throwable) {
+ monitor.close()
+ throw e
+ }
+ val vmPlacements = if (cli.vmPlacementFile == null) {
+ emptyMap()
+ } else {
+ Sc20VmPlacementReader(File(cli.vmPlacementFile!!).inputStream().buffered()).construct()
+ }
+ val environmentReader = Sc20ClusterEnvironmentReader(File(cli.environmentFile))
+ val traceReader = try {
+ createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed)
+ } catch (e: Throwable) {
+ monitor.close()
+ throw e
+ }
+ val allocationPolicy = when (cli.allocationPolicy) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(cli.seed))
+ "replay" -> ReplayAllocationPolicy(vmPlacements)
+ else -> throw IllegalArgumentException("Unknown allocation policy: ${cli.allocationPolicy}")
+ }
+
+ root.launch {
+ val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy)
- runBlocking {
- system.run()
- system.terminate()
+ val failureDomain = if (cli.failures) {
+ println("ENABLING failures")
+ createFailureDomain(cli.seed, bareMetalProvisioner, chan)
+ } else {
+ null
}
- // Explicitly close the monitor to flush its buffer
- monitor.close()
+ attachMonitor(scheduler, monitor)
+ processTrace(traceReader, scheduler, chan, vmPlacements, monitor)
+
+ println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
+ failureDomain?.cancel()
+ scheduler.terminate()
+ println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds")
}
+
+ runBlocking {
+ system.run()
+ system.terminate()
+ }
+
+ // Explicitly close the monitor to flush its buffer
+ monitor.close()
}