diff options
Diffstat (limited to 'simulator/opendc/opendc-runner-web')
6 files changed, 0 insertions, 870 deletions
diff --git a/simulator/opendc/opendc-runner-web/build.gradle.kts b/simulator/opendc/opendc-runner-web/build.gradle.kts deleted file mode 100644 index 479eaca7..00000000 --- a/simulator/opendc/opendc-runner-web/build.gradle.kts +++ /dev/null @@ -1,55 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Experiment runner for OpenDC" - -/* Build configuration */ -plugins { - `kotlin-library-convention` - application -} - -application { - mainClassName = "com.atlarge.opendc.runner.web.MainKt" -} - -dependencies { - api(project(":opendc:opendc-core")) - implementation(project(":opendc:opendc-compute")) - implementation(project(":opendc:opendc-format")) - implementation(project(":opendc:opendc-experiments-sc20")) - implementation(project(":opendc:opendc-simulator")) - - implementation("com.github.ajalt:clikt:2.8.0") - implementation("io.github.microutils:kotlin-logging:1.7.10") - - implementation("org.mongodb:mongodb-driver-sync:4.0.5") - implementation("org.apache.spark:spark-sql_2.12:3.0.0") { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - exclude(group = "log4j") - } - - runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") - runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:2.13.1") -} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt deleted file mode 100644 index ac4d9087..00000000 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt +++ /dev/null @@ -1,346 +0,0 @@ -package com.atlarge.opendc.runner.web - -import com.atlarge.opendc.compute.virt.service.allocation.* -import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor -import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain -import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner -import com.atlarge.opendc.experiments.sc20.experiment.model.Workload -import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor -import com.atlarge.opendc.experiments.sc20.experiment.processTrace -import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader -import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader -import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.options.* -import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.int -import com.mongodb.MongoClientSettings -import com.mongodb.MongoCredential -import com.mongodb.ServerAddress -import com.mongodb.client.MongoClients -import com.mongodb.client.MongoCollection -import com.mongodb.client.MongoDatabase -import com.mongodb.client.model.Filters -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.test.TestCoroutineScope -import mu.KotlinLogging -import org.bson.Document -import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.io.File -import java.util.* -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * Represents the CLI command for starting the OpenDC web runner. - */ -@OptIn(ExperimentalCoroutinesApi::class) -class RunnerCli : CliktCommand(name = "runner") { - /** - * The name of the database to use. - */ - private val mongoDb by option( - "--mongo-db", - help = "name of the database to use", - envvar = "OPENDC_DB" - ) - .default("opendc") - - /** - * The database host to connect to. - */ - private val mongoHost by option( - "--mongo-host", - help = "database host to connect to", - envvar = "OPENDC_DB_HOST" - ) - .default("localhost") - - /** - * The database port to connect to. - */ - private val mongoPort by option( - "--mongo-port", - help = "database port to connect to", - envvar = "OPENDC_DB_PORT" - ) - .int() - .default(27017) - - /** - * The database user to connect with. - */ - private val mongoUser by option( - "--mongo-user", - help = "database user to connect with", - envvar = "OPENDC_DB_USER" - ) - .default("opendc") - - /** - * The database password to connect with. - */ - private val mongoPassword by option( - "--mongo-password", - help = "database password to connect with", - envvar = "OPENDC_DB_PASSWORD" - ) - .convert { it.toCharArray() } - .required() - - /** - * The path to the traces directory. - */ - private val tracePath by option( - "--traces", - help = "path to the directory containing the traces", - envvar = "OPENDC_TRACES" - ) - .file(canBeFile = false) - .defaultLazy { File("traces/") } - - /** - * The path to the output directory. - */ - private val outputPath by option( - "--output", - help = "path to the results directory", - envvar = "OPENDC_OUTPUT" - ) - .file(canBeFile = false) - .defaultLazy { File("results/") } - - /** - * The Spark master to connect to. - */ - private val spark by option( - "--spark", - help = "Spark master to connect to", - envvar = "OPENDC_SPARK" - ) - .default("local[*]") - - /** - * Connect to the user-specified database. - */ - private fun createDatabase(): MongoDatabase { - val credential = MongoCredential.createScramSha1Credential( - mongoUser, - mongoDb, - mongoPassword - ) - - val settings = MongoClientSettings.builder() - .credential(credential) - .applyToClusterSettings { it.hosts(listOf(ServerAddress(mongoHost, mongoPort))) } - .build() - val client = MongoClients.create(settings) - return client.getDatabase(mongoDb) - } - - /** - * Run a single scenario. - */ - private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>) { - val id = scenario.getString("_id") - - logger.info { "Constructing performance interference model" } - - val traceDir = File( - tracePath, - scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) - ) - val traceReader = Sc20RawParquetTraceReader(traceDir) - val performanceInterferenceReader = let { - val path = File(traceDir, "performance-interference-model.json") - val operational = scenario.get("operational", Document::class.java) - val enabled = operational.getBoolean("performanceInterferenceEnabled") - - if (!enabled || !path.exists()) { - return@let null - } - - path.inputStream().use { Sc20PerformanceInterferenceReader(it) } - } - - val targets = portfolio.get("targets", Document::class.java) - - repeat(targets.getInteger("repeatsPerScenario")) { - logger.info { "Starting repeat $it" } - runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader) - } - - logger.info { "Finished simulation for scenario $id" } - } - - /** - * Run a single repeat. - */ - private suspend fun runRepeat( - scenario: Document, - repeat: Int, - topologies: MongoCollection<Document>, - traceReader: Sc20RawParquetTraceReader, - performanceInterferenceReader: Sc20PerformanceInterferenceReader? - ) { - val id = scenario.getString("_id") - val seed = repeat - val traceDocument = scenario.get("trace", Document::class.java) - val workloadName = traceDocument.getString("traceId") - val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() - - val seeder = Random(seed) - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - - val chan = Channel<Unit>(Channel.CONFLATED) - - val operational = scenario.get("operational", Document::class.java) - val allocationPolicy = - when (val policyName = operational.getString("schedulerName")) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) - else -> throw IllegalArgumentException("Unknown policy $policyName") - } - - val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() - val trace = Sc20ParquetTraceReader( - listOf(traceReader), - performanceInterferenceModel, - Workload(workloadName, workloadFraction), - seed - ) - val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java) - val environment = TopologyParser(topologies, topologyId) - val monitor = ParquetExperimentMonitor( - outputPath, - "scenario_id=$id/run_id=$repeat", - 4096 - ) - - testScope.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner( - this, - clock, - environment, - allocationPolicy - ) - - val failureDomain = if (operational.getBoolean("failuresEnabled")) { - logger.debug("ENABLING failures") - createFailureDomain( - testScope, - clock, - seeder.nextInt(), - operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, - bareMetalProvisioner, - chan - ) - } else { - null - } - - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - trace, - scheduler, - chan, - monitor, - emptyMap() - ) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - } - - try { - testScope.advanceUntilIdle() - } finally { - monitor.close() - } - } - - val POLL_INTERVAL = 5000L // ms = 5 s - val HEARTBEAT_INTERVAL = 60000L // ms = 1 min - - override fun run() = runBlocking(Dispatchers.Default) { - logger.info { "Starting OpenDC web runner" } - logger.info { "Connecting to MongoDB instance" } - val database = createDatabase() - val manager = ScenarioManager(database.getCollection("scenarios")) - val portfolios = database.getCollection("portfolios") - val topologies = database.getCollection("topologies") - - logger.info { "Launching Spark" } - val resultProcessor = ResultProcessor(spark, outputPath) - - logger.info { "Watching for queued scenarios" } - - while (true) { - val scenario = manager.findNext() - - if (scenario == null) { - delay(POLL_INTERVAL) - continue - } - - val id = scenario.getString("_id") - - logger.info { "Found queued scenario $id: attempting to claim" } - - if (!manager.claim(id)) { - logger.info { "Failed to claim scenario" } - continue - } - - coroutineScope { - // Launch heartbeat process - val heartbeat = launch { - while (true) { - delay(HEARTBEAT_INTERVAL) - manager.heartbeat(id) - } - } - - try { - val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!! - runScenario(portfolio, scenario, topologies) - - logger.info { "Starting result processing" } - - val result = resultProcessor.process(id) - manager.finish(id, result) - - logger.info { "Successfully finished scenario $id" } - } catch (e: Exception) { - logger.warn(e) { "Scenario failed to finish" } - manager.fail(id) - } finally { - heartbeat.cancel() - } - } - } - } -} - -/** - * Main entry point of the runner. - */ -fun main(args: Array<String>) = RunnerCli().main(args) diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt deleted file mode 100644 index c0b0ac31..00000000 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt +++ /dev/null @@ -1,193 +0,0 @@ -package com.atlarge.opendc.runner.web - -import org.apache.spark.sql.Column -import org.apache.spark.sql.Dataset -import org.apache.spark.sql.Row -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.functions.* -import java.io.File - -/** - * A helper class for processing the experiment results using Apache Spark. - */ -class ResultProcessor(private val master: String, private val outputPath: File) { - /** - * Process the results of the scenario with the given [id]. - */ - fun process(id: String): Result { - val spark = SparkSession.builder() - .master(master) - .appName("opendc-simulator-$id") - .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver - .orCreate - - try { - val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path) - val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path) - val res = aggregate(hostMetrics, provisionerMetrics).first() - - return Result( - res.getList<Long>(1), - res.getList<Long>(2), - res.getList<Long>(3), - res.getList<Long>(4), - res.getList<Double>(5), - res.getList<Double>(6), - res.getList<Double>(7), - res.getList<Int>(8), - res.getList<Long>(9), - res.getList<Long>(10), - res.getList<Long>(11), - res.getList<Int>(12), - res.getList<Int>(13), - res.getList<Int>(14), - res.getList<Int>(15) - ) - } finally { - spark.close() - } - } - - data class Result( - val totalRequestedBurst: List<Long>, - val totalGrantedBurst: List<Long>, - val totalOvercommittedBurst: List<Long>, - val totalInterferedBurst: List<Long>, - val meanCpuUsage: List<Double>, - val meanCpuDemand: List<Double>, - val meanNumDeployedImages: List<Double>, - val maxNumDeployedImages: List<Int>, - val totalPowerDraw: List<Long>, - val totalFailureSlices: List<Long>, - val totalFailureVmSlices: List<Long>, - val totalVmsSubmitted: List<Int>, - val totalVmsQueued: List<Int>, - val totalVmsFinished: List<Int>, - val totalVmsFailed: List<Int> - ) - - /** - * Perform aggregation of the experiment results. - */ - private fun aggregate(hostMetrics: Dataset<Row>, provisionerMetrics: Dataset<Row>): Dataset<Row> { - // Extrapolate the duration of the entries to span the entire trace - val hostMetricsExtra = hostMetrics - .withColumn("slice_counts", floor(col("duration") / lit(sliceLength))) - .withColumn("power_draw", col("power_draw") * col("slice_counts")) - .withColumn("state_int", states[col("state")]) - .withColumn("state_opposite_int", oppositeStates[col("state")]) - .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int")) - .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts")) - .withColumn("failure_slice_count", col("slice_counts") * col("state_int")) - .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count")) - - // Process all data in a single run - val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id") - - // Aggregate the summed total metrics - val systemMetrics = hostMetricsGrouped.agg( - sum("requested_burst").alias("total_requested_burst"), - sum("granted_burst").alias("total_granted_burst"), - sum("overcommissioned_burst").alias("total_overcommitted_burst"), - sum("interfered_burst").alias("total_interfered_burst"), - sum("power_draw").alias("total_power_draw"), - sum("failure_slice_count").alias("total_failure_slices"), - sum("failure_vm_slice_count").alias("total_failure_vm_slices") - ) - - // Aggregate metrics per host - val hvMetrics = hostMetrics - .groupBy("run_id", "host_id") - .agg( - sum("cpu_usage").alias("mean_cpu_usage"), - sum("cpu_demand").alias("mean_cpu_demand"), - avg("vm_count").alias("mean_num_deployed_images"), - count(lit(1)).alias("num_rows") - ) - .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows")) - .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows")) - .groupBy("run_id") - .agg( - avg("mean_cpu_usage").alias("mean_cpu_usage"), - avg("mean_cpu_demand").alias("mean_cpu_demand"), - avg("mean_num_deployed_images").alias("mean_num_deployed_images"), - max("mean_num_deployed_images").alias("max_num_deployed_images") - ) - - // Group the provisioner metrics per run - val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id") - - // Aggregate the provisioner metrics - val provisionerMetricsAggregated = provisionerMetricsGrouped.agg( - max("vm_total_count").alias("total_vms_submitted"), - max("vm_waiting_count").alias("total_vms_queued"), - max("vm_active_count").alias("total_vms_running"), - max("vm_inactive_count").alias("total_vms_finished"), - max("vm_failed_count").alias("total_vms_failed") - ) - - // Join the results into a single data frame - return systemMetrics - .join(hvMetrics, "run_id") - .join(provisionerMetricsAggregated, "run_id") - .select( - col("total_requested_burst"), - col("total_granted_burst"), - col("total_overcommitted_burst"), - col("total_interfered_burst"), - col("mean_cpu_usage"), - col("mean_cpu_demand"), - col("mean_num_deployed_images"), - col("max_num_deployed_images"), - col("total_power_draw"), - col("total_failure_slices"), - col("total_failure_vm_slices"), - col("total_vms_submitted"), - col("total_vms_queued"), - col("total_vms_finished"), - col("total_vms_failed") - ) - .groupBy(lit(1)) - .agg( - // TODO Check if order of values is correct - collect_list(col("total_requested_burst")).alias("total_requested_burst"), - collect_list(col("total_granted_burst")).alias("total_granted_burst"), - collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"), - collect_list(col("total_interfered_burst")).alias("total_interfered_burst"), - collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"), - collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"), - collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"), - collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"), - collect_list(col("total_power_draw")).alias("total_power_draw"), - collect_list(col("total_failure_slices")).alias("total_failure_slices"), - collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"), - collect_list(col("total_vms_submitted")).alias("total_vms_submitted"), - collect_list(col("total_vms_queued")).alias("total_vms_queued"), - collect_list(col("total_vms_finished")).alias("total_vms_finished"), - collect_list(col("total_vms_failed")).alias("total_vms_failed") - ) - } - - // Spark helper functions - operator fun Column.times(other: Column): Column = `$times`(other) - operator fun Column.div(other: Column): Column = `$div`(other) - operator fun Column.get(other: Column): Column = this.apply(other) - - val sliceLength = 5 * 60 * 1000 - val states = map( - lit("ERROR"), - lit(1), - lit("ACTIVE"), - lit(0), - lit("SHUTOFF"), - lit(0) - ) - val oppositeStates = map( - lit("ERROR"), - lit(0), - lit("ACTIVE"), - lit(1), - lit("SHUTOFF"), - lit(1) - ) -} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt deleted file mode 100644 index 6ec4995d..00000000 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt +++ /dev/null @@ -1,93 +0,0 @@ -package com.atlarge.opendc.runner.web - -import com.mongodb.client.MongoCollection -import com.mongodb.client.model.Filters -import com.mongodb.client.model.Updates -import org.bson.Document -import java.time.Instant - -/** - * Manages the queue of scenarios that need to be processed. - */ -class ScenarioManager(private val collection: MongoCollection<Document>) { - /** - * Find the next scenario that the simulator needs to process. - */ - fun findNext(): Document? { - return collection - .find(Filters.eq("simulation.state", "QUEUED")) - .first() - } - - /** - * Claim the scenario in the database with the specified id. - */ - fun claim(id: String): Boolean { - val res = collection.findOneAndUpdate( - Filters.and( - Filters.eq("_id", id), - Filters.eq("simulation.state", "QUEUED") - ), - Updates.combine( - Updates.set("simulation.state", "RUNNING"), - Updates.set("simulation.heartbeat", Instant.now()) - ) - ) - return res != null - } - - /** - * Update the heartbeat of the specified scenario. - */ - fun heartbeat(id: String) { - collection.findOneAndUpdate( - Filters.and( - Filters.eq("_id", id), - Filters.eq("simulation.state", "RUNNING") - ), - Updates.set("simulation.heartbeat", Instant.now()) - ) - } - - /** - * Mark the scenario as failed. - */ - fun fail(id: String) { - collection.findOneAndUpdate( - Filters.eq("_id", id), - Updates.combine( - Updates.set("simulation.state", "FAILED"), - Updates.set("simulation.heartbeat", Instant.now()) - ) - ) - } - - /** - * Persist the specified results. - */ - fun finish(id: String, result: ResultProcessor.Result) { - collection.findOneAndUpdate( - Filters.eq("_id", id), - Updates.combine( - Updates.set("simulation.state", "FINISHED"), - Updates.unset("simulation.time"), - Updates.set("results.total_requested_burst", result.totalRequestedBurst), - Updates.set("results.total_granted_burst", result.totalGrantedBurst), - Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst), - Updates.set("results.total_interfered_burst", result.totalInterferedBurst), - Updates.set("results.mean_cpu_usage", result.meanCpuUsage), - Updates.set("results.mean_cpu_demand", result.meanCpuDemand), - Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages), - Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), - Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), - Updates.set("results.total_power_draw", result.totalPowerDraw), - Updates.set("results.total_failure_slices", result.totalFailureSlices), - Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices), - Updates.set("results.total_vms_submitted", result.totalVmsSubmitted), - Updates.set("results.total_vms_queued", result.totalVmsQueued), - Updates.set("results.total_vms_finished", result.totalVmsFinished), - Updates.set("results.total_vms_failed", result.totalVmsFailed) - ) - ) - } -} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt deleted file mode 100644 index f9b1c6c4..00000000 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt +++ /dev/null @@ -1,131 +0,0 @@ -package com.atlarge.opendc.runner.web - -import com.atlarge.opendc.compute.core.MemoryUnit -import com.atlarge.opendc.compute.core.ProcessingNode -import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.metal.NODE_CLUSTER -import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver -import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel -import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService -import com.atlarge.opendc.core.Environment -import com.atlarge.opendc.core.Platform -import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.format.environment.EnvironmentReader -import com.mongodb.client.AggregateIterable -import com.mongodb.client.MongoCollection -import com.mongodb.client.model.Aggregates -import com.mongodb.client.model.Field -import com.mongodb.client.model.Filters -import com.mongodb.client.model.Projections -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.launch -import org.bson.Document -import java.time.Clock -import java.util.* - -/** - * A helper class that converts the MongoDB topology into an OpenDC environment. - */ -class TopologyParser(private val collection: MongoCollection<Document>, private val id: String) : EnvironmentReader { - /** - * Parse the topology with the specified [id]. - */ - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { - val nodes = mutableListOf<SimpleBareMetalDriver>() - val random = Random(0) - - for (machine in fetchMachines(id)) { - val machineId = machine.getString("_id") - val clusterId = machine.getString("rack_id") - val position = machine.getInteger("position") - - val processors = machine.getList("cpus", Document::class.java).flatMap { cpu -> - val cores = cpu.getInteger("numberOfCores") - val speed = cpu.get("clockRateMhz", Number::class.java).toDouble() - // TODO Remove hardcoding of vendor - val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores) - List(cores) { coreId -> - ProcessingUnit(node, coreId, speed) - } - } - val memoryUnits = machine.getList("memories", Document::class.java).map { memory -> - MemoryUnit( - "Samsung", - memory.getString("name"), - memory.get("speedMbPerS", Number::class.java).toDouble(), - memory.get("sizeMb", Number::class.java).toLong() - ) - } - nodes.add( - SimpleBareMetalDriver( - coroutineScope, - clock, - UUID(random.nextLong(), random.nextLong()), - "node-$clusterId-$position", - mapOf(NODE_CLUSTER to clusterId), - processors, - memoryUnits, - // For now we assume a simple linear load model with an idle draw of ~200W and a maximum - // power draw of 350W. - // Source: https://stackoverflow.com/questions/6128960 - LinearLoadPowerModel(200.0, 350.0) - ) - ) - } - - val provisioningService = SimpleProvisioningService() - coroutineScope.launch { - for (node in nodes) { - provisioningService.create(node) - } - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - - val platform = Platform( - UUID.randomUUID(), - "opendc-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment(fetchName(id), null, listOf(platform)) - } - - override fun close() {} - - /** - * Fetch the metadata of the topology. - */ - private fun fetchName(id: String): String { - return collection.aggregate( - listOf( - Aggregates.match(Filters.eq("_id", id)), - Aggregates.project(Projections.include("name")) - ) - ) - .first()!! - .getString("name") - } - - /** - * Fetch a topology from the database with the specified [id]. - */ - private fun fetchMachines(id: String): AggregateIterable<Document> { - return collection.aggregate( - listOf( - Aggregates.match(Filters.eq("_id", id)), - Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack"))), - Aggregates.unwind("\$racks"), - Aggregates.unwind("\$racks"), - Aggregates.replaceRoot("\$racks"), - Aggregates.addFields(Field("machines.rack_id", "\$_id")), - Aggregates.unwind("\$machines"), - Aggregates.replaceRoot("\$machines") - ) - ) - } -} diff --git a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml deleted file mode 100644 index 1d873554..00000000 --- a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml +++ /dev/null @@ -1,52 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ MIT License - ~ - ~ Copyright (c) 2020 atlarge-research - ~ - ~ Permission is hereby granted, free of charge, to any person obtaining a copy - ~ of this software and associated documentation files (the "Software"), to deal - ~ in the Software without restriction, including without limitation the rights - ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - ~ copies of the Software, and to permit persons to whom the Software is - ~ furnished to do so, subject to the following conditions: - ~ - ~ The above copyright notice and this permission notice shall be included in all - ~ copies or substantial portions of the Software. - ~ - ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - ~ SOFTWARE. - --> - -<Configuration status="WARN"> - <Appenders> - <Console name="Console" target="SYSTEM_OUT"> - <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false" /> - </Console> - </Appenders> - <Loggers> - <Logger name="com.atlarge.odcsim" level="info" additivity="false"> - <AppenderRef ref="Console"/> - </Logger> - <Logger name="com.atlarge.opendc" level="warn" additivity="false"> - <AppenderRef ref="Console"/> - </Logger> - <Logger name="com.atlarge.opendc.runner" level="info" additivity="false"> - <AppenderRef ref="Console"/> - </Logger> - <Logger name="org.apache.hadoop" level="warn" additivity="false"> - <AppenderRef ref="Console"/> - </Logger> - <Logger name="org.apache.spark" level="info" additivity="false"> - <AppenderRef ref="Console"/> - </Logger> - <Root level="error"> - <AppenderRef ref="Console"/> - </Root> - </Loggers> -</Configuration> |
