summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-05-08 10:23:10 +0200
committerGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-05-08 10:23:10 +0200
commitdb934d9cebe1d48e148e54aca507e2c44a9bc946 (patch)
treecffff3e75926084edd742e28d2cb4c84d8372604
parente2e7e1abaf70d7e49e2e4af04648796f01ba6492 (diff)
parentcbdbf818004040b60aa122dc6cb98ef635fa5ac1 (diff)
Merge branch 'feat/database-metrics' into '2.x'
Add database integration for experimental results Closes #57 See merge request opendc/opendc-simulator!65
-rw-r--r--opendc/opendc-compute/build.gradle.kts1
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt13
-rw-r--r--opendc/opendc-experiments-sc20/build.gradle.kts8
-rw-r--r--opendc/opendc-experiments-sc20/schema.sql22
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt235
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt448
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt)76
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt5
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt200
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt)17
-rw-r--r--opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml46
-rw-r--r--opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt8
12 files changed, 731 insertions, 348 deletions
diff --git a/opendc/opendc-compute/build.gradle.kts b/opendc/opendc-compute/build.gradle.kts
index 09c904f2..7d43b064 100644
--- a/opendc/opendc-compute/build.gradle.kts
+++ b/opendc/opendc-compute/build.gradle.kts
@@ -33,6 +33,7 @@ dependencies {
implementation(kotlin("stdlib"))
api(project(":odcsim:odcsim-api"))
api(project(":opendc:opendc-core"))
+ implementation("io.github.microutils:kotlin-logging:1.7.9")
testRuntimeOnly(project(":odcsim:odcsim-engine-omega"))
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 2185b372..3603ae69 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -24,10 +24,13 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
+import mu.KotlinLogging
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.math.max
+private val logger = KotlinLogging.logger {}
+
@OptIn(ExperimentalCoroutinesApi::class)
class SimpleVirtProvisioningService(
public override val allocationPolicy: AllocationPolicy,
@@ -141,7 +144,7 @@ class SimpleVirtProvisioningService(
unscheduledVms++
incomingImages -= imageInstance
- println("[${clock.millis()}] CANNOT SPAWN ${imageInstance.image}")
+ logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]")
continue
} else {
break
@@ -149,7 +152,7 @@ class SimpleVirtProvisioningService(
}
try {
- println("[${clock.millis()}] SPAWN ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}")
+ logger.info { "Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
incomingImages -= imageInstance
// Speculatively update the hypervisor view information to prevent other images in the queue from
@@ -174,7 +177,7 @@ class SimpleVirtProvisioningService(
when (event) {
is ServerEvent.StateChanged -> {
if (event.server.state == ServerState.SHUTOFF) {
- println("[${clock.millis()}] FINISH ${event.server.uid} ${event.server.name} ${event.server.flavor}")
+ logger.info { "Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
runningVms--
finishedVms++
@@ -191,13 +194,13 @@ class SimpleVirtProvisioningService(
}
.launchIn(this)
} catch (e: InsufficientMemoryOnServerException) {
- println("Unable to deploy image due to insufficient memory")
+ logger.error("Failed to deploy VM", e)
selectedHv.numberOfActiveServers--
selectedHv.provisionedCores -= imageInstance.flavor.cpuCount
selectedHv.availableMemory += requiredMemory
} catch (e: Throwable) {
- e.printStackTrace()
+ logger.error("Failed to deploy VM", e)
}
}
}
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index ccfa3038..6b6366a7 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -39,13 +39,15 @@ dependencies {
api(project(":opendc:opendc-core"))
implementation(project(":opendc:opendc-format"))
implementation(kotlin("stdlib"))
- implementation("com.xenomachina:kotlin-argparser:2.0.7")
- api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8")
+ implementation("com.github.ajalt:clikt:2.6.0")
+ implementation("io.github.microutils:kotlin-logging:1.7.9")
implementation("org.apache.parquet:parquet-avro:1.11.0")
implementation("org.apache.hadoop:hadoop-client:3.2.1") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ exclude(group = "log4j")
}
- runtimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}")
+ runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1")
+ runtimeOnly("org.postgresql:postgresql:42.2.12")
runtimeOnly(project(":odcsim:odcsim-engine-omega"))
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
diff --git a/opendc/opendc-experiments-sc20/schema.sql b/opendc/opendc-experiments-sc20/schema.sql
new file mode 100644
index 00000000..51990a75
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/schema.sql
@@ -0,0 +1,22 @@
+DROP TABLE IF EXISTS host_reports;
+CREATE TABLE host_reports (
+ id BIGSERIAL PRIMARY KEY NOT NULL,
+ experiment_id BIGINT NOT NULL,
+ time BIGINT NOT NULL,
+ duration BIGINT NOT NULL,
+ requested_burst BIGINT NOT NULL,
+ granted_burst BIGINT NOT NULL,
+ overcommissioned_burst BIGINT NOT NULL,
+ interfered_burst BIGINT NOT NULL,
+ cpu_usage DOUBLE PRECISION NOT NULL,
+ cpu_demand DOUBLE PRECISION NOT NULL,
+ image_count INTEGER NOT NULL,
+ server TEXT NOT NULL,
+ host_state TEXT NOT NULL,
+ host_usage DOUBLE PRECISION NOT NULL,
+ power_draw DOUBLE PRECISION NOT NULL,
+ total_submitted_vms BIGINT NOT NULL,
+ total_queued_vms BIGINT NOT NULL,
+ total_running_vms BIGINT NOT NULL,
+ total_finished_vms BIGINT NOT NULL
+);
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
new file mode 100644
index 00000000..e37dea8b
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
@@ -0,0 +1,235 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
+import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.ServerEvent
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.compute.metal.NODE_CLUSTER
+import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.virt.HypervisorEvent
+import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
+import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
+import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
+import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
+import com.atlarge.opendc.core.failure.FailureDomain
+import com.atlarge.opendc.core.failure.FaultInjector
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import com.atlarge.opendc.format.trace.TraceReader
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
+import mu.KotlinLogging
+import java.io.File
+import java.util.TreeSet
+import kotlin.math.ln
+import kotlin.math.max
+import kotlin.random.Random
+
+/**
+ * The logger for this experiment.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Construct the failure domain for the experiments.
+ */
+suspend fun createFailureDomain(
+ seed: Int,
+ failureInterval: Int,
+ bareMetalProvisioner: ProvisioningService,
+ chan: Channel<Unit>
+): Domain {
+ val root = simulationContext.domain
+ val domain = root.newDomain(name = "failures")
+ domain.launch {
+ chan.receive()
+ val random = Random(seed)
+ val injectors = mutableMapOf<String, FaultInjector>()
+ for (node in bareMetalProvisioner.nodes()) {
+ val cluster = node.metadata[NODE_CLUSTER] as String
+ val injector =
+ injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) }
+ injector.enqueue(node.metadata["driver"] as FailureDomain)
+ }
+ }
+ return domain
+}
+
+/**
+ * Obtain the [FaultInjector] to use for the experiments.
+ */
+fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): FaultInjector {
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return CorrelatedFaultInjector(
+ domain,
+ iatScale = ln(failureInterval.toDouble()), iatShape = 1.03, // Hours
+ sizeScale = 1.88, sizeShape = 1.25,
+ dScale = 9.51, dShape = 3.21, // Minutes
+ random = random
+ )
+}
+
+/**
+ * Create the trace reader from which the VM workloads are read.
+ */
+fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader {
+ return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed))
+}
+
+/**
+ * Construct the environment for a VM provisioner and return the provisioner instance.
+ */
+suspend fun createProvisioner(
+ root: Domain,
+ environmentReader: EnvironmentReader,
+ allocationPolicy: AllocationPolicy
+): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) {
+ val environment = environmentReader.use { it.construct(root) }
+ val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
+
+ // Wait for the bare metal nodes to be spawned
+ delay(10)
+
+ val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner)
+
+ // Wait for the hypervisors to be spawned
+ delay(10)
+
+ bareMetalProvisioner to scheduler
+}
+
+/**
+ * Attach the specified monitor to the VM provisioner.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) {
+ val domain = simulationContext.domain
+ val hypervisors = scheduler.drivers()
+
+ // Monitor hypervisor events
+ for (hypervisor in hypervisors) {
+ // TODO Do not expose VirtDriver directly but use Hypervisor class.
+ reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+ hypervisor.server.events
+ .onEach { event ->
+ when (event) {
+ is ServerEvent.StateChanged -> {
+ reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+ }
+ }
+ }
+ .launchIn(domain)
+ hypervisor.events
+ .onEach { event ->
+ when (event) {
+ is HypervisorEvent.SliceFinished -> reporter.reportHostSlice(
+ simulationContext.clock.millis(),
+ event.requestedBurst,
+ event.grantedBurst,
+ event.overcommissionedBurst,
+ event.interferedBurst,
+ event.cpuUsage,
+ event.cpuDemand,
+ event.numberOfDeployedImages,
+ event.hostServer,
+ scheduler.submittedVms,
+ scheduler.queuedVms,
+ scheduler.runningVms,
+ scheduler.finishedVms
+ )
+ }
+ }
+ .launchIn(domain)
+ }
+}
+
+/**
+ * Process the trace.
+ */
+suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: Sc20Reporter, vmPlacements: Map<String, String> = emptyMap()) {
+ val domain = simulationContext.domain
+
+ try {
+ var submitted = 0L
+ val finished = Channel<Unit>(Channel.CONFLATED)
+ val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
+
+ while (reader.hasNext()) {
+ val (time, workload) = reader.next()
+
+ if (vmPlacements.isNotEmpty()) {
+ val vmId = workload.name.replace("VM Workload ", "")
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null) {
+ logger.warn { "Could not find placement data in VM placement file for VM $vmId" }
+ continue
+ }
+ val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false
+ if (machineInCluster) {
+ logger.info { "Ignored VM $vmId" }
+ continue
+ }
+ }
+
+ submitted++
+ delay(max(0, time - simulationContext.clock.millis()))
+ domain.launch {
+ chan.send(Unit)
+ val server = scheduler.deploy(
+ workload.image.name, workload.image,
+ Flavor(workload.image.maxCores, workload.image.requiredMemory)
+ )
+ // Monitor server events
+ server.events
+ .onEach {
+ if (it is ServerEvent.StateChanged) {
+ reporter.reportVmStateChange(it.server)
+ }
+
+ delay(1)
+ finished.send(Unit)
+ }
+ .collect()
+ }
+ }
+
+ while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) {
+ finished.receive()
+ }
+ } finally {
+ reader.close()
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
index b1964197..51448c9e 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
@@ -24,82 +24,101 @@
package com.atlarge.opendc.experiments.sc20
-import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
-import com.atlarge.opendc.compute.core.Flavor
-import com.atlarge.opendc.compute.core.ServerEvent
-import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
-import com.atlarge.opendc.compute.core.workload.VmWorkload
-import com.atlarge.opendc.compute.metal.NODE_CLUSTER
-import com.atlarge.opendc.compute.metal.service.ProvisioningService
-import com.atlarge.opendc.compute.virt.HypervisorEvent
-import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
-import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
-import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy
-import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
-import com.atlarge.opendc.core.failure.FailureDomain
-import com.atlarge.opendc.core.failure.FaultInjector
-import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import com.atlarge.opendc.format.trace.TraceReader
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import com.xenomachina.argparser.ArgParser
-import com.xenomachina.argparser.default
-import kotlinx.coroutines.ExperimentalCoroutinesApi
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.groups.OptionGroup
+import com.github.ajalt.clikt.parameters.groups.default
+import com.github.ajalt.clikt.parameters.groups.groupChoice
+import com.github.ajalt.clikt.parameters.groups.mutuallyExclusiveOptions
+import com.github.ajalt.clikt.parameters.groups.required
+import com.github.ajalt.clikt.parameters.options.convert
+import com.github.ajalt.clikt.parameters.options.default
+import com.github.ajalt.clikt.parameters.options.defaultLazy
+import com.github.ajalt.clikt.parameters.options.flag
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.required
+import com.github.ajalt.clikt.parameters.types.choice
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.int
+import com.github.ajalt.clikt.parameters.types.long
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.collect
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
-import kotlinx.coroutines.withContext
+import mu.KotlinLogging
import java.io.File
import java.io.FileReader
+import java.io.InputStream
+import java.sql.DriverManager
import java.util.ServiceLoader
-import java.util.TreeSet
-import kotlin.math.ln
-import kotlin.math.max
import kotlin.random.Random
-class ExperimentParameters(parser: ArgParser) {
- val traceDirectory by parser.storing("path to the trace directory")
- val environmentFile by parser.storing("path to the environment file")
- val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null }
- val vmPlacementFile by parser.storing("path to the VM placement file").default { null }
- val outputFile by parser.storing("path to where the output should be stored")
- .default { "data/results-${System.currentTimeMillis()}.parquet" }
- val selectedVms by parser.storing("the VMs to run") { parseVMs(this) }
- .default { emptyList() }
- val selectedVmsFile by parser.storing("path to a file containing the VMs to run") {
- parseVMs(FileReader(File(this)).readText())
- }
- .default { emptyList() }
- val seed by parser.storing("the random seed") { toInt() }
- .default(0)
- val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures")
- val failureInterval by parser.storing("expected number of hours between failures") { toInt() }
- .default(24 * 7) // one week
- val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem")
+/**
+ * The logger for this experiment.
+ */
+private val logger = KotlinLogging.logger {}
- fun getSelectedVmList(): List<String> {
- return if (selectedVms.isEmpty()) {
- selectedVmsFile
- } else {
- selectedVms
+/**
+ * Represents the command for running the experiment.
+ */
+class ExperimentCommand : CliktCommand(name = "sc20-experiment") {
+ private val environment by option("--environment-file", help = "path to the environment file")
+ .file()
+ .required()
+ private val performanceInterferenceStream by option("--performance-interference-file", help = "path to the performance interference file")
+ .file()
+ .convert { it.inputStream() as InputStream }
+ .defaultLazy { ExperimentCommand::class.java.getResourceAsStream("/env/performance-interference.json") }
+
+ private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file")
+ .file()
+ .convert {
+ Sc20VmPlacementReader(it.inputStream().buffered()).construct()
}
- }
+ .default(emptyMap())
+
+ private val selectedVms by mutuallyExclusiveOptions(
+ option("--selected-vms", help = "the VMs to run").convert { parseVMs(it) },
+ option("--selected-vms-file").file().convert { parseVMs(FileReader(it).readText()) }
+ ).default(emptyList())
+
+ private val seed by option(help = "the random seed")
+ .int()
+ .default(0)
+ private val failures by option("-x", "--failures", help = "enable (correlated) machine failures")
+ .flag()
+ private val failureInterval by option(help = "expected number of hours between failures")
+ .int()
+ .default(24 * 7) // one week
+ private val allocationPolicy by option(help = "name of VM allocation policy to use")
+ .choice(
+ "mem", "mem-inv",
+ "core-mem", "core-mem-inv",
+ "active-servers", "active-servers-inv",
+ "provisioned-cores", "provisioned-cores-inv",
+ "random", "replay"
+ )
+ .default("core-mem")
+
+ private val trace by option("--trace-directory", help = "path to the trace directory")
+ .file(canBeFile = false)
+ .required()
+
+ private val reporter by option().groupChoice(
+ "parquet" to Parquet(),
+ "postgres" to Postgres()
+ ).required()
private fun parseVMs(string: String): List<String> {
// Handle case where VM list contains a VM name with an (escaped) single-quote in it
@@ -109,267 +128,108 @@ class ExperimentParameters(parser: ArgParser) {
val vms: List<String> = jacksonObjectMapper().readValue(sanitizedString)
return vms
}
-}
-/**
- * Construct the failure domain for the experiments.
- */
-suspend fun createFailureDomain(
- seed: Int,
- failureInterval: Int,
- bareMetalProvisioner: ProvisioningService,
- chan: Channel<Unit>
-): Domain {
- val root = simulationContext.domain
- val domain = root.newDomain(name = "failures")
- domain.launch {
- chan.receive()
- val random = Random(seed)
- val injectors = mutableMapOf<String, FaultInjector>()
- for (node in bareMetalProvisioner.nodes()) {
- val cluster = node.metadata[NODE_CLUSTER] as String
- val injector =
- injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) }
- injector.enqueue(node.metadata["driver"] as FailureDomain)
+ override fun run() {
+ logger.info("seed: $seed")
+ logger.info("failures: $failures")
+ logger.info("allocation-policy: $allocationPolicy")
+
+ val start = System.currentTimeMillis()
+ val reporter: Sc20Reporter = reporter.createReporter()
+
+ val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
+ val system = provider("test")
+ val root = system.newDomain("root")
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = when (this.allocationPolicy) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(seed))
+ "replay" -> ReplayAllocationPolicy(vmPlacements)
+ else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}")
}
- }
- return domain
-}
-/**
- * Obtain the [FaultInjector] to use for the experiments.
- */
-fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): FaultInjector {
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return CorrelatedFaultInjector(
- domain,
- iatScale = ln(failureInterval.toDouble()), iatShape = 1.03, // Hours
- sizeScale = 1.88, sizeShape = 1.25,
- dScale = 9.51, dShape = 3.21, // Minutes
- random = random
- )
-}
-
-/**
- * Create the trace reader from which the VM workloads are read.
- */
-fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader {
- return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed))
-}
+ val performanceInterferenceModel = try {
+ Sc20PerformanceInterferenceReader(performanceInterferenceStream).construct()
+ } catch (e: Throwable) {
+ reporter.close()
+ throw e
+ }
+ val environmentReader = Sc20ClusterEnvironmentReader(environment)
+ val traceReader = try {
+ createTraceReader(trace, performanceInterferenceModel, selectedVms, seed)
+ } catch (e: Throwable) {
+ reporter.close()
+ throw e
+ }
-/**
- * Construct the environment for a VM provisioner and return the provisioner instance.
- */
-suspend fun createProvisioner(
- root: Domain,
- environmentReader: EnvironmentReader,
- allocationPolicy: AllocationPolicy
-): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) {
- val environment = environmentReader.use { it.construct(root) }
- val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key]
+ root.launch {
+ val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy)
- // Wait for the bare metal nodes to be spawned
- delay(10)
+ val failureDomain = if (failures) {
+ logger.info("ENABLING failures")
+ createFailureDomain(seed, failureInterval, bareMetalProvisioner, chan)
+ } else {
+ null
+ }
- val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner)
+ attachMonitor(scheduler, reporter)
+ processTrace(traceReader, scheduler, chan, reporter, vmPlacements)
- // Wait for the hypervisors to be spawned
- delay(10)
+ logger.debug("SUBMIT=${scheduler.submittedVms}")
+ logger.debug("FAIL=${scheduler.unscheduledVms}")
+ logger.debug("QUEUED=${scheduler.queuedVms}")
+ logger.debug("RUNNING=${scheduler.runningVms}")
+ logger.debug("FINISHED=${scheduler.finishedVms}")
- bareMetalProvisioner to scheduler
-}
+ failureDomain?.cancel()
+ scheduler.terminate()
+ logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds")
+ }
-/**
- * Attach the specified monitor to the VM provisioner.
- */
-@OptIn(ExperimentalCoroutinesApi::class)
-suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) {
- val domain = simulationContext.domain
- val hypervisors = scheduler.drivers()
+ runBlocking {
+ system.run()
+ system.terminate()
+ }
- // Monitor hypervisor events
- for (hypervisor in hypervisors) {
- // TODO Do not expose VirtDriver directly but use Hypervisor class.
- monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
- hypervisor.server.events
- .onEach { event ->
- when (event) {
- is ServerEvent.StateChanged -> {
- monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
- }
- }
- }
- .launchIn(domain)
- hypervisor.events
- .onEach { event ->
- when (event) {
- is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(
- simulationContext.clock.millis(),
- event.requestedBurst,
- event.grantedBurst,
- event.overcommissionedBurst,
- event.interferedBurst,
- event.cpuUsage,
- event.cpuDemand,
- event.numberOfDeployedImages,
- event.hostServer,
- scheduler.submittedVms,
- scheduler.queuedVms,
- scheduler.runningVms,
- scheduler.finishedVms
- )
- }
- }
- .launchIn(domain)
+ // Explicitly close the monitor to flush its buffer
+ reporter.close()
}
}
-/**
- * Process the trace.
- */
-suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: Sc20Monitor, vmPlacements: Map<String, String> = emptyMap()) {
- val domain = simulationContext.domain
-
- try {
- var submitted = 0L
- val finished = Channel<Unit>(Channel.CONFLATED)
- val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
-
- while (reader.hasNext()) {
- val (time, workload) = reader.next()
+sealed class Reporter(name: String) : OptionGroup(name) {
+ /**
+ * Create the [Sc20Reporter] for this option.
+ */
+ abstract fun createReporter(): Sc20Reporter
+}
- if (vmPlacements.isNotEmpty()) {
- val vmId = workload.name.replace("VM Workload ", "")
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null) {
- println("Could not find placement data in VM placement file for VM $vmId")
- continue
- }
- val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false
- if (machineInCluster) {
- println("Ignored VM")
- continue
- }
- }
+class Parquet : Reporter("Options for reporting using Parquet") {
+ private val path by option(help = "path to where the output should be stored")
+ .file()
+ .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") }
- submitted++
- delay(max(0, time - simulationContext.clock.millis()))
- domain.launch {
- chan.send(Unit)
- val server = scheduler.deploy(
- workload.image.name, workload.image,
- Flavor(workload.image.maxCores, workload.image.requiredMemory)
- )
- // Monitor server events
- server.events
- .onEach {
- if (it is ServerEvent.StateChanged) {
- monitor.onVmStateChanged(it.server)
- }
+ override fun createReporter(): Sc20Reporter = Sc20ParquetReporter(path)
+}
- delay(1)
- finished.send(Unit)
- }
- .collect()
- }
- }
+class Postgres : Reporter("Options for reporting using PostgreSQL") {
+ private val url by option(help = "JDBC connection url").required()
+ private val experimentId by option(help = "Experiment ID").long().required()
- while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) {
- finished.receive()
- }
- } finally {
- reader.close()
+ override fun createReporter(): Sc20Reporter {
+ val conn = DriverManager.getConnection(url)
+ return Sc20PostgresReporter(conn, experimentId)
}
}
/**
* Main entry point of the experiment.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
-fun main(args: Array<String>) {
- val cli = ArgParser(args).parseInto(::ExperimentParameters)
- println("trace-directory: ${cli.traceDirectory}")
- println("environment-file: ${cli.environmentFile}")
- println("performance-interference-file: ${cli.performanceInterferenceFile}")
- println("selected-vms-file: ${cli.selectedVmsFile}")
- println("seed: ${cli.seed}")
- println("failures: ${cli.failures}")
- println("allocation-policy: ${cli.allocationPolicy}")
-
- val start = System.currentTimeMillis()
- val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile)
-
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider("test")
- val root = system.newDomain("root")
-
- val chan = Channel<Unit>(Channel.CONFLATED)
-
- val performanceInterferenceModel = try {
- val performanceInterferenceStream = if (cli.performanceInterferenceFile != null) {
- File(cli.performanceInterferenceFile!!).inputStream().buffered()
- } else {
- object {}.javaClass.getResourceAsStream("/env/performance-interference.json")
- }
- Sc20PerformanceInterferenceReader(performanceInterferenceStream)
- .construct()
- } catch (e: Throwable) {
- monitor.close()
- throw e
- }
- val vmPlacements = if (cli.vmPlacementFile == null) {
- emptyMap()
- } else {
- Sc20VmPlacementReader(File(cli.vmPlacementFile!!).inputStream().buffered()).construct()
- }
- val environmentReader = Sc20ClusterEnvironmentReader(File(cli.environmentFile))
- val traceReader = try {
- createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed)
- } catch (e: Throwable) {
- monitor.close()
- throw e
- }
- val allocationPolicy = when (cli.allocationPolicy) {
- "mem" -> AvailableMemoryAllocationPolicy()
- "mem-inv" -> AvailableMemoryAllocationPolicy(true)
- "core-mem" -> AvailableCoreMemoryAllocationPolicy()
- "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
- "active-servers" -> NumberOfActiveServersAllocationPolicy()
- "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
- "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
- "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
- "random" -> RandomAllocationPolicy(Random(cli.seed))
- "replay" -> ReplayAllocationPolicy(vmPlacements)
- else -> throw IllegalArgumentException("Unknown allocation policy: ${cli.allocationPolicy}")
- }
-
- root.launch {
- val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy)
-
- val failureDomain = if (cli.failures) {
- println("ENABLING failures")
- createFailureDomain(cli.seed, cli.failureInterval, bareMetalProvisioner, chan)
- } else {
- null
- }
-
- attachMonitor(scheduler, monitor)
- processTrace(traceReader, scheduler, chan, monitor, vmPlacements)
-
- println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
-
- failureDomain?.cancel()
- scheduler.terminate()
- println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds")
- }
-
- runBlocking {
- system.run()
- system.terminate()
- }
-
- // Explicitly close the monitor to flush its buffer
- monitor.close()
-}
+fun main(args: Array<String>) = ExperimentCommand().main(args)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt
index 5e554196..f2139144 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt
@@ -6,17 +6,19 @@ import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import kotlinx.coroutines.flow.first
+import mu.KotlinLogging
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.File
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread
-class Sc20ParquetMonitor(
- destination: String
-) : Sc20Monitor {
+private val logger = KotlinLogging.logger {}
+
+class Sc20ParquetReporter(destination: File) : Sc20Reporter {
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
private val schema = SchemaBuilder
.record("slice")
@@ -24,23 +26,23 @@ class Sc20ParquetMonitor(
.fields()
.name("time").type().longType().noDefault()
.name("duration").type().longType().noDefault()
- .name("requestedBurst").type().longType().noDefault()
- .name("grantedBurst").type().longType().noDefault()
- .name("overcommissionedBurst").type().longType().noDefault()
- .name("interferedBurst").type().longType().noDefault()
- .name("cpuUsage").type().doubleType().noDefault()
- .name("cpuDemand").type().doubleType().noDefault()
- .name("numberOfDeployedImages").type().intType().noDefault()
+ .name("requested_burst").type().longType().noDefault()
+ .name("granted_burst").type().longType().noDefault()
+ .name("overcommissioned_burst").type().longType().noDefault()
+ .name("interfered_burst").type().longType().noDefault()
+ .name("cpu_usage").type().doubleType().noDefault()
+ .name("cpu_demand").type().doubleType().noDefault()
+ .name("image_count").type().intType().noDefault()
.name("server").type().stringType().noDefault()
- .name("hostState").type().stringType().noDefault()
- .name("hostUsage").type().doubleType().noDefault()
- .name("powerDraw").type().doubleType().noDefault()
- .name("totalSubmittedVms").type().longType().noDefault()
- .name("totalQueuedVms").type().longType().noDefault()
- .name("totalRunningVms").type().longType().noDefault()
- .name("totalFinishedVms").type().longType().noDefault()
+ .name("host_state").type().stringType().noDefault()
+ .name("host_usage").type().doubleType().noDefault()
+ .name("power_draw").type().doubleType().noDefault()
+ .name("total_submitted_vms").type().longType().noDefault()
+ .name("total_queued_vms").type().longType().noDefault()
+ .name("total_running_vms").type().longType().noDefault()
+ .name("total_finished_vms").type().longType().noDefault()
.endRecord()
- private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination))
+ private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination.absolutePath))
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(4 * 1024 * 1024) // For compression
@@ -60,9 +62,9 @@ class Sc20ParquetMonitor(
}
}
- override suspend fun onVmStateChanged(server: Server) {}
+ override suspend fun reportVmStateChange(server: Server) {}
- override suspend fun serverStateChanged(
+ override suspend fun reportHostStateChange(
driver: VirtDriver,
server: Server,
submittedVms: Long,
@@ -73,7 +75,7 @@ class Sc20ParquetMonitor(
val lastServerState = lastServerStates[server]
if (server.state == ServerState.SHUTOFF && lastServerState != null) {
val duration = simulationContext.clock.millis() - lastServerState.second
- onSliceFinish(
+ reportHostSlice(
simulationContext.clock.millis(),
0,
0,
@@ -91,12 +93,12 @@ class Sc20ParquetMonitor(
)
}
- println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}")
+ logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]")
lastServerStates[server] = Pair(server.state, simulationContext.clock.millis())
}
- override suspend fun onSliceFinish(
+ override suspend fun reportHostSlice(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
@@ -120,21 +122,21 @@ class Sc20ParquetMonitor(
val record = GenericData.Record(schema)
record.put("time", time)
record.put("duration", duration)
- record.put("requestedBurst", requestedBurst)
- record.put("grantedBurst", grantedBurst)
- record.put("overcommissionedBurst", overcommissionedBurst)
- record.put("interferedBurst", interferedBurst)
- record.put("cpuUsage", cpuUsage)
- record.put("cpuDemand", cpuDemand)
- record.put("numberOfDeployedImages", numberOfDeployedImages)
+ record.put("requested_burst", requestedBurst)
+ record.put("granted_burst", grantedBurst)
+ record.put("overcommissioned_burst", overcommissionedBurst)
+ record.put("interfered_burst", interferedBurst)
+ record.put("cpu_usage", cpuUsage)
+ record.put("cpu_demand", cpuDemand)
+ record.put("image_count", numberOfDeployedImages)
record.put("server", hostServer.uid)
- record.put("hostState", hostServer.state)
- record.put("hostUsage", usage)
- record.put("powerDraw", powerDraw)
- record.put("totalSubmittedVms", submittedVms)
- record.put("totalQueuedVms", queuedVms)
- record.put("totalRunningVms", runningVms)
- record.put("totalFinishedVms", finishedVms)
+ record.put("host_state", hostServer.state)
+ record.put("host_usage", usage)
+ record.put("power_draw", powerDraw)
+ record.put("total_submitted_vms", submittedVms)
+ record.put("total_queued_vms", queuedVms)
+ record.put("total_running_vms", runningVms)
+ record.put("total_finished_vms", finishedVms)
queue.put(record)
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
index 0a7718e9..8ae1693c 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
@@ -32,6 +32,7 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.core.User
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
+import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
@@ -49,6 +50,8 @@ import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread
import kotlin.random.Random
+private val logger = KotlinLogging.logger {}
+
/**
* A [TraceReader] for the internal VM workload trace format.
*
@@ -190,7 +193,7 @@ class Sc20ParquetTraceReader(
assert(uid !in takenIds)
takenIds += uid
- println(id)
+ logger.info("Processing VM $id")
val internalBuffer = mutableListOf<FlopsHistoryFragment>()
val externalBuffer = mutableListOf<FlopsHistoryFragment>()
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt
new file mode 100644
index 00000000..5c5e6ceb
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt
@@ -0,0 +1,200 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.odcsim.simulationContext
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerState
+import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import kotlinx.coroutines.flow.first
+import mu.KotlinLogging
+import java.sql.Connection
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.atomic.AtomicBoolean
+import kotlin.concurrent.thread
+
+private val logger = KotlinLogging.logger {}
+
+class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20Reporter {
+ private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
+ private val queue = ArrayBlockingQueue<Report>(2048)
+ private val stop = AtomicBoolean(false)
+ private val writerThread = thread(start = true, name = "sc20-writer") {
+ val stmt = try {
+ conn.prepareStatement(
+ """
+ INSERT INTO host_reports (experiment_id, time, duration, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, image_count, server, host_state, host_usage, power_draw, total_submitted_vms, total_queued_vms, total_running_vms, total_finished_vms)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """.trimIndent()
+ )
+ } catch (e: Throwable) {
+ conn.close()
+ throw e
+ }
+
+ val batchSize = 4096
+ var batch = 0
+
+ try {
+ while (!stop.get()) {
+ val record = queue.take()
+ stmt.setLong(1, experimentId)
+ stmt.setLong(2, record.time)
+ stmt.setLong(3, record.duration)
+ stmt.setLong(4, record.requestedBurst)
+ stmt.setLong(5, record.grantedBurst)
+ stmt.setLong(6, record.overcommissionedBurst)
+ stmt.setLong(7, record.interferedBurst)
+ stmt.setDouble(8, record.cpuUsage)
+ stmt.setDouble(9, record.cpuDemand)
+ stmt.setInt(10, record.numberOfDeployedImages)
+ stmt.setString(11, record.hostServer.uid.toString())
+ stmt.setString(12, record.hostServer.state.name)
+ stmt.setDouble(13, record.hostUsage)
+ stmt.setDouble(14, record.powerDraw)
+ stmt.setLong(15, record.submittedVms)
+ stmt.setLong(16, record.queuedVms)
+ stmt.setLong(17, record.runningVms)
+ stmt.setLong(18, record.finishedVms)
+ stmt.addBatch()
+ batch++
+
+ if (batch > batchSize) {
+ stmt.executeBatch()
+ batch = 0
+ }
+ }
+ } finally {
+ stmt.executeBatch()
+ stmt.close()
+ conn.close()
+ }
+ }
+
+ override suspend fun reportVmStateChange(server: Server) {}
+
+ override suspend fun reportHostStateChange(
+ driver: VirtDriver,
+ server: Server,
+ submittedVms: Long,
+ queuedVms: Long,
+ runningVms: Long,
+ finishedVms: Long
+ ) {
+ val lastServerState = lastServerStates[server]
+ if (server.state == ServerState.SHUTOFF && lastServerState != null) {
+ val duration = simulationContext.clock.millis() - lastServerState.second
+ reportHostSlice(
+ simulationContext.clock.millis(),
+ 0,
+ 0,
+ 0,
+ 0,
+ 0.0,
+ 0.0,
+ 0,
+ server,
+ submittedVms,
+ queuedVms,
+ runningVms,
+ finishedVms,
+ duration
+ )
+ }
+
+ logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]")
+
+ lastServerStates[server] = Pair(server.state, simulationContext.clock.millis())
+ }
+
+ override suspend fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ numberOfDeployedImages: Int,
+ hostServer: Server,
+ submittedVms: Long,
+ queuedVms: Long,
+ runningVms: Long,
+ finishedVms: Long,
+ duration: Long
+ ) {
+ // Assume for now that the host is not virtualized and measure the current power draw
+ val driver = hostServer.services[BareMetalDriver.Key]
+ val usage = driver.usage.first()
+ val powerDraw = driver.powerDraw.first()
+
+ queue.put(
+ Report(
+ time,
+ duration,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ numberOfDeployedImages,
+ hostServer,
+ usage,
+ powerDraw,
+ submittedVms,
+ queuedVms,
+ runningVms,
+ finishedVms
+ )
+ )
+ }
+
+ override fun close() {
+    // Signal the writer thread to stop, then block until it has flushed and exited.
+ stop.set(true)
+ writerThread.join()
+ }
+
+ data class Report(
+ val time: Long,
+ val duration: Long,
+ val requestedBurst: Long,
+ val grantedBurst: Long,
+ val overcommissionedBurst: Long,
+ val interferedBurst: Long,
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val numberOfDeployedImages: Int,
+ val hostServer: Server,
+ val hostUsage: Double,
+ val powerDraw: Double,
+ val submittedVms: Long,
+ val queuedVms: Long,
+ val runningVms: Long,
+ val finishedVms: Long
+ )
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt
index 4b8b80a8..84500417 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt
@@ -28,10 +28,16 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import java.io.Closeable
-interface Sc20Monitor : Closeable {
- suspend fun onVmStateChanged(server: Server) {}
+interface Sc20Reporter : Closeable {
+ /**
+ * This method is invoked when the state of a VM changes.
+ */
+ suspend fun reportVmStateChange(server: Server) {}
- suspend fun serverStateChanged(
+ /**
+ * This method is invoked when the state of a host changes.
+ */
+ suspend fun reportHostStateChange(
driver: VirtDriver,
server: Server,
submittedVms: Long,
@@ -40,7 +46,10 @@ interface Sc20Monitor : Closeable {
finishedVms: Long
) {}
- suspend fun onSliceFinish(
+ /**
+     * This method is invoked for a host for each slice that finishes.
+ */
+ suspend fun reportHostSlice(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
diff --git a/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml
new file mode 100644
index 00000000..77a15e55
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ MIT License
+ ~
+ ~ Copyright (c) 2020 atlarge-research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false" />
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger name="com.atlarge.odcsim" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="com.atlarge.opendc" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.apache.hadoop" level="warn" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Root level="error">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 2bf6bcf3..239d018a 100644
--- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -63,7 +63,7 @@ class Sc20IntegrationTest {
/**
* The monitor used to keep track of the metrics.
*/
- private lateinit var monitor: TestSc20Monitor
+ private lateinit var monitor: TestSc20Reporter
/**
* Setup the experimental environment.
@@ -73,7 +73,7 @@ class Sc20IntegrationTest {
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
simulationEngine = provider("test")
root = simulationEngine.newDomain("root")
- monitor = TestSc20Monitor()
+ monitor = TestSc20Reporter()
}
/**
@@ -151,13 +151,13 @@ class Sc20IntegrationTest {
return Sc20ClusterEnvironmentReader(stream)
}
- class TestSc20Monitor : Sc20Monitor {
+ class TestSc20Reporter : Sc20Reporter {
var totalRequestedBurst = 0L
var totalGrantedBurst = 0L
var totalOvercommissionedBurst = 0L
var totalInterferedBurst = 0L
- override suspend fun onSliceFinish(
+ override suspend fun reportHostSlice(
time: Long,
requestedBurst: Long,
grantedBurst: Long,