| field | value | date |
|---|---|---|
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-10-01 00:49:53 +0200 |
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-10-01 10:19:13 +0200 |
| commit | 8a9f5573bef3f68316add17c04a47cc4e5fe75fa (patch) | |
| tree | 8dff00c904d5db65e9baf0e7ca2d18a8a61800c1 /simulator/opendc-runner-web/src | |
| parent | 27ddd462d148d70760e45f967387905054e21d20 (diff) | |
Move OpenDC modules into simulator root
This change moves the OpenDC modules that previously lived in the simulator/opendc
directory into the simulator directory itself, since we no longer make a
distinction between OpenDC and odcsim.
Diffstat (limited to 'simulator/opendc-runner-web/src')
5 files changed, 815 insertions, 0 deletions
diff --git a/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
new file mode 100644
index 00000000..ac4d9087
--- /dev/null
+++ b/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
@@ -0,0 +1,346 @@
+package com.atlarge.opendc.runner.web
+
+import com.atlarge.opendc.compute.virt.service.allocation.*
+import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain
+import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.processTrace
+import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
+import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.options.*
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.int
+import com.mongodb.MongoClientSettings
+import com.mongodb.MongoCredential
+import com.mongodb.ServerAddress
+import com.mongodb.client.MongoClients
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.MongoDatabase
+import com.mongodb.client.model.Filters
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.test.TestCoroutineScope
+import mu.KotlinLogging
+import org.bson.Document
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import java.io.File
+import java.util.*
+import kotlin.random.Random
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Represents the CLI command for starting the OpenDC web runner.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class RunnerCli : CliktCommand(name = "runner") {
+    /**
+     * The name of the database to use.
+     */
+    private val mongoDb by option(
+        "--mongo-db",
+        help = "name of the database to use",
+        envvar = "OPENDC_DB"
+    )
+        .default("opendc")
+
+    /**
+     * The database host to connect to.
+     */
+    private val mongoHost by option(
+        "--mongo-host",
+        help = "database host to connect to",
+        envvar = "OPENDC_DB_HOST"
+    )
+        .default("localhost")
+
+    /**
+     * The database port to connect to.
+     */
+    private val mongoPort by option(
+        "--mongo-port",
+        help = "database port to connect to",
+        envvar = "OPENDC_DB_PORT"
+    )
+        .int()
+        .default(27017)
+
+    /**
+     * The database user to connect with.
+     */
+    private val mongoUser by option(
+        "--mongo-user",
+        help = "database user to connect with",
+        envvar = "OPENDC_DB_USER"
+    )
+        .default("opendc")
+
+    /**
+     * The database password to connect with.
+     */
+    private val mongoPassword by option(
+        "--mongo-password",
+        help = "database password to connect with",
+        envvar = "OPENDC_DB_PASSWORD"
+    )
+        .convert { it.toCharArray() }
+        .required()
+
+    /**
+     * The path to the traces directory.
+     */
+    private val tracePath by option(
+        "--traces",
+        help = "path to the directory containing the traces",
+        envvar = "OPENDC_TRACES"
+    )
+        .file(canBeFile = false)
+        .defaultLazy { File("traces/") }
+
+    /**
+     * The path to the output directory.
+     */
+    private val outputPath by option(
+        "--output",
+        help = "path to the results directory",
+        envvar = "OPENDC_OUTPUT"
+    )
+        .file(canBeFile = false)
+        .defaultLazy { File("results/") }
+
+    /**
+     * The Spark master to connect to.
+     */
+    private val spark by option(
+        "--spark",
+        help = "Spark master to connect to",
+        envvar = "OPENDC_SPARK"
+    )
+        .default("local[*]")
+
+    /**
+     * Connect to the user-specified database.
+     */
+    private fun createDatabase(): MongoDatabase {
+        val credential = MongoCredential.createScramSha1Credential(
+            mongoUser,
+            mongoDb,
+            mongoPassword
+        )
+
+        val settings = MongoClientSettings.builder()
+            .credential(credential)
+            .applyToClusterSettings { it.hosts(listOf(ServerAddress(mongoHost, mongoPort))) }
+            .build()
+        val client = MongoClients.create(settings)
+        return client.getDatabase(mongoDb)
+    }
+
+    /**
+     * Run a single scenario.
+     */
+    private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>) {
+        val id = scenario.getString("_id")
+
+        logger.info { "Constructing performance interference model" }
+
+        val traceDir = File(
+            tracePath,
+            scenario.getEmbedded(listOf("trace", "traceId"), String::class.java)
+        )
+        val traceReader = Sc20RawParquetTraceReader(traceDir)
+        val performanceInterferenceReader = let {
+            val path = File(traceDir, "performance-interference-model.json")
+            val operational = scenario.get("operational", Document::class.java)
+            val enabled = operational.getBoolean("performanceInterferenceEnabled")
+
+            if (!enabled || !path.exists()) {
+                return@let null
+            }
+
+            path.inputStream().use { Sc20PerformanceInterferenceReader(it) }
+        }
+
+        val targets = portfolio.get("targets", Document::class.java)
+
+        repeat(targets.getInteger("repeatsPerScenario")) {
+            logger.info { "Starting repeat $it" }
+            runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader)
+        }
+
+        logger.info { "Finished simulation for scenario $id" }
+    }
+
+    /**
+     * Run a single repeat.
+     */
+    private suspend fun runRepeat(
+        scenario: Document,
+        repeat: Int,
+        topologies: MongoCollection<Document>,
+        traceReader: Sc20RawParquetTraceReader,
+        performanceInterferenceReader: Sc20PerformanceInterferenceReader?
+    ) {
+        val id = scenario.getString("_id")
+        val seed = repeat
+        val traceDocument = scenario.get("trace", Document::class.java)
+        val workloadName = traceDocument.getString("traceId")
+        val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
+
+        val seeder = Random(seed)
+        val testScope = TestCoroutineScope()
+        val clock = DelayControllerClockAdapter(testScope)
+
+        val chan = Channel<Unit>(Channel.CONFLATED)
+
+        val operational = scenario.get("operational", Document::class.java)
+        val allocationPolicy =
+            when (val policyName = operational.getString("schedulerName")) {
+                "mem" -> AvailableMemoryAllocationPolicy()
+                "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+                "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+                "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+                "active-servers" -> NumberOfActiveServersAllocationPolicy()
+                "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+                "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+                "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+                "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+                else -> throw IllegalArgumentException("Unknown policy $policyName")
+            }
+
+        val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
+        val trace = Sc20ParquetTraceReader(
+            listOf(traceReader),
+            performanceInterferenceModel,
+            Workload(workloadName, workloadFraction),
+            seed
+        )
+        val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java)
+        val environment = TopologyParser(topologies, topologyId)
+        val monitor = ParquetExperimentMonitor(
+            outputPath,
+            "scenario_id=$id/run_id=$repeat",
+            4096
+        )
+
+        testScope.launch {
+            val (bareMetalProvisioner, scheduler) = createProvisioner(
+                this,
+                clock,
+                environment,
+                allocationPolicy
+            )
+
+            val failureDomain = if (operational.getBoolean("failuresEnabled")) {
+                logger.debug("ENABLING failures")
+                createFailureDomain(
+                    testScope,
+                    clock,
+                    seeder.nextInt(),
+                    operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7,
+                    bareMetalProvisioner,
+                    chan
+                )
+            } else {
+                null
+            }
+
+            attachMonitor(this, clock, scheduler, monitor)
+            processTrace(
+                this,
+                clock,
+                trace,
+                scheduler,
+                chan,
+                monitor,
+                emptyMap()
+            )
+
+            logger.debug("SUBMIT=${scheduler.submittedVms}")
+            logger.debug("FAIL=${scheduler.unscheduledVms}")
+            logger.debug("QUEUED=${scheduler.queuedVms}")
+            logger.debug("RUNNING=${scheduler.runningVms}")
+            logger.debug("FINISHED=${scheduler.finishedVms}")
+
+            failureDomain?.cancel()
+            scheduler.terminate()
+        }
+
+        try {
+            testScope.advanceUntilIdle()
+        } finally {
+            monitor.close()
+        }
+    }
+
+    val POLL_INTERVAL = 5000L // ms = 5 s
+    val HEARTBEAT_INTERVAL = 60000L // ms = 1 min
+
+    override fun run() = runBlocking(Dispatchers.Default) {
+        logger.info { "Starting OpenDC web runner" }
+        logger.info { "Connecting to MongoDB instance" }
+        val database = createDatabase()
+        val manager = ScenarioManager(database.getCollection("scenarios"))
+        val portfolios = database.getCollection("portfolios")
+        val topologies = database.getCollection("topologies")
+
+        logger.info { "Launching Spark" }
+        val resultProcessor = ResultProcessor(spark, outputPath)
+
+        logger.info { "Watching for queued scenarios" }
+
+        while (true) {
+            val scenario = manager.findNext()
+
+            if (scenario == null) {
+                delay(POLL_INTERVAL)
+                continue
+            }
+
+            val id = scenario.getString("_id")
+
+            logger.info { "Found queued scenario $id: attempting to claim" }
+
+            if (!manager.claim(id)) {
+                logger.info { "Failed to claim scenario" }
+                continue
+            }
+
+            coroutineScope {
+                // Launch heartbeat process
+                val heartbeat = launch {
+                    while (true) {
+                        delay(HEARTBEAT_INTERVAL)
+                        manager.heartbeat(id)
+                    }
+                }
+
+                try {
+                    val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!!
+                    runScenario(portfolio, scenario, topologies)
+
+                    logger.info { "Starting result processing" }
+
+                    val result = resultProcessor.process(id)
+                    manager.finish(id, result)
+
+                    logger.info { "Successfully finished scenario $id" }
+                } catch (e: Exception) {
+                    logger.warn(e) { "Scenario failed to finish" }
+                    manager.fail(id)
+                } finally {
+                    heartbeat.cancel()
+                }
+            }
+        }
+    }
+}
+
+/**
+ * Main entry point of the runner.
+ */
+fun main(args: Array<String>) = RunnerCli().main(args)
diff --git a/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
new file mode 100644
index 00000000..c0b0ac31
--- /dev/null
+++ b/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
@@ -0,0 +1,193 @@
+package com.atlarge.opendc.runner.web
+
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.*
+import java.io.File
+
+/**
+ * A helper class for processing the experiment results using Apache Spark.
+ */
+class ResultProcessor(private val master: String, private val outputPath: File) {
+    /**
+     * Process the results of the scenario with the given [id].
+     */
+    fun process(id: String): Result {
+        val spark = SparkSession.builder()
+            .master(master)
+            .appName("opendc-simulator-$id")
+            .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver
+            .orCreate
+
+        try {
+            val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path)
+            val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path)
+            val res = aggregate(hostMetrics, provisionerMetrics).first()
+
+            return Result(
+                res.getList<Long>(1),
+                res.getList<Long>(2),
+                res.getList<Long>(3),
+                res.getList<Long>(4),
+                res.getList<Double>(5),
+                res.getList<Double>(6),
+                res.getList<Double>(7),
+                res.getList<Int>(8),
+                res.getList<Long>(9),
+                res.getList<Long>(10),
+                res.getList<Long>(11),
+                res.getList<Int>(12),
+                res.getList<Int>(13),
+                res.getList<Int>(14),
+                res.getList<Int>(15)
+            )
+        } finally {
+            spark.close()
+        }
+    }
+
+    data class Result(
+        val totalRequestedBurst: List<Long>,
+        val totalGrantedBurst: List<Long>,
+        val totalOvercommittedBurst: List<Long>,
+        val totalInterferedBurst: List<Long>,
+        val meanCpuUsage: List<Double>,
+        val meanCpuDemand: List<Double>,
+        val meanNumDeployedImages: List<Double>,
+        val maxNumDeployedImages: List<Int>,
+        val totalPowerDraw: List<Long>,
+        val totalFailureSlices: List<Long>,
+        val totalFailureVmSlices: List<Long>,
+        val totalVmsSubmitted: List<Int>,
+        val totalVmsQueued: List<Int>,
+        val totalVmsFinished: List<Int>,
+        val totalVmsFailed: List<Int>
+    )
+
+    /**
+     * Perform aggregation of the experiment results.
+     */
+    private fun aggregate(hostMetrics: Dataset<Row>, provisionerMetrics: Dataset<Row>): Dataset<Row> {
+        // Extrapolate the duration of the entries to span the entire trace
+        val hostMetricsExtra = hostMetrics
+            .withColumn("slice_counts", floor(col("duration") / lit(sliceLength)))
+            .withColumn("power_draw", col("power_draw") * col("slice_counts"))
+            .withColumn("state_int", states[col("state")])
+            .withColumn("state_opposite_int", oppositeStates[col("state")])
+            .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int"))
+            .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts"))
+            .withColumn("failure_slice_count", col("slice_counts") * col("state_int"))
+            .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count"))
+
+        // Process all data in a single run
+        val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id")
+
+        // Aggregate the summed total metrics
+        val systemMetrics = hostMetricsGrouped.agg(
+            sum("requested_burst").alias("total_requested_burst"),
+            sum("granted_burst").alias("total_granted_burst"),
+            sum("overcommissioned_burst").alias("total_overcommitted_burst"),
+            sum("interfered_burst").alias("total_interfered_burst"),
+            sum("power_draw").alias("total_power_draw"),
+            sum("failure_slice_count").alias("total_failure_slices"),
+            sum("failure_vm_slice_count").alias("total_failure_vm_slices")
+        )
+
+        // Aggregate metrics per host
+        val hvMetrics = hostMetrics
+            .groupBy("run_id", "host_id")
+            .agg(
+                sum("cpu_usage").alias("mean_cpu_usage"),
+                sum("cpu_demand").alias("mean_cpu_demand"),
+                avg("vm_count").alias("mean_num_deployed_images"),
+                count(lit(1)).alias("num_rows")
+            )
+            .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows"))
+            .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows"))
+            .groupBy("run_id")
+            .agg(
+                avg("mean_cpu_usage").alias("mean_cpu_usage"),
+                avg("mean_cpu_demand").alias("mean_cpu_demand"),
+                avg("mean_num_deployed_images").alias("mean_num_deployed_images"),
+                max("mean_num_deployed_images").alias("max_num_deployed_images")
+            )
+
+        // Group the provisioner metrics per run
+        val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id")
+
+        // Aggregate the provisioner metrics
+        val provisionerMetricsAggregated = provisionerMetricsGrouped.agg(
+            max("vm_total_count").alias("total_vms_submitted"),
+            max("vm_waiting_count").alias("total_vms_queued"),
+            max("vm_active_count").alias("total_vms_running"),
+            max("vm_inactive_count").alias("total_vms_finished"),
+            max("vm_failed_count").alias("total_vms_failed")
+        )
+
+        // Join the results into a single data frame
+        return systemMetrics
+            .join(hvMetrics, "run_id")
+            .join(provisionerMetricsAggregated, "run_id")
+            .select(
+                col("total_requested_burst"),
+                col("total_granted_burst"),
+                col("total_overcommitted_burst"),
+                col("total_interfered_burst"),
+                col("mean_cpu_usage"),
+                col("mean_cpu_demand"),
+                col("mean_num_deployed_images"),
+                col("max_num_deployed_images"),
+                col("total_power_draw"),
+                col("total_failure_slices"),
+                col("total_failure_vm_slices"),
+                col("total_vms_submitted"),
+                col("total_vms_queued"),
+                col("total_vms_finished"),
+                col("total_vms_failed")
+            )
+            .groupBy(lit(1))
+            .agg(
+                // TODO Check if order of values is correct
+                collect_list(col("total_requested_burst")).alias("total_requested_burst"),
+                collect_list(col("total_granted_burst")).alias("total_granted_burst"),
+                collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"),
+                collect_list(col("total_interfered_burst")).alias("total_interfered_burst"),
+                collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"),
+                collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"),
+                collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"),
+                collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"),
+                collect_list(col("total_power_draw")).alias("total_power_draw"),
+                collect_list(col("total_failure_slices")).alias("total_failure_slices"),
+                collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"),
+                collect_list(col("total_vms_submitted")).alias("total_vms_submitted"),
+                collect_list(col("total_vms_queued")).alias("total_vms_queued"),
+                collect_list(col("total_vms_finished")).alias("total_vms_finished"),
+                collect_list(col("total_vms_failed")).alias("total_vms_failed")
+            )
+    }
+
+    // Spark helper functions
+    operator fun Column.times(other: Column): Column = `$times`(other)
+    operator fun Column.div(other: Column): Column = `$div`(other)
+    operator fun Column.get(other: Column): Column = this.apply(other)
+
+    val sliceLength = 5 * 60 * 1000
+    val states = map(
+        lit("ERROR"),
+        lit(1),
+        lit("ACTIVE"),
+        lit(0),
+        lit("SHUTOFF"),
+        lit(0)
+    )
+    val oppositeStates = map(
+        lit("ERROR"),
+        lit(0),
+        lit("ACTIVE"),
+        lit(1),
+        lit("SHUTOFF"),
+        lit(1)
+    )
+}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
new file mode 100644
index 00000000..6ec4995d
--- /dev/null
+++ b/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
@@ -0,0 +1,93 @@
+package com.atlarge.opendc.runner.web
+
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.model.Filters
+import com.mongodb.client.model.Updates
+import org.bson.Document
+import java.time.Instant
+
+/**
+ * Manages the queue of scenarios that need to be processed.
+ */
+class ScenarioManager(private val collection: MongoCollection<Document>) {
+    /**
+     * Find the next scenario that the simulator needs to process.
+     */
+    fun findNext(): Document? {
+        return collection
+            .find(Filters.eq("simulation.state", "QUEUED"))
+            .first()
+    }
+
+    /**
+     * Claim the scenario in the database with the specified id.
+     */
+    fun claim(id: String): Boolean {
+        val res = collection.findOneAndUpdate(
+            Filters.and(
+                Filters.eq("_id", id),
+                Filters.eq("simulation.state", "QUEUED")
+            ),
+            Updates.combine(
+                Updates.set("simulation.state", "RUNNING"),
+                Updates.set("simulation.heartbeat", Instant.now())
+            )
+        )
+        return res != null
+    }
+
+    /**
+     * Update the heartbeat of the specified scenario.
+     */
+    fun heartbeat(id: String) {
+        collection.findOneAndUpdate(
+            Filters.and(
+                Filters.eq("_id", id),
+                Filters.eq("simulation.state", "RUNNING")
+            ),
+            Updates.set("simulation.heartbeat", Instant.now())
+        )
+    }
+
+    /**
+     * Mark the scenario as failed.
+     */
+    fun fail(id: String) {
+        collection.findOneAndUpdate(
+            Filters.eq("_id", id),
+            Updates.combine(
+                Updates.set("simulation.state", "FAILED"),
+                Updates.set("simulation.heartbeat", Instant.now())
+            )
+        )
+    }
+
+    /**
+     * Persist the specified results.
+     */
+    fun finish(id: String, result: ResultProcessor.Result) {
+        collection.findOneAndUpdate(
+            Filters.eq("_id", id),
+            Updates.combine(
+                Updates.set("simulation.state", "FINISHED"),
+                Updates.unset("simulation.time"),
+                Updates.set("results.total_requested_burst", result.totalRequestedBurst),
+                Updates.set("results.total_granted_burst", result.totalGrantedBurst),
+                Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst),
+                Updates.set("results.total_interfered_burst", result.totalInterferedBurst),
+                Updates.set("results.mean_cpu_usage", result.meanCpuUsage),
+                Updates.set("results.mean_cpu_demand", result.meanCpuDemand),
+                Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages),
+                Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
+                Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
+                Updates.set("results.total_power_draw", result.totalPowerDraw),
+                Updates.set("results.total_failure_slices", result.totalFailureSlices),
+                Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices),
+                Updates.set("results.total_vms_submitted", result.totalVmsSubmitted),
+                Updates.set("results.total_vms_queued", result.totalVmsQueued),
+                Updates.set("results.total_vms_finished", result.totalVmsFinished),
+                Updates.set("results.total_vms_failed", result.totalVmsFailed)
+            )
+        )
+    }
+}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
new file mode 100644
index 00000000..f9b1c6c4
--- /dev/null
+++ b/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
@@ -0,0 +1,131 @@
+package com.atlarge.opendc.runner.web
+
+import com.atlarge.opendc.compute.core.MemoryUnit
+import com.atlarge.opendc.compute.core.ProcessingNode
+import com.atlarge.opendc.compute.core.ProcessingUnit
+import com.atlarge.opendc.compute.metal.NODE_CLUSTER
+import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
+import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel
+import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
+import com.atlarge.opendc.core.Environment
+import com.atlarge.opendc.core.Platform
+import com.atlarge.opendc.core.Zone
+import com.atlarge.opendc.core.services.ServiceRegistry
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import com.mongodb.client.AggregateIterable
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.model.Aggregates
+import com.mongodb.client.model.Field
+import com.mongodb.client.model.Filters
+import com.mongodb.client.model.Projections
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.launch
+import org.bson.Document
+import java.time.Clock
+import java.util.*
+
+/**
+ * A helper class that converts the MongoDB topology into an OpenDC environment.
+ */
+class TopologyParser(private val collection: MongoCollection<Document>, private val id: String) : EnvironmentReader {
+    /**
+     * Parse the topology with the specified [id].
+     */
+    override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
+        val nodes = mutableListOf<SimpleBareMetalDriver>()
+        val random = Random(0)
+
+        for (machine in fetchMachines(id)) {
+            val machineId = machine.getString("_id")
+            val clusterId = machine.getString("rack_id")
+            val position = machine.getInteger("position")
+
+            val processors = machine.getList("cpus", Document::class.java).flatMap { cpu ->
+                val cores = cpu.getInteger("numberOfCores")
+                val speed = cpu.get("clockRateMhz", Number::class.java).toDouble()
+                // TODO Remove hardcoding of vendor
+                val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores)
+                List(cores) { coreId ->
+                    ProcessingUnit(node, coreId, speed)
+                }
+            }
+            val memoryUnits = machine.getList("memories", Document::class.java).map { memory ->
+                MemoryUnit(
+                    "Samsung",
+                    memory.getString("name"),
+                    memory.get("speedMbPerS", Number::class.java).toDouble(),
+                    memory.get("sizeMb", Number::class.java).toLong()
+                )
+            }
+            nodes.add(
+                SimpleBareMetalDriver(
+                    coroutineScope,
+                    clock,
+                    UUID(random.nextLong(), random.nextLong()),
+                    "node-$clusterId-$position",
+                    mapOf(NODE_CLUSTER to clusterId),
+                    processors,
+                    memoryUnits,
+                    // For now we assume a simple linear load model with an idle draw of ~200W and a maximum
+                    // power draw of 350W.
+                    // Source: https://stackoverflow.com/questions/6128960
+                    LinearLoadPowerModel(200.0, 350.0)
+                )
+            )
+        }
+
+        val provisioningService = SimpleProvisioningService()
+        coroutineScope.launch {
+            for (node in nodes) {
+                provisioningService.create(node)
+            }
+        }
+
+        val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
+
+        val platform = Platform(
+            UUID.randomUUID(),
+            "opendc-platform",
+            listOf(
+                Zone(UUID.randomUUID(), "zone", serviceRegistry)
+            )
+        )
+
+        return Environment(fetchName(id), null, listOf(platform))
+    }
+
+    override fun close() {}
+
+    /**
+     * Fetch the metadata of the topology.
+     */
+    private fun fetchName(id: String): String {
+        return collection.aggregate(
+            listOf(
+                Aggregates.match(Filters.eq("_id", id)),
+                Aggregates.project(Projections.include("name"))
+            )
+        )
+            .first()!!
+            .getString("name")
+    }
+
+    /**
+     * Fetch a topology from the database with the specified [id].
+     */
+    private fun fetchMachines(id: String): AggregateIterable<Document> {
+        return collection.aggregate(
+            listOf(
+                Aggregates.match(Filters.eq("_id", id)),
+                Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack"))),
+                Aggregates.unwind("\$racks"),
+                Aggregates.unwind("\$racks"),
+                Aggregates.replaceRoot("\$racks"),
+                Aggregates.addFields(Field("machines.rack_id", "\$_id")),
+                Aggregates.unwind("\$machines"),
+                Aggregates.replaceRoot("\$machines")
+            )
+        )
+    }
+}
diff --git a/simulator/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc-runner-web/src/main/resources/log4j2.xml
new file mode 100644
index 00000000..1d873554
--- /dev/null
+++ b/simulator/opendc-runner-web/src/main/resources/log4j2.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ MIT License
+  ~
+  ~ Copyright (c) 2020 atlarge-research
+  ~
+  ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+  ~ of this software and associated documentation files (the "Software"), to deal
+  ~ in the Software without restriction, including without limitation the rights
+  ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+  ~ copies of the Software, and to permit persons to whom the Software is
+  ~ furnished to do so, subject to the following conditions:
+  ~
+  ~ The above copyright notice and this permission notice shall be included in all
+  ~ copies or substantial portions of the Software.
+  ~
+  ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+  ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+  ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+  ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+  ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+  ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+  ~ SOFTWARE.
+  -->
+
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false" />
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Logger name="com.atlarge.odcsim" level="info" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+        <Logger name="com.atlarge.opendc" level="warn" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+        <Logger name="com.atlarge.opendc.runner" level="info" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+        <Logger name="org.apache.hadoop" level="warn" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+        <Logger name="org.apache.spark" level="info" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+        <Root level="error">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>
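For reference, the runner introduced above is driven entirely through the CLI options and environment variables declared in `RunnerCli`. A minimal sketch of a local invocation follows; the flag names and default values are taken from the code above, while the password value and the separate entry-point function are placeholders for illustration only:

```kotlin
// Minimal sketch: start the web runner with explicit arguments instead of
// relying on the OPENDC_* environment variables. The --mongo-password value
// is a placeholder; the other values simply repeat the declared defaults.
fun runLocally() = RunnerCli().main(
    arrayOf(
        "--mongo-db", "opendc",
        "--mongo-host", "localhost",
        "--mongo-port", "27017",
        "--mongo-user", "opendc",
        "--mongo-password", "changeme",
        "--traces", "traces/",
        "--output", "results/",
        "--spark", "local[*]"
    )
)
```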
