author    Georgios Andreadis <info@gandreadis.com>    2020-07-20 12:44:04 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com> 2020-08-24 19:48:12 +0200
commit    53e60ccf0636e0076837d66a7dbea527e3b6e98d (patch)
tree      041f83ae919d7ee9b5691a1666dbb61af26967d0 /simulator/opendc/opendc-runner-web
parent    d8479e7e3744b8d1d31ac4d9f972e560eacd2cf8 (diff)
parent    2a5f50e591f5e9c1da5db2f2011c779a88121675 (diff)
Merge pull request #9 from atlarge-research/feat/opendc-node
Add simulator integration
Diffstat (limited to 'simulator/opendc/opendc-runner-web')
-rw-r--r--  simulator/opendc/opendc-runner-web/build.gradle.kts | 55
-rw-r--r--  simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt | 342
-rw-r--r--  simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt | 190
-rw-r--r--  simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt | 92
-rw-r--r--  simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt | 130
-rw-r--r--  simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml | 52
6 files changed, 861 insertions(+), 0 deletions(-)
diff --git a/simulator/opendc/opendc-runner-web/build.gradle.kts b/simulator/opendc/opendc-runner-web/build.gradle.kts
new file mode 100644
index 00000000..6f725de1
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/build.gradle.kts
@@ -0,0 +1,55 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Experiment runner for OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-convention`
+ application
+}
+
+application {
+ mainClassName = "com.atlarge.opendc.runner.web.MainKt"
+}
+
+dependencies {
+ api(project(":opendc:opendc-core"))
+ implementation(project(":opendc:opendc-compute"))
+ implementation(project(":opendc:opendc-format"))
+ implementation(project(":opendc:opendc-experiments-sc20"))
+
+ implementation("com.github.ajalt:clikt:2.8.0")
+ implementation("io.github.microutils:kotlin-logging:1.7.10")
+
+ implementation("org.mongodb:mongodb-driver-sync:4.0.5")
+ implementation("org.apache.spark:spark-sql_2.12:3.0.0") {
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ exclude(group = "log4j")
+ }
+
+ runtimeOnly(project(":odcsim:odcsim-engine-omega"))
+ runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1")
+ runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:2.13.1")
+}
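
For context, a minimal launch sketch: with the standard Gradle `application` plugin wiring above, the runner can be started through the `run` task, configured via the environment variables declared by the CLI options in Main.kt below. The Gradle project path and the credential and directory values here are illustrative assumptions:

    OPENDC_DB_PASSWORD=secret \
    OPENDC_TRACES=/srv/opendc/traces \
    OPENDC_OUTPUT=/srv/opendc/results \
    ./gradlew :opendc:opendc-runner-web:run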
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
new file mode 100644
index 00000000..0ff9b870
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
@@ -0,0 +1,342 @@
+package com.atlarge.opendc.runner.web
+
+import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.opendc.compute.virt.service.allocation.*
+import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain
+import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.processTrace
+import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
+import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.options.*
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.int
+import com.mongodb.MongoClientSettings
+import com.mongodb.MongoCredential
+import com.mongodb.ServerAddress
+import com.mongodb.client.MongoClients
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.MongoDatabase
+import com.mongodb.client.model.Filters
+import java.io.File
+import java.util.*
+import kotlin.random.Random
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import mu.KotlinLogging
+import org.bson.Document
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * The provider for the simulation engine to use.
+ */
+private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
+
+/**
+ * Represents the CLI command for starting the OpenDC web runner.
+ */
+class RunnerCli : CliktCommand(name = "runner") {
+ /**
+ * The name of the database to use.
+ */
+ private val mongoDb by option(
+ "--mongo-db",
+ help = "name of the database to use",
+ envvar = "OPENDC_DB"
+ )
+ .default("opendc")
+
+ /**
+ * The database host to connect to.
+ */
+ private val mongoHost by option(
+ "--mongo-host",
+ help = "database host to connect to",
+ envvar = "OPENDC_DB_HOST"
+ )
+ .default("localhost")
+
+ /**
+ * The database port to connect to.
+ */
+ private val mongoPort by option(
+ "--mongo-port",
+ help = "database port to connect to",
+ envvar = "OPENDC_DB_PORT"
+ )
+ .int()
+ .default(27017)
+
+ /**
+ * The database user to connect with.
+ */
+ private val mongoUser by option(
+ "--mongo-user",
+ help = "database user to connect with",
+ envvar = "OPENDC_DB_USER"
+ )
+ .default("opendc")
+
+ /**
+ * The database password to connect with.
+ */
+ private val mongoPassword by option(
+ "--mongo-password",
+ help = "database password to connect with",
+ envvar = "OPENDC_DB_PASSWORD"
+ )
+ .convert { it.toCharArray() }
+ .required()
+
+ /**
+ * The path to the traces directory.
+ */
+ private val tracePath by option(
+ "--traces",
+ help = "path to the directory containing the traces",
+ envvar = "OPENDC_TRACES"
+ )
+ .file(canBeFile = false)
+ .defaultLazy { File("traces/") }
+
+ /**
+ * The path to the output directory.
+ */
+ private val outputPath by option(
+ "--output",
+ help = "path to the results directory",
+ envvar = "OPENDC_OUTPUT"
+ )
+ .file(canBeFile = false)
+ .defaultLazy { File("results/") }
+
+ /**
+ * The Spark master to connect to.
+ */
+ private val spark by option(
+ "--spark",
+ help = "Spark master to connect to",
+ envvar = "OPENDC_SPARK"
+ )
+ .default("local[*]")
+
+ /**
+ * Connect to the user-specified database.
+ */
+ private fun createDatabase(): MongoDatabase {
+ val credential = MongoCredential.createScramSha1Credential(
+ mongoUser,
+ mongoDb,
+ mongoPassword
+ )
+
+ val settings = MongoClientSettings.builder()
+ .credential(credential)
+ .applyToClusterSettings { it.hosts(listOf(ServerAddress(mongoHost, mongoPort))) }
+ .build()
+ val client = MongoClients.create(settings)
+ return client.getDatabase(mongoDb)
+ }
+
+ /**
+ * Run a single scenario.
+ */
+ private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>) {
+ val id = scenario.getString("_id")
+
+ logger.info { "Constructing performance interference model" }
+
+ val traceDir = File(
+ tracePath,
+ scenario.getEmbedded(listOf("trace", "traceId"), String::class.java)
+ )
+ val traceReader = Sc20RawParquetTraceReader(traceDir)
+ val performanceInterferenceReader = let {
+ val path = File(traceDir, "performance-interference-model.json")
+ val operational = scenario.get("operational", Document::class.java)
+ val enabled = operational.getBoolean("performanceInterferenceEnabled")
+
+ if (!enabled || !path.exists()) {
+ return@let null
+ }
+
+ path.inputStream().use { Sc20PerformanceInterferenceReader(it) }
+ }
+
+ val targets = portfolio.get("targets", Document::class.java)
+
+ repeat(targets.getInteger("repeatsPerScenario")) {
+ logger.info { "Starting repeat $it" }
+ runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader)
+ }
+
+ logger.info { "Finished simulation for scenario $id" }
+ }
+
+ /**
+ * Run a single repeat.
+ */
+ private suspend fun runRepeat(
+ scenario: Document,
+ repeat: Int,
+ topologies: MongoCollection<Document>,
+ traceReader: Sc20RawParquetTraceReader,
+ performanceInterferenceReader: Sc20PerformanceInterferenceReader?
+ ) {
+ val id = scenario.getString("_id")
+ val seed = repeat
+ val traceDocument = scenario.get("trace", Document::class.java)
+ val workloadName = traceDocument.getString("traceId")
+ val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
+
+ val seeder = Random(seed)
+ val system = provider("experiment-$id")
+ val root = system.newDomain("root")
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+
+ val operational = scenario.get("operational", Document::class.java)
+ val allocationPolicy =
+ when (val policyName = operational.getString("schedulerName")) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+ else -> throw IllegalArgumentException("Unknown policy $policyName")
+ }
+
+ val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
+ val trace = Sc20ParquetTraceReader(
+ listOf(traceReader),
+ performanceInterferenceModel,
+ Workload(workloadName, workloadFraction),
+ seed
+ )
+ val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java)
+ val environment = TopologyParser(topologies, topologyId)
+ val monitor = ParquetExperimentMonitor(
+ outputPath,
+ "scenario_id=$id/run_id=$repeat",
+ 4096
+ )
+
+ root.launch {
+ val (bareMetalProvisioner, scheduler) = createProvisioner(
+ root,
+ environment,
+ allocationPolicy
+ )
+
+ val failureDomain = if (operational.getBoolean("failuresEnabled")) {
+ logger.debug("ENABLING failures")
+ createFailureDomain(
+ seeder.nextInt(),
+ operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7,
+ bareMetalProvisioner,
+ chan
+ )
+ } else {
+ null
+ }
+
+ attachMonitor(scheduler, monitor)
+ processTrace(
+ trace,
+ scheduler,
+ chan,
+ monitor,
+ emptyMap()
+ )
+
+ logger.debug("SUBMIT=${scheduler.submittedVms}")
+ logger.debug("FAIL=${scheduler.unscheduledVms}")
+ logger.debug("QUEUED=${scheduler.queuedVms}")
+ logger.debug("RUNNING=${scheduler.runningVms}")
+ logger.debug("FINISHED=${scheduler.finishedVms}")
+
+ failureDomain?.cancel()
+ scheduler.terminate()
+ }
+
+ try {
+ system.run()
+ } finally {
+ system.terminate()
+ monitor.close()
+ }
+ }
+
+ override fun run() = runBlocking(Dispatchers.Default) {
+ logger.info { "Starting OpenDC web runner" }
+ logger.info { "Connecting to MongoDB instance" }
+ val database = createDatabase()
+ val manager = ScenarioManager(database.getCollection("scenarios"))
+ val portfolios = database.getCollection("portfolios")
+ val topologies = database.getCollection("topologies")
+
+ logger.info { "Launching Spark" }
+ val resultProcessor = ResultProcessor(spark, outputPath)
+
+ logger.info { "Watching for queued scenarios" }
+
+ while (true) {
+ val scenario = manager.findNext()
+
+ if (scenario == null) {
+ delay(5000)
+ continue
+ }
+
+ val id = scenario.getString("_id")
+
+ logger.info { "Found queued scenario $id: attempting to claim" }
+
+ if (!manager.claim(id)) {
+ logger.info { "Failed to claim scenario" }
+ continue
+ }
+
+        coroutineScope {
+            // Periodically renew the heartbeat so a live scenario is not mistaken for a dead one
+            val heartbeat = launch {
+                while (isActive) {
+                    delay(60000)
+                    manager.heartbeat(id)
+                }
+            }
+
+ try {
+ val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!!
+ runScenario(portfolio, scenario, topologies)
+
+ logger.info { "Starting result processing" }
+
+ val result = resultProcessor.process(id)
+ manager.finish(id, result)
+
+ logger.info { "Successfully finished scenario $id" }
+            } catch (e: Exception) {
+                logger.warn(e) { "Scenario failed to finish" }
+                manager.fail(id)
+            } finally {
+                heartbeat.cancel()
+            }
+        }
+ }
+ }
+}
+
+/**
+ * Main entry point of the runner.
+ */
+fun main(args: Array<String>) = RunnerCli().main(args)
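
To make the expected schema concrete, here is a minimal sketch of the scenario and portfolio documents this runner reads, written as the org.bson.Document values the code would see. The field names mirror the accessors above (simulation.state, trace.traceId, operational.schedulerName, and so on); the identifiers and values are illustrative assumptions:

    import org.bson.Document

    // Hypothetical documents; only the fields Main.kt actually reads are shown.
    val scenario = Document(mapOf(
        "_id" to "scenario-1",
        "portfolioId" to "portfolio-1",
        "simulation" to Document("state", "QUEUED"),
        "trace" to Document(mapOf(
            "traceId" to "trace-a",             // subdirectory of the --traces directory
            "loadSamplingFraction" to 1.0
        )),
        "topology" to Document("topologyId", "topology-1"),
        "operational" to Document(mapOf(
            "schedulerName" to "core-mem",      // one of the policies handled in runRepeat
            "performanceInterferenceEnabled" to false,
            "failuresEnabled" to true,
            "failureFrequency" to 24.0 * 7      // optional; runRepeat falls back to 24.0 * 7
        ))
    ))
    val portfolio = Document(mapOf(
        "_id" to "portfolio-1",
        "targets" to Document("repeatsPerScenario", 2)
    ))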
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
new file mode 100644
index 00000000..39092653
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
@@ -0,0 +1,190 @@
+package com.atlarge.opendc.runner.web
+
+import java.io.File
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.*
+
+/**
+ * A helper class for processing the experiment results using Apache Spark.
+ */
+class ResultProcessor(private val master: String, private val outputPath: File) {
+ /**
+ * Process the results of the scenario with the given [id].
+ */
+ fun process(id: String): Result {
+ val spark = SparkSession.builder()
+ .master(master)
+ .appName("opendc-simulator-$id")
+ .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver
+ .orCreate
+
+ try {
+ val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path)
+ val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path)
+ val res = aggregate(hostMetrics, provisionerMetrics).first()
+
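+            // Column 0 holds the constant groupBy(lit(1)) key, so the aggregated lists start at index 1.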
+ return Result(
+ res.getList<Long>(1),
+ res.getList<Long>(2),
+ res.getList<Long>(3),
+ res.getList<Long>(4),
+ res.getList<Double>(5),
+ res.getList<Double>(6),
+ res.getList<Double>(7),
+ res.getList<Int>(8),
+ res.getList<Long>(9),
+ res.getList<Long>(10),
+ res.getList<Long>(11),
+ res.getList<Int>(12),
+ res.getList<Int>(13),
+ res.getList<Int>(14),
+ res.getList<Int>(15)
+ )
+ } finally {
+ spark.close()
+ }
+ }
+
+ data class Result(
+ val totalRequestedBurst: List<Long>,
+ val totalGrantedBurst: List<Long>,
+ val totalOvercommittedBurst: List<Long>,
+ val totalInterferedBurst: List<Long>,
+ val meanCpuUsage: List<Double>,
+ val meanCpuDemand: List<Double>,
+ val meanNumDeployedImages: List<Double>,
+ val maxNumDeployedImages: List<Int>,
+ val totalPowerDraw: List<Long>,
+ val totalFailureSlices: List<Long>,
+ val totalFailureVmSlices: List<Long>,
+ val totalVmsSubmitted: List<Int>,
+ val totalVmsQueued: List<Int>,
+ val totalVmsFinished: List<Int>,
+ val totalVmsFailed: List<Int>
+ )
+
+ /**
+ * Perform aggregation of the experiment results.
+ */
+ private fun aggregate(hostMetrics: Dataset<Row>, provisionerMetrics: Dataset<Row>): Dataset<Row> {
+ // Extrapolate the duration of the entries to span the entire trace
+ val hostMetricsExtra = hostMetrics
+ .withColumn("slice_counts", floor(col("duration") / lit(sliceLength)))
+ .withColumn("power_draw", col("power_draw") * col("slice_counts"))
+ .withColumn("state_int", states[col("state")])
+ .withColumn("state_opposite_int", oppositeStates[col("state")])
+ .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int"))
+ .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts"))
+ .withColumn("failure_slice_count", col("slice_counts") * col("state_int"))
+ .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count"))
+
+ // Process all data in a single run
+ val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id")
+
+ // Aggregate the summed total metrics
+ val systemMetrics = hostMetricsGrouped.agg(
+ sum("requested_burst").alias("total_requested_burst"),
+ sum("granted_burst").alias("total_granted_burst"),
+ sum("overcommissioned_burst").alias("total_overcommitted_burst"),
+ sum("interfered_burst").alias("total_interfered_burst"),
+ sum("power_draw").alias("total_power_draw"),
+ sum("failure_slice_count").alias("total_failure_slices"),
+ sum("failure_vm_slice_count").alias("total_failure_vm_slices")
+ )
+
+ // Aggregate metrics per host
+ val hvMetrics = hostMetrics
+ .groupBy("run_id", "host_id")
+ .agg(
+ sum("cpu_usage").alias("mean_cpu_usage"),
+ sum("cpu_demand").alias("mean_cpu_demand"),
+ avg("vm_count").alias("mean_num_deployed_images"),
+ count(lit(1)).alias("num_rows")
+ )
+ .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows"))
+ .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows"))
+ .groupBy("run_id")
+ .agg(
+ avg("mean_cpu_usage").alias("mean_cpu_usage"),
+ avg("mean_cpu_demand").alias("mean_cpu_demand"),
+ avg("mean_num_deployed_images").alias("mean_num_deployed_images"),
+ max("mean_num_deployed_images").alias("max_num_deployed_images")
+ )
+
+ // Group the provisioner metrics per run
+ val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id")
+
+ // Aggregate the provisioner metrics
+ val provisionerMetricsAggregated = provisionerMetricsGrouped.agg(
+ max("vm_total_count").alias("total_vms_submitted"),
+ max("vm_waiting_count").alias("total_vms_queued"),
+ max("vm_active_count").alias("total_vms_running"),
+ max("vm_inactive_count").alias("total_vms_finished"),
+ max("vm_failed_count").alias("total_vms_failed")
+ )
+
+ // Join the results into a single data frame
+ return systemMetrics
+ .join(hvMetrics, "run_id")
+ .join(provisionerMetricsAggregated, "run_id")
+ .select(
+ col("total_requested_burst"),
+ col("total_granted_burst"),
+ col("total_overcommitted_burst"),
+ col("total_interfered_burst"),
+ col("mean_cpu_usage"),
+ col("mean_cpu_demand"),
+ col("mean_num_deployed_images"),
+ col("max_num_deployed_images"),
+ col("total_power_draw"),
+ col("total_failure_slices"),
+ col("total_failure_vm_slices"),
+ col("total_vms_submitted"),
+ col("total_vms_queued"),
+ col("total_vms_finished"),
+ col("total_vms_failed")
+ )
+ .groupBy(lit(1))
+ .agg(
+ // TODO Check if order of values is correct
+ collect_list(col("total_requested_burst")).alias("total_requested_burst"),
+ collect_list(col("total_granted_burst")).alias("total_granted_burst"),
+ collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"),
+ collect_list(col("total_interfered_burst")).alias("total_interfered_burst"),
+ collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"),
+ collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"),
+ collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"),
+ collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"),
+ collect_list(col("total_power_draw")).alias("total_power_draw"),
+ collect_list(col("total_failure_slices")).alias("total_failure_slices"),
+ collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"),
+ collect_list(col("total_vms_submitted")).alias("total_vms_submitted"),
+ collect_list(col("total_vms_queued")).alias("total_vms_queued"),
+ collect_list(col("total_vms_finished")).alias("total_vms_finished"),
+ collect_list(col("total_vms_failed")).alias("total_vms_failed")
+ )
+ }
+
+    // Expose Spark's symbolic Scala methods on Column ($times, $div, apply) as Kotlin operators
+ operator fun Column.times(other: Column): Column = `$times`(other)
+ operator fun Column.div(other: Column): Column = `$div`(other)
+ operator fun Column.get(other: Column): Column = this.apply(other)
+
+ val sliceLength = 5 * 60 * 1000
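+    // Map a host state to 1 when it counts as failed ("ERROR") and 0 otherwise; oppositeStates is the complement.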
+ val states = map(
+ lit("ERROR"), lit(1),
+ lit("ACTIVE"), lit(0),
+ lit("SHUTOFF"), lit(0)
+ )
+ val oppositeStates = map(
+ lit("ERROR"), lit(0),
+ lit("ACTIVE"), lit(1),
+ lit("SHUTOFF"), lit(1)
+ )
+}
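
To make the extrapolation step concrete, a plain-Kotlin sketch of the per-row arithmetic (illustrative values; the Spark job performs the same computation column-wise):

    // One host-metrics row spanning 45 minutes, reported while the host was failed.
    fun main() {
        val sliceLength = 5 * 60 * 1000L           // 5 minutes in milliseconds
        val durationMs = 45 * 60 * 1000L           // duration covered by this row
        val sliceCounts = durationMs / sliceLength // floor(45 min / 5 min) = 9 slices
        val stateInt = 1                           // states["ERROR"] = 1
        val vmCount = 4                            // VMs deployed on the host

        val failureSlices = sliceCounts * stateInt              // 9
        val failureVmSlices = sliceCounts * stateInt * vmCount  // 36
        println("failure slices: $failureSlices, VM failure slices: $failureVmSlices")
    }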
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
new file mode 100644
index 00000000..40ffd282
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
@@ -0,0 +1,92 @@
+package com.atlarge.opendc.runner.web
+
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.model.Filters
+import com.mongodb.client.model.Updates
+import java.time.Instant
+import org.bson.Document
+
+/**
+ * Manages the queue of scenarios that need to be processed.
+ */
+class ScenarioManager(private val collection: MongoCollection<Document>) {
+ /**
+ * Find the next scenario that the simulator needs to process.
+ */
+ fun findNext(): Document? {
+ return collection
+ .find(Filters.eq("simulation.state", "QUEUED"))
+ .first()
+ }
+
+ /**
+ * Claim the scenario in the database with the specified id.
+ */
+ fun claim(id: String): Boolean {
+ val res = collection.findOneAndUpdate(
+ Filters.and(
+ Filters.eq("_id", id),
+ Filters.eq("simulation.state", "QUEUED")
+ ),
+ Updates.combine(
+ Updates.set("simulation.state", "RUNNING"),
+ Updates.set("simulation.heartbeat", Instant.now())
+ )
+ )
+ return res != null
+ }
+
+ /**
+ * Update the heartbeat of the specified scenario.
+ */
+ fun heartbeat(id: String) {
+ collection.findOneAndUpdate(
+ Filters.and(
+ Filters.eq("_id", id),
+ Filters.eq("simulation.state", "RUNNING")
+ ),
+ Updates.set("simulation.heartbeat", Instant.now())
+ )
+ }
+
+ /**
+ * Mark the scenario as failed.
+ */
+ fun fail(id: String) {
+ collection.findOneAndUpdate(
+ Filters.eq("_id", id),
+ Updates.combine(
+ Updates.set("simulation.state", "FAILED"),
+ Updates.set("simulation.heartbeat", Instant.now())
+ )
+ )
+ }
+
+ /**
+ * Persist the specified results.
+ */
+ fun finish(id: String, result: ResultProcessor.Result) {
+ collection.findOneAndUpdate(
+ Filters.eq("_id", id),
+ Updates.combine(
+ Updates.set("simulation.state", "FINISHED"),
+ Updates.unset("simulation.time"),
+ Updates.set("results.total_requested_burst", result.totalRequestedBurst),
+ Updates.set("results.total_granted_burst", result.totalGrantedBurst),
+ Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst),
+ Updates.set("results.total_interfered_burst", result.totalInterferedBurst),
+ Updates.set("results.mean_cpu_usage", result.meanCpuUsage),
+ Updates.set("results.mean_cpu_demand", result.meanCpuDemand),
+ Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages),
+ Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
+ Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
+ Updates.set("results.total_power_draw", result.totalPowerDraw),
+ Updates.set("results.total_failure_slices", result.totalFailureSlices),
+ Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices),
+ Updates.set("results.total_vms_submitted", result.totalVmsSubmitted),
+ Updates.set("results.total_vms_queued", result.totalVmsQueued),
+ Updates.set("results.total_vms_finished", result.totalVmsFinished),
+ Updates.set("results.total_vms_failed", result.totalVmsFailed)
+ )
+ )
+ }
+}
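
Because claim() matches on both the id and simulation.state == "QUEUED" inside a single findOneAndUpdate, claiming is atomic on the MongoDB side: of two runners racing for the same scenario, at most one sees a matching document. A hypothetical usage sketch, assuming a ScenarioManager named manager:

    val first = manager.claim("scenario-1")   // true: the document was still QUEUED
    val second = manager.claim("scenario-1")  // false: the state is RUNNING by now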
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
new file mode 100644
index 00000000..499585ec
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
@@ -0,0 +1,130 @@
+package com.atlarge.opendc.runner.web
+
+import com.atlarge.odcsim.Domain
+import com.atlarge.opendc.compute.core.MemoryUnit
+import com.atlarge.opendc.compute.core.ProcessingNode
+import com.atlarge.opendc.compute.core.ProcessingUnit
+import com.atlarge.opendc.compute.metal.NODE_CLUSTER
+import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
+import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel
+import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
+import com.atlarge.opendc.core.Environment
+import com.atlarge.opendc.core.Platform
+import com.atlarge.opendc.core.Zone
+import com.atlarge.opendc.core.services.ServiceRegistry
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import com.mongodb.client.AggregateIterable
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.model.Aggregates
+import com.mongodb.client.model.Field
+import com.mongodb.client.model.Filters
+import com.mongodb.client.model.Projections
+import java.util.*
+import kotlinx.coroutines.launch
+import org.bson.Document
+
+/**
+ * A helper class that converts the MongoDB topology into an OpenDC environment.
+ */
+class TopologyParser(private val collection: MongoCollection<Document>, private val id: String) : EnvironmentReader {
+ /**
+     * Construct the OpenDC environment from the topology with the specified [id].
+ */
+ override suspend fun construct(dom: Domain): Environment {
+ val nodes = mutableListOf<SimpleBareMetalDriver>()
+ val random = Random(0)
+
+ for (machine in fetchMachines(id)) {
+ val machineId = machine.getString("_id")
+ val clusterId = machine.getString("rack_id")
+ val position = machine.getInteger("position")
+
+ val processors = machine.getList("cpus", Document::class.java).flatMap { cpu ->
+ val cores = cpu.getInteger("numberOfCores")
+ val speed = cpu.get("clockRateMhz", Number::class.java).toDouble()
+ // TODO Remove hardcoding of vendor
+ val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores)
+ List(cores) { coreId ->
+ ProcessingUnit(node, coreId, speed)
+ }
+ }
+ val memoryUnits = machine.getList("memories", Document::class.java).map { memory ->
+ MemoryUnit(
+ "Samsung",
+ memory.getString("name"),
+ memory.get("speedMbPerS", Number::class.java).toDouble(),
+ memory.get("sizeMb", Number::class.java).toLong()
+ )
+ }
+ nodes.add(
+ SimpleBareMetalDriver(
+ dom.newDomain(machineId),
+ UUID(random.nextLong(), random.nextLong()),
+ "node-$clusterId-$position",
+ mapOf(NODE_CLUSTER to clusterId),
+ processors,
+ memoryUnits,
+ // For now we assume a simple linear load model with an idle draw of ~200W and a maximum
+ // power draw of 350W.
+ // Source: https://stackoverflow.com/questions/6128960
+ LinearLoadPowerModel(200.0, 350.0)
+ )
+ )
+ }
+
+ val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner"))
+ dom.launch {
+ for (node in nodes) {
+ provisioningService.create(node)
+ }
+ }
+
+ val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
+
+ val platform = Platform(
+ UUID.randomUUID(), "opendc-platform", listOf(
+ Zone(UUID.randomUUID(), "zone", serviceRegistry)
+ )
+ )
+
+ return Environment(fetchName(id), null, listOf(platform))
+ }
+
+ override fun close() {}
+
+ /**
+ * Fetch the metadata of the topology.
+ */
+ private fun fetchName(id: String): String {
+ return collection.aggregate(
+ listOf(
+ Aggregates.match(Filters.eq("_id", id)),
+ Aggregates.project(Projections.include("name"))
+ )
+ )
+ .first()!!
+ .getString("name")
+ }
+
+ /**
+ * Fetch a topology from the database with the specified [id].
+ */
+ private fun fetchMachines(id: String): AggregateIterable<Document> {
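+        // Flatten the nested topology document: project the rack of every room tile,
+        // unwind twice (rooms, then tiles), tag each machine with the id of its rack,
+        // and emit one document per machine.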
+ return collection.aggregate(
+ listOf(
+ Aggregates.match(Filters.eq("_id", id)),
+ Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack"))),
+ Aggregates.unwind("\$racks"),
+ Aggregates.unwind("\$racks"),
+ Aggregates.replaceRoot("\$racks"),
+ Aggregates.addFields(Field("machines.rack_id", "\$_id")),
+ Aggregates.unwind("\$machines"),
+ Aggregates.replaceRoot("\$machines")
+ )
+ )
+ }
+}
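
For context, a minimal sketch of a topology document that the fetchMachines pipeline above would flatten into per-machine documents. The field names mirror the accessors in construct(); the identifiers and hardware values are illustrative assumptions:

    import org.bson.Document

    val rack = Document(mapOf(
        "_id" to "rack-1",
        "machines" to listOf(Document(mapOf(
            "_id" to "machine-1",
            "position" to 1,
            "cpus" to listOf(Document(mapOf(
                "name" to "example-cpu",      // hypothetical model name
                "numberOfCores" to 8,
                "clockRateMhz" to 2400
            ))),
            "memories" to listOf(Document(mapOf(
                "name" to "example-dimm",     // hypothetical model name
                "speedMbPerS" to 2400,
                "sizeMb" to 16384
            )))
        )))
    ))

    val topology = Document(mapOf(
        "_id" to "topology-1",
        "name" to "example-datacenter",
        "rooms" to listOf(Document("tiles", listOf(Document("rack", rack))))
    ))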
diff --git a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml
new file mode 100644
index 00000000..1d873554
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ MIT License
+ ~
+ ~ Copyright (c) 2020 atlarge-research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false" />
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger name="com.atlarge.odcsim" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="com.atlarge.opendc" level="warn" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="com.atlarge.opendc.runner" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.apache.hadoop" level="warn" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.apache.spark" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Root level="error">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>