summaryrefslogtreecommitdiff
path: root/opendc
diff options
context:
space:
mode:
Diffstat (limited to 'opendc')
-rw-r--r--opendc/opendc-experiments-sc20/build.gradle.kts3
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt235
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt442
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt7
4 files changed, 380 insertions, 307 deletions
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index 76ec7cc4..42ccb1b1 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -39,8 +39,7 @@ dependencies {
api(project(":opendc:opendc-core"))
implementation(project(":opendc:opendc-format"))
implementation(kotlin("stdlib"))
- implementation("com.xenomachina:kotlin-argparser:2.0.7")
- implementation("org.slf4j:slf4j-api:${Library.SLF4J}")
+ implementation("com.github.ajalt:clikt:2.6.0")
implementation("io.github.microutils:kotlin-logging:1.7.9")
implementation("org.apache.parquet:parquet-avro:1.11.0")
implementation("org.apache.hadoop:hadoop-client:3.2.1") {
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
new file mode 100644
index 00000000..e37dea8b
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
@@ -0,0 +1,235 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
+import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.ServerEvent
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.compute.metal.NODE_CLUSTER
+import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.virt.HypervisorEvent
+import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
+import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
+import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
+import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
+import com.atlarge.opendc.core.failure.FailureDomain
+import com.atlarge.opendc.core.failure.FaultInjector
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import com.atlarge.opendc.format.trace.TraceReader
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
+import mu.KotlinLogging
+import java.io.File
+import java.util.TreeSet
+import kotlin.math.ln
+import kotlin.math.max
+import kotlin.random.Random
+
+/**
+ * The logger for this experiment.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Construct the failure domain for the experiments.
+ */
+suspend fun createFailureDomain(
+ seed: Int,
+ failureInterval: Int,
+ bareMetalProvisioner: ProvisioningService,
+ chan: Channel<Unit>
+): Domain {
+ val root = simulationContext.domain
+ val domain = root.newDomain(name = "failures")
+ domain.launch {
+ chan.receive()
+ val random = Random(seed)
+ val injectors = mutableMapOf<String, FaultInjector>()
+ for (node in bareMetalProvisioner.nodes()) {
+ val cluster = node.metadata[NODE_CLUSTER] as String
+ val injector =
+ injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) }
+ injector.enqueue(node.metadata["driver"] as FailureDomain)
+ }
+ }
+ return domain
+}
+
+/**
+ * Obtain the [FaultInjector] to use for the experiments.
+ */
+fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): FaultInjector {
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return CorrelatedFaultInjector(
+ domain,
+ iatScale = ln(failureInterval.toDouble()), iatShape = 1.03, // Hours
+ sizeScale = 1.88, sizeShape = 1.25,
+ dScale = 9.51, dShape = 3.21, // Minutes
+ random = random
+ )
+}
+
+/**
+ * Create the trace reader from which the VM workloads are read.
+ */
+fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader {
+ return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed))
+}
+
+/**
+ * Construct the environment for a VM provisioner and return the provisioner instance.
+ */
+suspend fun createProvisioner(
+ root: Domain,
+ environmentReader: EnvironmentReader,
+ allocationPolicy: AllocationPolicy
+): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) {
+ val environment = environmentReader.use { it.construct(root) }
+ val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
+
+ // Wait for the bare metal nodes to be spawned
+ delay(10)
+
+ val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner)
+
+ // Wait for the hypervisors to be spawned
+ delay(10)
+
+ bareMetalProvisioner to scheduler
+}
+
+/**
+ * Attach the specified monitor to the VM provisioner.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) {
+ val domain = simulationContext.domain
+ val hypervisors = scheduler.drivers()
+
+ // Monitor hypervisor events
+ for (hypervisor in hypervisors) {
+ // TODO Do not expose VirtDriver directly but use Hypervisor class.
+ reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+ hypervisor.server.events
+ .onEach { event ->
+ when (event) {
+ is ServerEvent.StateChanged -> {
+ reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+ }
+ }
+ }
+ .launchIn(domain)
+ hypervisor.events
+ .onEach { event ->
+ when (event) {
+ is HypervisorEvent.SliceFinished -> reporter.reportHostSlice(
+ simulationContext.clock.millis(),
+ event.requestedBurst,
+ event.grantedBurst,
+ event.overcommissionedBurst,
+ event.interferedBurst,
+ event.cpuUsage,
+ event.cpuDemand,
+ event.numberOfDeployedImages,
+ event.hostServer,
+ scheduler.submittedVms,
+ scheduler.queuedVms,
+ scheduler.runningVms,
+ scheduler.finishedVms
+ )
+ }
+ }
+ .launchIn(domain)
+ }
+}
+
+/**
+ * Process the trace.
+ */
+suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: Sc20Reporter, vmPlacements: Map<String, String> = emptyMap()) {
+ val domain = simulationContext.domain
+
+ try {
+ var submitted = 0L
+ val finished = Channel<Unit>(Channel.CONFLATED)
+ val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
+
+ while (reader.hasNext()) {
+ val (time, workload) = reader.next()
+
+ if (vmPlacements.isNotEmpty()) {
+ val vmId = workload.name.replace("VM Workload ", "")
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null) {
+ logger.warn { "Could not find placement data in VM placement file for VM $vmId" }
+ continue
+ }
+ val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false
+ if (machineInCluster) {
+ logger.info { "Ignored VM $vmId" }
+ continue
+ }
+ }
+
+ submitted++
+ delay(max(0, time - simulationContext.clock.millis()))
+ domain.launch {
+ chan.send(Unit)
+ val server = scheduler.deploy(
+ workload.image.name, workload.image,
+ Flavor(workload.image.maxCores, workload.image.requiredMemory)
+ )
+ // Monitor server events
+ server.events
+ .onEach {
+ if (it is ServerEvent.StateChanged) {
+ reporter.reportVmStateChange(it.server)
+ }
+
+ delay(1)
+ finished.send(Unit)
+ }
+ .collect()
+ }
+ }
+
+ while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) {
+ finished.receive()
+ }
+ } finally {
+ reader.close()
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
index 2029d3e7..4264ad3f 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
@@ -24,85 +24,98 @@
package com.atlarge.opendc.experiments.sc20
-import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
-import com.atlarge.opendc.compute.core.Flavor
-import com.atlarge.opendc.compute.core.ServerEvent
-import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
-import com.atlarge.opendc.compute.core.workload.VmWorkload
-import com.atlarge.opendc.compute.metal.NODE_CLUSTER
-import com.atlarge.opendc.compute.metal.service.ProvisioningService
-import com.atlarge.opendc.compute.virt.HypervisorEvent
-import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
-import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
-import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy
-import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
-import com.atlarge.opendc.core.failure.FailureDomain
-import com.atlarge.opendc.core.failure.FaultInjector
-import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import com.atlarge.opendc.format.trace.TraceReader
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import com.xenomachina.argparser.ArgParser
-import com.xenomachina.argparser.default
-import kotlinx.coroutines.ExperimentalCoroutinesApi
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.groups.OptionGroup
+import com.github.ajalt.clikt.parameters.groups.default
+import com.github.ajalt.clikt.parameters.groups.groupChoice
+import com.github.ajalt.clikt.parameters.groups.mutuallyExclusiveOptions
+import com.github.ajalt.clikt.parameters.groups.required
+import com.github.ajalt.clikt.parameters.options.convert
+import com.github.ajalt.clikt.parameters.options.default
+import com.github.ajalt.clikt.parameters.options.defaultLazy
+import com.github.ajalt.clikt.parameters.options.flag
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.required
+import com.github.ajalt.clikt.parameters.types.choice
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.int
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.collect
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
-import kotlinx.coroutines.withContext
import mu.KotlinLogging
import java.io.File
import java.io.FileReader
+import java.io.InputStream
import java.util.ServiceLoader
-import java.util.TreeSet
-import kotlin.math.ln
-import kotlin.math.max
import kotlin.random.Random
+/**
+ * The logger for this experiment.
+ */
private val logger = KotlinLogging.logger {}
-class ExperimentParameters(parser: ArgParser) {
- val traceDirectory by parser.storing("path to the trace directory")
- val environmentFile by parser.storing("path to the environment file")
- val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null }
- val vmPlacementFile by parser.storing("path to the VM placement file").default { null }
- val outputFile by parser.storing("path to where the output should be stored")
- .default { "data/results-${System.currentTimeMillis()}.parquet" }
- val selectedVms by parser.storing("the VMs to run") { parseVMs(this) }
- .default { emptyList() }
- val selectedVmsFile by parser.storing("path to a file containing the VMs to run") {
- parseVMs(FileReader(File(this)).readText())
- }
- .default { emptyList() }
- val seed by parser.storing("the random seed") { toInt() }
+/**
+ * Represents the command for running the experiment.
+ */
+class ExperimentCommand : CliktCommand(name = "sc20-experiment") {
+ private val environment by option("--environment-file", help = "path to the environment file")
+ .file()
+ .required()
+ private val performanceInterferenceStream by option("--performance-interference-file", help = "path to the performance interference file")
+ .file()
+ .convert { it.inputStream() as InputStream }
+ .defaultLazy { ExperimentCommand::class.java.getResourceAsStream("/env/performance-interference.json") }
+
+ private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file")
+ .file()
+ .convert {
+ Sc20VmPlacementReader(it.inputStream().buffered()).construct()
+ }
+ .default(emptyMap())
+
+ private val selectedVms by mutuallyExclusiveOptions(
+ option("--selected-vms", help = "the VMs to run").convert { parseVMs(it) },
+ option("--selected-vms-file").file().convert { parseVMs(FileReader(it).readText()) }
+ ).default(emptyList())
+
+ private val seed by option(help = "the random seed")
+ .int()
.default(0)
- val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures")
- val failureInterval by parser.storing("expected number of hours between failures") { toInt() }
+ private val failures by option("-x", "--failures", help = "enable (correlated) machine failures")
+ .flag()
+ private val failureInterval by option(help = "expected number of hours between failures")
+ .int()
.default(24 * 7) // one week
- val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem")
-
- fun getSelectedVmList(): List<String> {
- return if (selectedVms.isEmpty()) {
- selectedVmsFile
- } else {
- selectedVms
- }
- }
+ private val allocationPolicy by option(help = "name of VM allocation policy to use")
+ .choice(
+ "mem", "mem-inv",
+ "core-mem", "core-mem-inv",
+ "active-servers", "active-servers-inv",
+ "provisioned-cores", "provisioned-cores-inv",
+ "random", "replay"
+ )
+ .default("core-mem")
+
+ private val trace by option("--trace-directory", help = "path to the trace directory")
+ .file(canBeFile = false)
+ .required()
+
+ private val reporter by option().groupChoice(
+ "parquet" to Parquet()
+ ).required()
private fun parseVMs(string: String): List<String> {
// Handle case where VM list contains a VM name with an (escaped) single-quote in it
@@ -112,271 +125,98 @@ class ExperimentParameters(parser: ArgParser) {
val vms: List<String> = jacksonObjectMapper().readValue(sanitizedString)
return vms
}
-}
-/**
- * Construct the failure domain for the experiments.
- */
-suspend fun createFailureDomain(
- seed: Int,
- failureInterval: Int,
- bareMetalProvisioner: ProvisioningService,
- chan: Channel<Unit>
-): Domain {
- val root = simulationContext.domain
- val domain = root.newDomain(name = "failures")
- domain.launch {
- chan.receive()
- val random = Random(seed)
- val injectors = mutableMapOf<String, FaultInjector>()
- for (node in bareMetalProvisioner.nodes()) {
- val cluster = node.metadata[NODE_CLUSTER] as String
- val injector =
- injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) }
- injector.enqueue(node.metadata["driver"] as FailureDomain)
+ override fun run() {
+ logger.info("seed: $seed")
+ logger.info("failures: $failures")
+ logger.info("allocation-policy: $allocationPolicy")
+
+ val start = System.currentTimeMillis()
+ val reporter: Sc20Reporter = reporter.createReporter()
+
+ val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
+ val system = provider("test")
+ val root = system.newDomain("root")
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = when (this.allocationPolicy) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(seed))
+ "replay" -> ReplayAllocationPolicy(vmPlacements)
+ else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}")
}
- }
- return domain
-}
-/**
- * Obtain the [FaultInjector] to use for the experiments.
- */
-fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): FaultInjector {
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return CorrelatedFaultInjector(
- domain,
- iatScale = ln(failureInterval.toDouble()), iatShape = 1.03, // Hours
- sizeScale = 1.88, sizeShape = 1.25,
- dScale = 9.51, dShape = 3.21, // Minutes
- random = random
- )
-}
-
-/**
- * Create the trace reader from which the VM workloads are read.
- */
-fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader {
- return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed))
-}
+ val performanceInterferenceModel = try {
+ Sc20PerformanceInterferenceReader(performanceInterferenceStream).construct()
+ } catch (e: Throwable) {
+ reporter.close()
+ throw e
+ }
+ val environmentReader = Sc20ClusterEnvironmentReader(environment)
+ val traceReader = try {
+ createTraceReader(trace, performanceInterferenceModel, selectedVms, seed)
+ } catch (e: Throwable) {
+ reporter.close()
+ throw e
+ }
-/**
- * Construct the environment for a VM provisioner and return the provisioner instance.
- */
-suspend fun createProvisioner(
- root: Domain,
- environmentReader: EnvironmentReader,
- allocationPolicy: AllocationPolicy
-): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) {
- val environment = environmentReader.use { it.construct(root) }
- val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key]
+ root.launch {
+ val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy)
- // Wait for the bare metal nodes to be spawned
- delay(10)
+ val failureDomain = if (failures) {
+ logger.info("ENABLING failures")
+ createFailureDomain(seed, failureInterval, bareMetalProvisioner, chan)
+ } else {
+ null
+ }
- val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner)
+ attachMonitor(scheduler, reporter)
+ processTrace(traceReader, scheduler, chan, reporter, vmPlacements)
- // Wait for the hypervisors to be spawned
- delay(10)
+ logger.debug("SUBMIT=${scheduler.submittedVms}")
+ logger.debug("FAIL=${scheduler.unscheduledVms}")
+ logger.debug("QUEUED=${scheduler.queuedVms}")
+ logger.debug("RUNNING=${scheduler.runningVms}")
+ logger.debug("FINISHED=${scheduler.finishedVms}")
- bareMetalProvisioner to scheduler
-}
+ failureDomain?.cancel()
+ scheduler.terminate()
+ logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds")
+ }
-/**
- * Attach the specified monitor to the VM provisioner.
- */
-@OptIn(ExperimentalCoroutinesApi::class)
-suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) {
- val domain = simulationContext.domain
- val hypervisors = scheduler.drivers()
+ runBlocking {
+ system.run()
+ system.terminate()
+ }
- // Monitor hypervisor events
- for (hypervisor in hypervisors) {
- // TODO Do not expose VirtDriver directly but use Hypervisor class.
- reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
- hypervisor.server.events
- .onEach { event ->
- when (event) {
- is ServerEvent.StateChanged -> {
- reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
- }
- }
- }
- .launchIn(domain)
- hypervisor.events
- .onEach { event ->
- when (event) {
- is HypervisorEvent.SliceFinished -> reporter.reportHostSlice(
- simulationContext.clock.millis(),
- event.requestedBurst,
- event.grantedBurst,
- event.overcommissionedBurst,
- event.interferedBurst,
- event.cpuUsage,
- event.cpuDemand,
- event.numberOfDeployedImages,
- event.hostServer,
- scheduler.submittedVms,
- scheduler.queuedVms,
- scheduler.runningVms,
- scheduler.finishedVms
- )
- }
- }
- .launchIn(domain)
+ // Explicitly close the monitor to flush its buffer
+ reporter.close()
}
}
-/**
- * Process the trace.
- */
-suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: Sc20Reporter, vmPlacements: Map<String, String> = emptyMap()) {
- val domain = simulationContext.domain
-
- try {
- var submitted = 0L
- val finished = Channel<Unit>(Channel.CONFLATED)
- val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
-
- while (reader.hasNext()) {
- val (time, workload) = reader.next()
-
- if (vmPlacements.isNotEmpty()) {
- val vmId = workload.name.replace("VM Workload ", "")
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null) {
- logger.warn { "Could not find placement data in VM placement file for VM $vmId" }
- continue
- }
- val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false
- if (machineInCluster) {
- logger.info { "Ignored VM $vmId" }
- continue
- }
- }
-
- submitted++
- delay(max(0, time - simulationContext.clock.millis()))
- domain.launch {
- chan.send(Unit)
- val server = scheduler.deploy(
- workload.image.name, workload.image,
- Flavor(workload.image.maxCores, workload.image.requiredMemory)
- )
- // Monitor server events
- server.events
- .onEach {
- if (it is ServerEvent.StateChanged) {
- reporter.reportVmStateChange(it.server)
- }
+sealed class Reporter(name: String) : OptionGroup(name) {
+ /**
+ * Create the [Sc20Reporter] for this option.
+ */
+ abstract fun createReporter(): Sc20Reporter
+}
- delay(1)
- finished.send(Unit)
- }
- .collect()
- }
- }
+class Parquet : Reporter("Options for reporting using Parquet") {
+ private val path by option(help = "path to where the output should be stored")
+ .file()
+ .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") }
- while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) {
- finished.receive()
- }
- } finally {
- reader.close()
- }
+ override fun createReporter(): Sc20Reporter = Sc20ParquetReporter(path)
}
/**
* Main entry point of the experiment.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
-fun main(args: Array<String>) {
- val cli = ArgParser(args).parseInto(::ExperimentParameters)
- logger.info("trace-directory: ${cli.traceDirectory}")
- logger.info("environment-file: ${cli.environmentFile}")
- logger.info("performance-interference-file: ${cli.performanceInterferenceFile}")
- logger.info("selected-vms-file: ${cli.selectedVmsFile}")
- logger.info("seed: ${cli.seed}")
- logger.info("failures: ${cli.failures}")
- logger.info("allocation-policy: ${cli.allocationPolicy}")
-
- val start = System.currentTimeMillis()
- val reporter: Sc20Reporter = Sc20ParquetReporter(cli.outputFile)
-
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider("test")
- val root = system.newDomain("root")
-
- val chan = Channel<Unit>(Channel.CONFLATED)
-
- val performanceInterferenceModel = try {
- val performanceInterferenceStream = if (cli.performanceInterferenceFile != null) {
- File(cli.performanceInterferenceFile!!).inputStream().buffered()
- } else {
- object {}.javaClass.getResourceAsStream("/env/performance-interference.json")
- }
- Sc20PerformanceInterferenceReader(performanceInterferenceStream)
- .construct()
- } catch (e: Throwable) {
- reporter.close()
- throw e
- }
- val vmPlacements = if (cli.vmPlacementFile == null) {
- emptyMap()
- } else {
- Sc20VmPlacementReader(File(cli.vmPlacementFile!!).inputStream().buffered()).construct()
- }
- val environmentReader = Sc20ClusterEnvironmentReader(File(cli.environmentFile))
- val traceReader = try {
- createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed)
- } catch (e: Throwable) {
- reporter.close()
- throw e
- }
- val allocationPolicy = when (cli.allocationPolicy) {
- "mem" -> AvailableMemoryAllocationPolicy()
- "mem-inv" -> AvailableMemoryAllocationPolicy(true)
- "core-mem" -> AvailableCoreMemoryAllocationPolicy()
- "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
- "active-servers" -> NumberOfActiveServersAllocationPolicy()
- "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
- "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
- "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
- "random" -> RandomAllocationPolicy(Random(cli.seed))
- "replay" -> ReplayAllocationPolicy(vmPlacements)
- else -> throw IllegalArgumentException("Unknown allocation policy: ${cli.allocationPolicy}")
- }
-
- root.launch {
- val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy)
-
- val failureDomain = if (cli.failures) {
- logger.info("ENABLING failures")
- createFailureDomain(cli.seed, cli.failureInterval, bareMetalProvisioner, chan)
- } else {
- null
- }
-
- attachMonitor(scheduler, reporter)
- processTrace(traceReader, scheduler, chan, reporter, vmPlacements)
-
- logger.debug("SUBMIT=${scheduler.submittedVms}")
- logger.debug("FAIL=${scheduler.unscheduledVms}")
- logger.debug("QUEUED=${scheduler.queuedVms}")
- logger.debug("RUNNING=${scheduler.runningVms}")
- logger.debug("FINISHED=${scheduler.finishedVms}")
-
- failureDomain?.cancel()
- scheduler.terminate()
- logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds")
- }
-
- runBlocking {
- system.run()
- system.terminate()
- }
-
- // Explicitly close the monitor to flush its buffer
- reporter.close()
-}
+fun main(args: Array<String>) = ExperimentCommand().main(args)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt
index fc3e98ae..eaac912a 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt
@@ -12,14 +12,13 @@ import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.File
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread
private val logger = KotlinLogging.logger {}
-class Sc20ParquetReporter(
- destination: String
-) : Sc20Reporter {
+class Sc20ParquetReporter(destination: File) : Sc20Reporter {
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
private val schema = SchemaBuilder
.record("slice")
@@ -43,7 +42,7 @@ class Sc20ParquetReporter(
.name("totalRunningVms").type().longType().noDefault()
.name("totalFinishedVms").type().longType().noDefault()
.endRecord()
- private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination))
+ private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination.absolutePath))
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(4 * 1024 * 1024) // For compression