summaryrefslogtreecommitdiff
path: root/simulator/opendc/opendc-runner-web/src
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc/opendc-runner-web/src')
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt345
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt187
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt93
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt127
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml52
5 files changed, 0 insertions, 804 deletions
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
deleted file mode 100644
index 807c119e..00000000
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
+++ /dev/null
@@ -1,345 +0,0 @@
-package com.atlarge.opendc.runner.web
-
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.opendc.compute.virt.service.allocation.*
-import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor
-import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain
-import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner
-import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
-import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
-import com.atlarge.opendc.experiments.sc20.experiment.processTrace
-import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
-import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
-import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
-import com.github.ajalt.clikt.core.CliktCommand
-import com.github.ajalt.clikt.parameters.options.*
-import com.github.ajalt.clikt.parameters.types.file
-import com.github.ajalt.clikt.parameters.types.int
-import com.mongodb.MongoClientSettings
-import com.mongodb.MongoCredential
-import com.mongodb.ServerAddress
-import com.mongodb.client.MongoClients
-import com.mongodb.client.MongoCollection
-import com.mongodb.client.MongoDatabase
-import com.mongodb.client.model.Filters
-import java.io.File
-import java.util.*
-import kotlin.random.Random
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
-import mu.KotlinLogging
-import org.bson.Document
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * The provider for the simulation engine to use.
- */
-private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
-
-/**
- * Represents the CLI command for starting the OpenDC web runner.
- */
-class RunnerCli : CliktCommand(name = "runner") {
- /**
- * The name of the database to use.
- */
- private val mongoDb by option(
- "--mongo-db",
- help = "name of the database to use",
- envvar = "OPENDC_DB"
- )
- .default("opendc")
-
- /**
- * The database host to connect to.
- */
- private val mongoHost by option(
- "--mongo-host",
- help = "database host to connect to",
- envvar = "OPENDC_DB_HOST"
- )
- .default("localhost")
-
- /**
- * The database port to connect to.
- */
- private val mongoPort by option(
- "--mongo-port",
- help = "database port to connect to",
- envvar = "OPENDC_DB_PORT"
- )
- .int()
- .default(27017)
-
- /**
- * The database user to connect with.
- */
- private val mongoUser by option(
- "--mongo-user",
- help = "database user to connect with",
- envvar = "OPENDC_DB_USER"
- )
- .default("opendc")
-
- /**
- * The database password to connect with.
- */
- private val mongoPassword by option(
- "--mongo-password",
- help = "database password to connect with",
- envvar = "OPENDC_DB_PASSWORD"
- )
- .convert { it.toCharArray() }
- .required()
-
- /**
- * The path to the traces directory.
- */
- private val tracePath by option(
- "--traces",
- help = "path to the directory containing the traces",
- envvar = "OPENDC_TRACES"
- )
- .file(canBeFile = false)
- .defaultLazy { File("traces/") }
-
- /**
- * The path to the output directory.
- */
- private val outputPath by option(
- "--output",
- help = "path to the results directory",
- envvar = "OPENDC_OUTPUT"
- )
- .file(canBeFile = false)
- .defaultLazy { File("results/") }
-
- /**
- * The Spark master to connect to.
- */
- private val spark by option(
- "--spark",
- help = "Spark master to connect to",
- envvar = "OPENDC_SPARK"
- )
- .default("local[*]")
-
- /**
- * Connect to the user-specified database.
- */
- private fun createDatabase(): MongoDatabase {
- val credential = MongoCredential.createScramSha1Credential(
- mongoUser,
- mongoDb,
- mongoPassword
- )
-
- val settings = MongoClientSettings.builder()
- .credential(credential)
- .applyToClusterSettings { it.hosts(listOf(ServerAddress(mongoHost, mongoPort))) }
- .build()
- val client = MongoClients.create(settings)
- return client.getDatabase(mongoDb)
- }
-
- /**
- * Run a single scenario.
- */
- private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>) {
- val id = scenario.getString("_id")
-
- logger.info { "Constructing performance interference model" }
-
- val traceDir = File(
- tracePath,
- scenario.getEmbedded(listOf("trace", "traceId"), String::class.java)
- )
- val traceReader = Sc20RawParquetTraceReader(traceDir)
- val performanceInterferenceReader = let {
- val path = File(traceDir, "performance-interference-model.json")
- val operational = scenario.get("operational", Document::class.java)
- val enabled = operational.getBoolean("performanceInterferenceEnabled")
-
- if (!enabled || !path.exists()) {
- return@let null
- }
-
- path.inputStream().use { Sc20PerformanceInterferenceReader(it) }
- }
-
- val targets = portfolio.get("targets", Document::class.java)
-
- repeat(targets.getInteger("repeatsPerScenario")) {
- logger.info { "Starting repeat $it" }
- runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader)
- }
-
- logger.info { "Finished simulation for scenario $id" }
- }
-
- /**
- * Run a single repeat.
- */
- private suspend fun runRepeat(
- scenario: Document,
- repeat: Int,
- topologies: MongoCollection<Document>,
- traceReader: Sc20RawParquetTraceReader,
- performanceInterferenceReader: Sc20PerformanceInterferenceReader?
- ) {
- val id = scenario.getString("_id")
- val seed = repeat
- val traceDocument = scenario.get("trace", Document::class.java)
- val workloadName = traceDocument.getString("traceId")
- val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
-
- val seeder = Random(seed)
- val system = provider("experiment-$id")
- val root = system.newDomain("root")
-
- val chan = Channel<Unit>(Channel.CONFLATED)
-
- val operational = scenario.get("operational", Document::class.java)
- val allocationPolicy =
- when (val policyName = operational.getString("schedulerName")) {
- "mem" -> AvailableMemoryAllocationPolicy()
- "mem-inv" -> AvailableMemoryAllocationPolicy(true)
- "core-mem" -> AvailableCoreMemoryAllocationPolicy()
- "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
- "active-servers" -> NumberOfActiveServersAllocationPolicy()
- "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
- "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
- "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
- "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
- else -> throw IllegalArgumentException("Unknown policy $policyName")
- }
-
- val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
- val trace = Sc20ParquetTraceReader(
- listOf(traceReader),
- performanceInterferenceModel,
- Workload(workloadName, workloadFraction),
- seed
- )
- val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java)
- val environment = TopologyParser(topologies, topologyId)
- val monitor = ParquetExperimentMonitor(
- outputPath,
- "scenario_id=$id/run_id=$repeat",
- 4096
- )
-
- root.launch {
- val (bareMetalProvisioner, scheduler) = createProvisioner(
- root,
- environment,
- allocationPolicy
- )
-
- val failureDomain = if (operational.getBoolean("failuresEnabled")) {
- logger.debug("ENABLING failures")
- createFailureDomain(
- seeder.nextInt(),
- operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7,
- bareMetalProvisioner,
- chan
- )
- } else {
- null
- }
-
- attachMonitor(scheduler, monitor)
- processTrace(
- trace,
- scheduler,
- chan,
- monitor,
- emptyMap()
- )
-
- logger.debug("SUBMIT=${scheduler.submittedVms}")
- logger.debug("FAIL=${scheduler.unscheduledVms}")
- logger.debug("QUEUED=${scheduler.queuedVms}")
- logger.debug("RUNNING=${scheduler.runningVms}")
- logger.debug("FINISHED=${scheduler.finishedVms}")
-
- failureDomain?.cancel()
- scheduler.terminate()
- }
-
- try {
- system.run()
- } finally {
- system.terminate()
- monitor.close()
- }
- }
-
- val POLL_INTERVAL = 5000L // ms = 5 s
- val HEARTBEAT_INTERVAL = 60000L // ms = 1 min
-
- override fun run() = runBlocking(Dispatchers.Default) {
- logger.info { "Starting OpenDC web runner" }
- logger.info { "Connecting to MongoDB instance" }
- val database = createDatabase()
- val manager = ScenarioManager(database.getCollection("scenarios"))
- val portfolios = database.getCollection("portfolios")
- val topologies = database.getCollection("topologies")
-
- logger.info { "Launching Spark" }
- val resultProcessor = ResultProcessor(spark, outputPath)
-
- logger.info { "Watching for queued scenarios" }
-
- while (true) {
- val scenario = manager.findNext()
-
- if (scenario == null) {
- delay(POLL_INTERVAL)
- continue
- }
-
- val id = scenario.getString("_id")
-
- logger.info { "Found queued scenario $id: attempting to claim" }
-
- if (!manager.claim(id)) {
- logger.info { "Failed to claim scenario" }
- continue
- }
-
- coroutineScope {
- // Launch heartbeat process
- val heartbeat = launch {
- while (true) {
- delay(HEARTBEAT_INTERVAL)
- manager.heartbeat(id)
- }
- }
-
- try {
- val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!!
- runScenario(portfolio, scenario, topologies)
-
- logger.info { "Starting result processing" }
-
- val result = resultProcessor.process(id)
- manager.finish(id, result)
-
- logger.info { "Successfully finished scenario $id" }
- } catch (e: Exception) {
- logger.warn(e) { "Scenario failed to finish" }
- manager.fail(id)
- } finally {
- heartbeat.cancel()
- }
- }
- }
- }
-}
-
-/**
- * Main entry point of the runner.
- */
-fun main(args: Array<String>) = RunnerCli().main(args)
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
deleted file mode 100644
index 39092653..00000000
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
+++ /dev/null
@@ -1,187 +0,0 @@
-package com.atlarge.opendc.runner.web
-
-import java.io.File
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.functions.*
-
-/**
- * A helper class for processing the experiment results using Apache Spark.
- */
-class ResultProcessor(private val master: String, private val outputPath: File) {
- /**
- * Process the results of the scenario with the given [id].
- */
- fun process(id: String): Result {
- val spark = SparkSession.builder()
- .master(master)
- .appName("opendc-simulator-$id")
- .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver
- .orCreate
-
- try {
- val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path)
- val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path)
- val res = aggregate(hostMetrics, provisionerMetrics).first()
-
- return Result(
- res.getList<Long>(1),
- res.getList<Long>(2),
- res.getList<Long>(3),
- res.getList<Long>(4),
- res.getList<Double>(5),
- res.getList<Double>(6),
- res.getList<Double>(7),
- res.getList<Int>(8),
- res.getList<Long>(9),
- res.getList<Long>(10),
- res.getList<Long>(11),
- res.getList<Int>(12),
- res.getList<Int>(13),
- res.getList<Int>(14),
- res.getList<Int>(15)
- )
- } finally {
- spark.close()
- }
- }
-
- data class Result(
- val totalRequestedBurst: List<Long>,
- val totalGrantedBurst: List<Long>,
- val totalOvercommittedBurst: List<Long>,
- val totalInterferedBurst: List<Long>,
- val meanCpuUsage: List<Double>,
- val meanCpuDemand: List<Double>,
- val meanNumDeployedImages: List<Double>,
- val maxNumDeployedImages: List<Int>,
- val totalPowerDraw: List<Long>,
- val totalFailureSlices: List<Long>,
- val totalFailureVmSlices: List<Long>,
- val totalVmsSubmitted: List<Int>,
- val totalVmsQueued: List<Int>,
- val totalVmsFinished: List<Int>,
- val totalVmsFailed: List<Int>
- )
-
- /**
- * Perform aggregation of the experiment results.
- */
- private fun aggregate(hostMetrics: Dataset<Row>, provisionerMetrics: Dataset<Row>): Dataset<Row> {
- // Extrapolate the duration of the entries to span the entire trace
- val hostMetricsExtra = hostMetrics
- .withColumn("slice_counts", floor(col("duration") / lit(sliceLength)))
- .withColumn("power_draw", col("power_draw") * col("slice_counts"))
- .withColumn("state_int", states[col("state")])
- .withColumn("state_opposite_int", oppositeStates[col("state")])
- .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int"))
- .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts"))
- .withColumn("failure_slice_count", col("slice_counts") * col("state_int"))
- .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count"))
-
- // Process all data in a single run
- val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id")
-
- // Aggregate the summed total metrics
- val systemMetrics = hostMetricsGrouped.agg(
- sum("requested_burst").alias("total_requested_burst"),
- sum("granted_burst").alias("total_granted_burst"),
- sum("overcommissioned_burst").alias("total_overcommitted_burst"),
- sum("interfered_burst").alias("total_interfered_burst"),
- sum("power_draw").alias("total_power_draw"),
- sum("failure_slice_count").alias("total_failure_slices"),
- sum("failure_vm_slice_count").alias("total_failure_vm_slices")
- )
-
- // Aggregate metrics per host
- val hvMetrics = hostMetrics
- .groupBy("run_id", "host_id")
- .agg(
- sum("cpu_usage").alias("mean_cpu_usage"),
- sum("cpu_demand").alias("mean_cpu_demand"),
- avg("vm_count").alias("mean_num_deployed_images"),
- count(lit(1)).alias("num_rows")
- )
- .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows"))
- .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows"))
- .groupBy("run_id")
- .agg(
- avg("mean_cpu_usage").alias("mean_cpu_usage"),
- avg("mean_cpu_demand").alias("mean_cpu_demand"),
- avg("mean_num_deployed_images").alias("mean_num_deployed_images"),
- max("mean_num_deployed_images").alias("max_num_deployed_images")
- )
-
- // Group the provisioner metrics per run
- val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id")
-
- // Aggregate the provisioner metrics
- val provisionerMetricsAggregated = provisionerMetricsGrouped.agg(
- max("vm_total_count").alias("total_vms_submitted"),
- max("vm_waiting_count").alias("total_vms_queued"),
- max("vm_active_count").alias("total_vms_running"),
- max("vm_inactive_count").alias("total_vms_finished"),
- max("vm_failed_count").alias("total_vms_failed")
- )
-
- // Join the results into a single data frame
- return systemMetrics
- .join(hvMetrics, "run_id")
- .join(provisionerMetricsAggregated, "run_id")
- .select(
- col("total_requested_burst"),
- col("total_granted_burst"),
- col("total_overcommitted_burst"),
- col("total_interfered_burst"),
- col("mean_cpu_usage"),
- col("mean_cpu_demand"),
- col("mean_num_deployed_images"),
- col("max_num_deployed_images"),
- col("total_power_draw"),
- col("total_failure_slices"),
- col("total_failure_vm_slices"),
- col("total_vms_submitted"),
- col("total_vms_queued"),
- col("total_vms_finished"),
- col("total_vms_failed")
- )
- .groupBy(lit(1))
- .agg(
- // TODO Check if order of values is correct
- collect_list(col("total_requested_burst")).alias("total_requested_burst"),
- collect_list(col("total_granted_burst")).alias("total_granted_burst"),
- collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"),
- collect_list(col("total_interfered_burst")).alias("total_interfered_burst"),
- collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"),
- collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"),
- collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"),
- collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"),
- collect_list(col("total_power_draw")).alias("total_power_draw"),
- collect_list(col("total_failure_slices")).alias("total_failure_slices"),
- collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"),
- collect_list(col("total_vms_submitted")).alias("total_vms_submitted"),
- collect_list(col("total_vms_queued")).alias("total_vms_queued"),
- collect_list(col("total_vms_finished")).alias("total_vms_finished"),
- collect_list(col("total_vms_failed")).alias("total_vms_failed")
- )
- }
-
- // Spark helper functions
- operator fun Column.times(other: Column): Column = `$times`(other)
- operator fun Column.div(other: Column): Column = `$div`(other)
- operator fun Column.get(other: Column): Column = this.apply(other)
-
- val sliceLength = 5 * 60 * 1000
- val states = map(
- lit("ERROR"), lit(1),
- lit("ACTIVE"), lit(0),
- lit("SHUTOFF"), lit(0)
- )
- val oppositeStates = map(
- lit("ERROR"), lit(0),
- lit("ACTIVE"), lit(1),
- lit("SHUTOFF"), lit(1)
- )
-}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
deleted file mode 100644
index 40ffd282..00000000
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
+++ /dev/null
@@ -1,93 +0,0 @@
-package com.atlarge.opendc.runner.web
-
-import com.mongodb.client.MongoCollection
-import com.mongodb.client.model.Filters
-import com.mongodb.client.model.Updates
-import java.time.Instant
-import org.bson.Document
-
-/**
- * Manages the queue of scenarios that need to be processed.
- */
-class ScenarioManager(private val collection: MongoCollection<Document>) {
- /**
- * Find the next scenario that the simulator needs to process.
- */
- fun findNext(): Document? {
- return collection
- .find(Filters.eq("simulation.state", "QUEUED"))
- .first()
- }
-
- /**
- * Claim the scenario in the database with the specified id.
- */
- fun claim(id: String): Boolean {
- val res = collection.findOneAndUpdate(
- Filters.and(
- Filters.eq("_id", id),
- Filters.eq("simulation.state", "QUEUED")
- ),
- Updates.combine(
- Updates.set("simulation.state", "RUNNING"),
- Updates.set("simulation.heartbeat", Instant.now())
- )
- )
- return res != null
- }
-
- /**
- * Update the heartbeat of the specified scenario.
- */
- fun heartbeat(id: String) {
- collection.findOneAndUpdate(
- Filters.and(
- Filters.eq("_id", id),
- Filters.eq("simulation.state", "RUNNING")
- ),
- Updates.set("simulation.heartbeat", Instant.now())
- )
- }
-
- /**
- * Mark the scenario as failed.
- */
- fun fail(id: String) {
- collection.findOneAndUpdate(
- Filters.eq("_id", id),
- Updates.combine(
- Updates.set("simulation.state", "FAILED"),
- Updates.set("simulation.heartbeat", Instant.now())
- )
- )
- }
-
- /**
- * Persist the specified results.
- */
- fun finish(id: String, result: ResultProcessor.Result) {
- collection.findOneAndUpdate(
- Filters.eq("_id", id),
- Updates.combine(
- Updates.set("simulation.state", "FINISHED"),
- Updates.unset("simulation.time"),
- Updates.set("results.total_requested_burst", result.totalRequestedBurst),
- Updates.set("results.total_granted_burst", result.totalGrantedBurst),
- Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst),
- Updates.set("results.total_interfered_burst", result.totalInterferedBurst),
- Updates.set("results.mean_cpu_usage", result.meanCpuUsage),
- Updates.set("results.mean_cpu_demand", result.meanCpuDemand),
- Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages),
- Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
- Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
- Updates.set("results.total_power_draw", result.totalPowerDraw),
- Updates.set("results.total_failure_slices", result.totalFailureSlices),
- Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices),
- Updates.set("results.total_vms_submitted", result.totalVmsSubmitted),
- Updates.set("results.total_vms_queued", result.totalVmsQueued),
- Updates.set("results.total_vms_finished", result.totalVmsFinished),
- Updates.set("results.total_vms_failed", result.totalVmsFailed)
- )
- )
- }
-}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
deleted file mode 100644
index 499585ec..00000000
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
+++ /dev/null
@@ -1,127 +0,0 @@
-package com.atlarge.opendc.runner.web
-
-import com.atlarge.odcsim.Domain
-import com.atlarge.opendc.compute.core.MemoryUnit
-import com.atlarge.opendc.compute.core.ProcessingNode
-import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.metal.NODE_CLUSTER
-import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
-import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel
-import com.atlarge.opendc.compute.metal.service.ProvisioningService
-import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
-import com.atlarge.opendc.core.Environment
-import com.atlarge.opendc.core.Platform
-import com.atlarge.opendc.core.Zone
-import com.atlarge.opendc.core.services.ServiceRegistry
-import com.atlarge.opendc.format.environment.EnvironmentReader
-import com.mongodb.client.AggregateIterable
-import com.mongodb.client.MongoCollection
-import com.mongodb.client.model.Aggregates
-import com.mongodb.client.model.Field
-import com.mongodb.client.model.Filters
-import com.mongodb.client.model.Projections
-import java.util.*
-import kotlinx.coroutines.launch
-import org.bson.Document
-
-/**
- * A helper class that converts the MongoDB topology into an OpenDC environment.
- */
-class TopologyParser(private val collection: MongoCollection<Document>, private val id: String) : EnvironmentReader {
- /**
- * Parse the topology with the specified [id].
- */
- override suspend fun construct(dom: Domain): Environment {
- val nodes = mutableListOf<SimpleBareMetalDriver>()
- val random = Random(0)
-
- for (machine in fetchMachines(id)) {
- val machineId = machine.getString("_id")
- val clusterId = machine.getString("rack_id")
- val position = machine.getInteger("position")
-
- val processors = machine.getList("cpus", Document::class.java).flatMap { cpu ->
- val cores = cpu.getInteger("numberOfCores")
- val speed = cpu.get("clockRateMhz", Number::class.java).toDouble()
- // TODO Remove hardcoding of vendor
- val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores)
- List(cores) { coreId ->
- ProcessingUnit(node, coreId, speed)
- }
- }
- val memoryUnits = machine.getList("memories", Document::class.java).map { memory ->
- MemoryUnit(
- "Samsung",
- memory.getString("name"),
- memory.get("speedMbPerS", Number::class.java).toDouble(),
- memory.get("sizeMb", Number::class.java).toLong()
- )
- }
- nodes.add(
- SimpleBareMetalDriver(
- dom.newDomain(machineId),
- UUID(random.nextLong(), random.nextLong()),
- "node-$clusterId-$position",
- mapOf(NODE_CLUSTER to clusterId),
- processors,
- memoryUnits,
- // For now we assume a simple linear load model with an idle draw of ~200W and a maximum
- // power draw of 350W.
- // Source: https://stackoverflow.com/questions/6128960
- LinearLoadPowerModel(200.0, 350.0)
- )
- )
- }
-
- val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner"))
- dom.launch {
- for (node in nodes) {
- provisioningService.create(node)
- }
- }
-
- val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
-
- val platform = Platform(
- UUID.randomUUID(), "opendc-platform", listOf(
- Zone(UUID.randomUUID(), "zone", serviceRegistry)
- )
- )
-
- return Environment(fetchName(id), null, listOf(platform))
- }
-
- override fun close() {}
-
- /**
- * Fetch the metadata of the topology.
- */
- private fun fetchName(id: String): String {
- return collection.aggregate(
- listOf(
- Aggregates.match(Filters.eq("_id", id)),
- Aggregates.project(Projections.include("name"))
- )
- )
- .first()!!
- .getString("name")
- }
-
- /**
- * Fetch a topology from the database with the specified [id].
- */
- private fun fetchMachines(id: String): AggregateIterable<Document> {
- return collection.aggregate(
- listOf(
- Aggregates.match(Filters.eq("_id", id)),
- Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack"))),
- Aggregates.unwind("\$racks"),
- Aggregates.unwind("\$racks"),
- Aggregates.replaceRoot("\$racks"),
- Aggregates.addFields(Field("machines.rack_id", "\$_id")),
- Aggregates.unwind("\$machines"),
- Aggregates.replaceRoot("\$machines")
- )
- )
- }
-}
diff --git a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml
deleted file mode 100644
index 1d873554..00000000
--- a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ MIT License
- ~
- ~ Copyright (c) 2020 atlarge-research
- ~
- ~ Permission is hereby granted, free of charge, to any person obtaining a copy
- ~ of this software and associated documentation files (the "Software"), to deal
- ~ in the Software without restriction, including without limitation the rights
- ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- ~ copies of the Software, and to permit persons to whom the Software is
- ~ furnished to do so, subject to the following conditions:
- ~
- ~ The above copyright notice and this permission notice shall be included in all
- ~ copies or substantial portions of the Software.
- ~
- ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- ~ SOFTWARE.
- -->
-
-<Configuration status="WARN">
- <Appenders>
- <Console name="Console" target="SYSTEM_OUT">
- <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false" />
- </Console>
- </Appenders>
- <Loggers>
- <Logger name="com.atlarge.odcsim" level="info" additivity="false">
- <AppenderRef ref="Console"/>
- </Logger>
- <Logger name="com.atlarge.opendc" level="warn" additivity="false">
- <AppenderRef ref="Console"/>
- </Logger>
- <Logger name="com.atlarge.opendc.runner" level="info" additivity="false">
- <AppenderRef ref="Console"/>
- </Logger>
- <Logger name="org.apache.hadoop" level="warn" additivity="false">
- <AppenderRef ref="Console"/>
- </Logger>
- <Logger name="org.apache.spark" level="info" additivity="false">
- <AppenderRef ref="Console"/>
- </Logger>
- <Root level="error">
- <AppenderRef ref="Console"/>
- </Root>
- </Loggers>
-</Configuration>