From c41d201343263346ac84855a0b2254051ed33c21 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 30 Sep 2020 21:14:20 +0200 Subject: Eliminate use of Domain and simulationContext in OpenDC This change takes the first step in eliminating the explicit use of Domain and simulationContext from OpenDC. In this way, we decouple the logic of various datacenter services from simulation logic, which should promote re-use. --- .../kotlin/com/atlarge/opendc/runner/web/Main.kt | 6 +++--- .../com/atlarge/opendc/runner/web/ResultProcessor.kt | 20 +++++++++++++------- .../com/atlarge/opendc/runner/web/ScenarioManager.kt | 2 +- .../com/atlarge/opendc/runner/web/TopologyParser.kt | 19 ++++++++++++------- 4 files changed, 29 insertions(+), 18 deletions(-) (limited to 'simulator/opendc/opendc-runner-web') diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt index 807c119e..9cfe5531 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt @@ -22,13 +22,13 @@ import com.mongodb.client.MongoClients import com.mongodb.client.MongoCollection import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Filters -import java.io.File -import java.util.* -import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import mu.KotlinLogging import org.bson.Document +import java.io.File +import java.util.* +import kotlin.random.Random private val logger = KotlinLogging.logger {} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt index 39092653..c0b0ac31 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt @@ -1,11 +1,11 @@ package com.atlarge.opendc.runner.web -import java.io.File import org.apache.spark.sql.Column import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.* +import java.io.File /** * A helper class for processing the experiment results using Apache Spark. 
@@ -175,13 +175,19 @@ class ResultProcessor(private val master: String, private val outputPath: File) val sliceLength = 5 * 60 * 1000 val states = map( - lit("ERROR"), lit(1), - lit("ACTIVE"), lit(0), - lit("SHUTOFF"), lit(0) + lit("ERROR"), + lit(1), + lit("ACTIVE"), + lit(0), + lit("SHUTOFF"), + lit(0) ) val oppositeStates = map( - lit("ERROR"), lit(0), - lit("ACTIVE"), lit(1), - lit("SHUTOFF"), lit(1) + lit("ERROR"), + lit(0), + lit("ACTIVE"), + lit(1), + lit("SHUTOFF"), + lit(1) ) } diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt index 40ffd282..6ec4995d 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt @@ -3,8 +3,8 @@ package com.atlarge.opendc.runner.web import com.mongodb.client.MongoCollection import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates -import java.time.Instant import org.bson.Document +import java.time.Instant /** * Manages the queue of scenarios that need to be processed. diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt index 499585ec..ab683985 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt @@ -1,6 +1,6 @@ package com.atlarge.opendc.runner.web -import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -20,9 +20,10 @@ import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Field import com.mongodb.client.model.Filters import com.mongodb.client.model.Projections -import java.util.* +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import org.bson.Document +import java.util.* /** * A helper class that converts the MongoDB topology into an OpenDC environment. @@ -31,7 +32,8 @@ class TopologyParser(private val collection: MongoCollection, private /** * Parse the topology with the specified [id]. 
*/ - override suspend fun construct(dom: Domain): Environment { + override suspend fun construct(coroutineScope: CoroutineScope): Environment { + val clock = simulationContext.clock val nodes = mutableListOf<SimpleBareMetalDriver>() val random = Random(0) @@ -59,7 +61,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private } nodes.add( SimpleBareMetalDriver( - dom.newDomain(machineId), + coroutineScope, + clock, UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$position", mapOf(NODE_CLUSTER to clusterId), @@ -73,8 +76,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private ) } - val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner")) - dom.launch { + val provisioningService = SimpleProvisioningService() + coroutineScope.launch { for (node in nodes) { provisioningService.create(node) } @@ -83,7 +86,9 @@ class TopologyParser(private val collection: MongoCollection<Document>, private val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( - UUID.randomUUID(), "opendc-platform", listOf( + UUID.randomUUID(), + "opendc-platform", + listOf( Zone(UUID.randomUUID(), "zone", serviceRegistry) ) ) -- cgit v1.2.3 From fcae560208df4860bc7461f955bf3b522b0e61c5 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 30 Sep 2020 23:56:07 +0200 Subject: Migrate from Domain to TestCoroutineScope This change eliminates the use of Domain and simulationContext in favour of the generic (Test)CoroutineScope and Clock classes. In this way, we decouple the OpenDC modules and their logic from simulation-related code. This also simplifies eventual attempts to emulate OpenDC components in real time. --- .../opendc/opendc-runner-web/build.gradle.kts | 1 + .../kotlin/com/atlarge/opendc/runner/web/Main.kt | 27 +++++++++++----------- .../atlarge/opendc/runner/web/TopologyParser.kt | 5 ++-- 3 files changed, 17 insertions(+), 16 deletions(-) (limited to 'simulator/opendc/opendc-runner-web') diff --git a/simulator/opendc/opendc-runner-web/build.gradle.kts b/simulator/opendc/opendc-runner-web/build.gradle.kts index 6f725de1..1d263e75 100644 --- a/simulator/opendc/opendc-runner-web/build.gradle.kts +++ b/simulator/opendc/opendc-runner-web/build.gradle.kts @@ -39,6 +39,7 @@ dependencies { implementation(project(":opendc:opendc-compute")) implementation(project(":opendc:opendc-format")) implementation(project(":opendc:opendc-experiments-sc20")) + implementation(project(":opendc:opendc-simulator")) implementation("com.github.ajalt:clikt:2.8.0") implementation("io.github.microutils:kotlin-logging:1.7.10") diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt index 9cfe5531..ac4d9087 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt @@ -1,6 +1,5 @@ package com.atlarge.opendc.runner.web -import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.opendc.compute.virt.service.allocation.* import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner import com.atlarge.opendc.experiments.sc20.experiment.model.Workload import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor import com.atlarge.opendc.experiments.sc20.experiment.processTrace import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.int import com.mongodb.MongoClientSettings import com.mongodb.MongoCredential import com.mongodb.ServerAddress import com.mongodb.client.MongoClients import com.mongodb.client.MongoCollection import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Filters import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel +import 
kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging import org.bson.Document +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File import java.util.* import kotlin.random.Random private val logger = KotlinLogging.logger {} -/** - * The provider for the simulation engine to use. - */ -private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - /** * Represents the CLI command for starting the OpenDC web runner. */ +@OptIn(ExperimentalCoroutinesApi::class) class RunnerCli : CliktCommand(name = "runner") { /** * The name of the database to use. @@ -195,8 +192,8 @@ class RunnerCli : CliktCommand(name = "runner") { val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() val seeder = Random(seed) - val system = provider("experiment-$id") - val root = system.newDomain("root") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) val chan = Channel(Channel.CONFLATED) @@ -230,9 +227,10 @@ class RunnerCli : CliktCommand(name = "runner") { 4096 ) - root.launch { + testScope.launch { val (bareMetalProvisioner, scheduler) = createProvisioner( - root, + this, + clock, environment, allocationPolicy ) @@ -240,6 +238,8 @@ class RunnerCli : CliktCommand(name = "runner") { val failureDomain = if (operational.getBoolean("failuresEnabled")) { logger.debug("ENABLING failures") createFailureDomain( + testScope, + clock, seeder.nextInt(), operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, bareMetalProvisioner, @@ -249,8 +249,10 @@ class RunnerCli : CliktCommand(name = "runner") { null } - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, trace, scheduler, chan, @@ -269,9 +271,8 @@ class RunnerCli : CliktCommand(name = "runner") { } try { - system.run() + testScope.advanceUntilIdle() } finally { - system.terminate() monitor.close() } } diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt index ab683985..f9b1c6c4 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt @@ -1,6 +1,5 @@ package com.atlarge.opendc.runner.web -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -23,6 +22,7 @@ import com.mongodb.client.model.Projections import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import org.bson.Document +import java.time.Clock import java.util.* /** @@ -32,8 +32,7 @@ class TopologyParser(private val collection: MongoCollection, private /** * Parse the topology with the specified [id]. 
*/ - override suspend fun construct(coroutineScope: CoroutineScope): Environment { - val clock = simulationContext.clock + override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { val nodes = mutableListOf<SimpleBareMetalDriver>() val random = Random(0) -- cgit v1.2.3 From 27ddd462d148d70760e45f967387905054e21d20 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 1 Oct 2020 00:32:46 +0200 Subject: Remove odcsim components from repository This change removes the odcsim components from the repository, as we have eliminated their use in the OpenDC codebase by replacing them with the more generic (Test)CoroutineScope and Clock. From now on, we will place modules only under the OpenDC namespace, rather than under both OpenDC and odcsim, to prevent confusion. --- simulator/opendc/opendc-runner-web/build.gradle.kts | 1 - 1 file changed, 1 deletion(-) (limited to 'simulator/opendc/opendc-runner-web') diff --git a/simulator/opendc/opendc-runner-web/build.gradle.kts b/simulator/opendc/opendc-runner-web/build.gradle.kts index 1d263e75..479eaca7 100644 --- a/simulator/opendc/opendc-runner-web/build.gradle.kts +++ b/simulator/opendc/opendc-runner-web/build.gradle.kts @@ -50,7 +50,6 @@ dependencies { exclude(group = "log4j") } - runtimeOnly(project(":odcsim:odcsim-engine-omega")) runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:2.13.1") } -- cgit v1.2.3 From 8a9f5573bef3f68316add17c04a47cc4e5fe75fa Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 1 Oct 2020 00:49:53 +0200 Subject: Move OpenDC modules into simulator root This change moves the OpenDC modules, previously living in the simulator/opendc directory, into the simulator directory itself, given that we no longer distinguish between OpenDC and odcsim. 
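Taken together, the first two patches in this series replace odcsim's Domain with the pair of a plain kotlinx.coroutines CoroutineScope and a java.time.Clock, driven from the runner by a TestCoroutineScope. The sketch below distils that driving pattern; it is a minimal illustration, not the runner itself. VirtualClock is a hypothetical stand-in for the DelayControllerClockAdapter introduced by the second patch (assumed to expose the test scope's virtual time as a Clock), and the API is that of kotlinx-coroutines-test 1.3/1.4, current when this series was written.

import java.time.Clock
import java.time.Instant
import java.time.ZoneId
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope

// Hypothetical stand-in for DelayControllerClockAdapter: exposes the virtual
// time of a TestCoroutineScope as a java.time.Clock.
@OptIn(ExperimentalCoroutinesApi::class)
class VirtualClock(private val scope: TestCoroutineScope) : Clock() {
    override fun instant(): Instant = Instant.ofEpochMilli(scope.currentTime)
    override fun getZone(): ZoneId = ZoneId.of("UTC")
    override fun withZone(zone: ZoneId): Clock = this
}

@OptIn(ExperimentalCoroutinesApi::class)
fun main() {
    val testScope = TestCoroutineScope()
    val clock: Clock = VirtualClock(testScope)

    // Simulation logic now sees only a CoroutineScope and a Clock, not a Domain.
    testScope.launch {
        delay(5 * 60 * 1000L) // five minutes of virtual time
        println("node provisioned at t=${clock.millis()} ms")
    }

    // Replaces system.run(): executes all scheduled work in virtual time.
    testScope.advanceUntilIdle()
}

This mirrors the wiring in Main.kt in the second patch above: the runner constructs a TestCoroutineScope and a clock adapter, launches the provisioning and trace-processing logic on that scope, and finally calls advanceUntilIdle() to run the simulation to completion.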
--- .../opendc/opendc-runner-web/build.gradle.kts | 55 ---- .../kotlin/com/atlarge/opendc/runner/web/Main.kt | 346 --------------------- .../atlarge/opendc/runner/web/ResultProcessor.kt | 193 ------------ .../atlarge/opendc/runner/web/ScenarioManager.kt | 93 ------ .../atlarge/opendc/runner/web/TopologyParser.kt | 131 -------- .../src/main/resources/log4j2.xml | 52 ---- 6 files changed, 870 deletions(-) delete mode 100644 simulator/opendc/opendc-runner-web/build.gradle.kts delete mode 100644 simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt delete mode 100644 simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt delete mode 100644 simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt delete mode 100644 simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt delete mode 100644 simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml (limited to 'simulator/opendc/opendc-runner-web') diff --git a/simulator/opendc/opendc-runner-web/build.gradle.kts b/simulator/opendc/opendc-runner-web/build.gradle.kts deleted file mode 100644 index 479eaca7..00000000 --- a/simulator/opendc/opendc-runner-web/build.gradle.kts +++ /dev/null @@ -1,55 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -description = "Experiment runner for OpenDC" - -/* Build configuration */ -plugins { - `kotlin-library-convention` - application -} - -application { - mainClassName = "com.atlarge.opendc.runner.web.MainKt" -} - -dependencies { - api(project(":opendc:opendc-core")) - implementation(project(":opendc:opendc-compute")) - implementation(project(":opendc:opendc-format")) - implementation(project(":opendc:opendc-experiments-sc20")) - implementation(project(":opendc:opendc-simulator")) - - implementation("com.github.ajalt:clikt:2.8.0") - implementation("io.github.microutils:kotlin-logging:1.7.10") - - implementation("org.mongodb:mongodb-driver-sync:4.0.5") - implementation("org.apache.spark:spark-sql_2.12:3.0.0") { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - exclude(group = "log4j") - } - - runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") - runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:2.13.1") -} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt deleted file mode 100644 index ac4d9087..00000000 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt +++ /dev/null @@ -1,346 +0,0 @@ -package com.atlarge.opendc.runner.web - -import com.atlarge.opendc.compute.virt.service.allocation.* -import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor -import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain -import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner -import com.atlarge.opendc.experiments.sc20.experiment.model.Workload -import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor -import com.atlarge.opendc.experiments.sc20.experiment.processTrace -import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader -import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader -import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.options.* -import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.int -import com.mongodb.MongoClientSettings -import com.mongodb.MongoCredential -import com.mongodb.ServerAddress -import com.mongodb.client.MongoClients -import com.mongodb.client.MongoCollection -import com.mongodb.client.MongoDatabase -import com.mongodb.client.model.Filters -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.test.TestCoroutineScope -import mu.KotlinLogging -import org.bson.Document -import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.io.File -import java.util.* -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * Represents the CLI command for starting the OpenDC web runner. - */ -@OptIn(ExperimentalCoroutinesApi::class) -class RunnerCli : CliktCommand(name = "runner") { - /** - * The name of the database to use. - */ - private val mongoDb by option( - "--mongo-db", - help = "name of the database to use", - envvar = "OPENDC_DB" - ) - .default("opendc") - - /** - * The database host to connect to. - */ - private val mongoHost by option( - "--mongo-host", - help = "database host to connect to", - envvar = "OPENDC_DB_HOST" - ) - .default("localhost") - - /** - * The database port to connect to. 
- */ - private val mongoPort by option( - "--mongo-port", - help = "database port to connect to", - envvar = "OPENDC_DB_PORT" - ) - .int() - .default(27017) - - /** - * The database user to connect with. - */ - private val mongoUser by option( - "--mongo-user", - help = "database user to connect with", - envvar = "OPENDC_DB_USER" - ) - .default("opendc") - - /** - * The database password to connect with. - */ - private val mongoPassword by option( - "--mongo-password", - help = "database password to connect with", - envvar = "OPENDC_DB_PASSWORD" - ) - .convert { it.toCharArray() } - .required() - - /** - * The path to the traces directory. - */ - private val tracePath by option( - "--traces", - help = "path to the directory containing the traces", - envvar = "OPENDC_TRACES" - ) - .file(canBeFile = false) - .defaultLazy { File("traces/") } - - /** - * The path to the output directory. - */ - private val outputPath by option( - "--output", - help = "path to the results directory", - envvar = "OPENDC_OUTPUT" - ) - .file(canBeFile = false) - .defaultLazy { File("results/") } - - /** - * The Spark master to connect to. - */ - private val spark by option( - "--spark", - help = "Spark master to connect to", - envvar = "OPENDC_SPARK" - ) - .default("local[*]") - - /** - * Connect to the user-specified database. - */ - private fun createDatabase(): MongoDatabase { - val credential = MongoCredential.createScramSha1Credential( - mongoUser, - mongoDb, - mongoPassword - ) - - val settings = MongoClientSettings.builder() - .credential(credential) - .applyToClusterSettings { it.hosts(listOf(ServerAddress(mongoHost, mongoPort))) } - .build() - val client = MongoClients.create(settings) - return client.getDatabase(mongoDb) - } - - /** - * Run a single scenario. - */ - private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection) { - val id = scenario.getString("_id") - - logger.info { "Constructing performance interference model" } - - val traceDir = File( - tracePath, - scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) - ) - val traceReader = Sc20RawParquetTraceReader(traceDir) - val performanceInterferenceReader = let { - val path = File(traceDir, "performance-interference-model.json") - val operational = scenario.get("operational", Document::class.java) - val enabled = operational.getBoolean("performanceInterferenceEnabled") - - if (!enabled || !path.exists()) { - return@let null - } - - path.inputStream().use { Sc20PerformanceInterferenceReader(it) } - } - - val targets = portfolio.get("targets", Document::class.java) - - repeat(targets.getInteger("repeatsPerScenario")) { - logger.info { "Starting repeat $it" } - runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader) - } - - logger.info { "Finished simulation for scenario $id" } - } - - /** - * Run a single repeat. - */ - private suspend fun runRepeat( - scenario: Document, - repeat: Int, - topologies: MongoCollection, - traceReader: Sc20RawParquetTraceReader, - performanceInterferenceReader: Sc20PerformanceInterferenceReader? 
- ) { - val id = scenario.getString("_id") - val seed = repeat - val traceDocument = scenario.get("trace", Document::class.java) - val workloadName = traceDocument.getString("traceId") - val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() - - val seeder = Random(seed) - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - - val chan = Channel(Channel.CONFLATED) - - val operational = scenario.get("operational", Document::class.java) - val allocationPolicy = - when (val policyName = operational.getString("schedulerName")) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) - else -> throw IllegalArgumentException("Unknown policy $policyName") - } - - val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() - val trace = Sc20ParquetTraceReader( - listOf(traceReader), - performanceInterferenceModel, - Workload(workloadName, workloadFraction), - seed - ) - val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java) - val environment = TopologyParser(topologies, topologyId) - val monitor = ParquetExperimentMonitor( - outputPath, - "scenario_id=$id/run_id=$repeat", - 4096 - ) - - testScope.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner( - this, - clock, - environment, - allocationPolicy - ) - - val failureDomain = if (operational.getBoolean("failuresEnabled")) { - logger.debug("ENABLING failures") - createFailureDomain( - testScope, - clock, - seeder.nextInt(), - operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, - bareMetalProvisioner, - chan - ) - } else { - null - } - - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - trace, - scheduler, - chan, - monitor, - emptyMap() - ) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - } - - try { - testScope.advanceUntilIdle() - } finally { - monitor.close() - } - } - - val POLL_INTERVAL = 5000L // ms = 5 s - val HEARTBEAT_INTERVAL = 60000L // ms = 1 min - - override fun run() = runBlocking(Dispatchers.Default) { - logger.info { "Starting OpenDC web runner" } - logger.info { "Connecting to MongoDB instance" } - val database = createDatabase() - val manager = ScenarioManager(database.getCollection("scenarios")) - val portfolios = database.getCollection("portfolios") - val topologies = database.getCollection("topologies") - - logger.info { "Launching Spark" } - val resultProcessor = ResultProcessor(spark, outputPath) - - logger.info { "Watching for queued scenarios" } - - while (true) { - val scenario = manager.findNext() - - if (scenario == null) { - delay(POLL_INTERVAL) - continue - } - - val id = scenario.getString("_id") - - logger.info { "Found queued scenario $id: attempting to claim" 
} - - if (!manager.claim(id)) { - logger.info { "Failed to claim scenario" } - continue - } - - coroutineScope { - // Launch heartbeat process - val heartbeat = launch { - while (true) { - delay(HEARTBEAT_INTERVAL) - manager.heartbeat(id) - } - } - - try { - val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!! - runScenario(portfolio, scenario, topologies) - - logger.info { "Starting result processing" } - - val result = resultProcessor.process(id) - manager.finish(id, result) - - logger.info { "Successfully finished scenario $id" } - } catch (e: Exception) { - logger.warn(e) { "Scenario failed to finish" } - manager.fail(id) - } finally { - heartbeat.cancel() - } - } - } - } -} - -/** - * Main entry point of the runner. - */ -fun main(args: Array) = RunnerCli().main(args) diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt deleted file mode 100644 index c0b0ac31..00000000 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt +++ /dev/null @@ -1,193 +0,0 @@ -package com.atlarge.opendc.runner.web - -import org.apache.spark.sql.Column -import org.apache.spark.sql.Dataset -import org.apache.spark.sql.Row -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.functions.* -import java.io.File - -/** - * A helper class for processing the experiment results using Apache Spark. - */ -class ResultProcessor(private val master: String, private val outputPath: File) { - /** - * Process the results of the scenario with the given [id]. - */ - fun process(id: String): Result { - val spark = SparkSession.builder() - .master(master) - .appName("opendc-simulator-$id") - .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver - .orCreate - - try { - val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path) - val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path) - val res = aggregate(hostMetrics, provisionerMetrics).first() - - return Result( - res.getList(1), - res.getList(2), - res.getList(3), - res.getList(4), - res.getList(5), - res.getList(6), - res.getList(7), - res.getList(8), - res.getList(9), - res.getList(10), - res.getList(11), - res.getList(12), - res.getList(13), - res.getList(14), - res.getList(15) - ) - } finally { - spark.close() - } - } - - data class Result( - val totalRequestedBurst: List, - val totalGrantedBurst: List, - val totalOvercommittedBurst: List, - val totalInterferedBurst: List, - val meanCpuUsage: List, - val meanCpuDemand: List, - val meanNumDeployedImages: List, - val maxNumDeployedImages: List, - val totalPowerDraw: List, - val totalFailureSlices: List, - val totalFailureVmSlices: List, - val totalVmsSubmitted: List, - val totalVmsQueued: List, - val totalVmsFinished: List, - val totalVmsFailed: List - ) - - /** - * Perform aggregation of the experiment results. 
- */ - private fun aggregate(hostMetrics: Dataset, provisionerMetrics: Dataset): Dataset { - // Extrapolate the duration of the entries to span the entire trace - val hostMetricsExtra = hostMetrics - .withColumn("slice_counts", floor(col("duration") / lit(sliceLength))) - .withColumn("power_draw", col("power_draw") * col("slice_counts")) - .withColumn("state_int", states[col("state")]) - .withColumn("state_opposite_int", oppositeStates[col("state")]) - .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int")) - .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts")) - .withColumn("failure_slice_count", col("slice_counts") * col("state_int")) - .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count")) - - // Process all data in a single run - val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id") - - // Aggregate the summed total metrics - val systemMetrics = hostMetricsGrouped.agg( - sum("requested_burst").alias("total_requested_burst"), - sum("granted_burst").alias("total_granted_burst"), - sum("overcommissioned_burst").alias("total_overcommitted_burst"), - sum("interfered_burst").alias("total_interfered_burst"), - sum("power_draw").alias("total_power_draw"), - sum("failure_slice_count").alias("total_failure_slices"), - sum("failure_vm_slice_count").alias("total_failure_vm_slices") - ) - - // Aggregate metrics per host - val hvMetrics = hostMetrics - .groupBy("run_id", "host_id") - .agg( - sum("cpu_usage").alias("mean_cpu_usage"), - sum("cpu_demand").alias("mean_cpu_demand"), - avg("vm_count").alias("mean_num_deployed_images"), - count(lit(1)).alias("num_rows") - ) - .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows")) - .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows")) - .groupBy("run_id") - .agg( - avg("mean_cpu_usage").alias("mean_cpu_usage"), - avg("mean_cpu_demand").alias("mean_cpu_demand"), - avg("mean_num_deployed_images").alias("mean_num_deployed_images"), - max("mean_num_deployed_images").alias("max_num_deployed_images") - ) - - // Group the provisioner metrics per run - val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id") - - // Aggregate the provisioner metrics - val provisionerMetricsAggregated = provisionerMetricsGrouped.agg( - max("vm_total_count").alias("total_vms_submitted"), - max("vm_waiting_count").alias("total_vms_queued"), - max("vm_active_count").alias("total_vms_running"), - max("vm_inactive_count").alias("total_vms_finished"), - max("vm_failed_count").alias("total_vms_failed") - ) - - // Join the results into a single data frame - return systemMetrics - .join(hvMetrics, "run_id") - .join(provisionerMetricsAggregated, "run_id") - .select( - col("total_requested_burst"), - col("total_granted_burst"), - col("total_overcommitted_burst"), - col("total_interfered_burst"), - col("mean_cpu_usage"), - col("mean_cpu_demand"), - col("mean_num_deployed_images"), - col("max_num_deployed_images"), - col("total_power_draw"), - col("total_failure_slices"), - col("total_failure_vm_slices"), - col("total_vms_submitted"), - col("total_vms_queued"), - col("total_vms_finished"), - col("total_vms_failed") - ) - .groupBy(lit(1)) - .agg( - // TODO Check if order of values is correct - collect_list(col("total_requested_burst")).alias("total_requested_burst"), - collect_list(col("total_granted_burst")).alias("total_granted_burst"), - collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"), - 
collect_list(col("total_interfered_burst")).alias("total_interfered_burst"), - collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"), - collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"), - collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"), - collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"), - collect_list(col("total_power_draw")).alias("total_power_draw"), - collect_list(col("total_failure_slices")).alias("total_failure_slices"), - collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"), - collect_list(col("total_vms_submitted")).alias("total_vms_submitted"), - collect_list(col("total_vms_queued")).alias("total_vms_queued"), - collect_list(col("total_vms_finished")).alias("total_vms_finished"), - collect_list(col("total_vms_failed")).alias("total_vms_failed") - ) - } - - // Spark helper functions - operator fun Column.times(other: Column): Column = `$times`(other) - operator fun Column.div(other: Column): Column = `$div`(other) - operator fun Column.get(other: Column): Column = this.apply(other) - - val sliceLength = 5 * 60 * 1000 - val states = map( - lit("ERROR"), - lit(1), - lit("ACTIVE"), - lit(0), - lit("SHUTOFF"), - lit(0) - ) - val oppositeStates = map( - lit("ERROR"), - lit(0), - lit("ACTIVE"), - lit(1), - lit("SHUTOFF"), - lit(1) - ) -} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt deleted file mode 100644 index 6ec4995d..00000000 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt +++ /dev/null @@ -1,93 +0,0 @@ -package com.atlarge.opendc.runner.web - -import com.mongodb.client.MongoCollection -import com.mongodb.client.model.Filters -import com.mongodb.client.model.Updates -import org.bson.Document -import java.time.Instant - -/** - * Manages the queue of scenarios that need to be processed. - */ -class ScenarioManager(private val collection: MongoCollection) { - /** - * Find the next scenario that the simulator needs to process. - */ - fun findNext(): Document? { - return collection - .find(Filters.eq("simulation.state", "QUEUED")) - .first() - } - - /** - * Claim the scenario in the database with the specified id. - */ - fun claim(id: String): Boolean { - val res = collection.findOneAndUpdate( - Filters.and( - Filters.eq("_id", id), - Filters.eq("simulation.state", "QUEUED") - ), - Updates.combine( - Updates.set("simulation.state", "RUNNING"), - Updates.set("simulation.heartbeat", Instant.now()) - ) - ) - return res != null - } - - /** - * Update the heartbeat of the specified scenario. - */ - fun heartbeat(id: String) { - collection.findOneAndUpdate( - Filters.and( - Filters.eq("_id", id), - Filters.eq("simulation.state", "RUNNING") - ), - Updates.set("simulation.heartbeat", Instant.now()) - ) - } - - /** - * Mark the scenario as failed. - */ - fun fail(id: String) { - collection.findOneAndUpdate( - Filters.eq("_id", id), - Updates.combine( - Updates.set("simulation.state", "FAILED"), - Updates.set("simulation.heartbeat", Instant.now()) - ) - ) - } - - /** - * Persist the specified results. 
- */ - fun finish(id: String, result: ResultProcessor.Result) { - collection.findOneAndUpdate( - Filters.eq("_id", id), - Updates.combine( - Updates.set("simulation.state", "FINISHED"), - Updates.unset("simulation.time"), - Updates.set("results.total_requested_burst", result.totalRequestedBurst), - Updates.set("results.total_granted_burst", result.totalGrantedBurst), - Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst), - Updates.set("results.total_interfered_burst", result.totalInterferedBurst), - Updates.set("results.mean_cpu_usage", result.meanCpuUsage), - Updates.set("results.mean_cpu_demand", result.meanCpuDemand), - Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages), - Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), - Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), - Updates.set("results.total_power_draw", result.totalPowerDraw), - Updates.set("results.total_failure_slices", result.totalFailureSlices), - Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices), - Updates.set("results.total_vms_submitted", result.totalVmsSubmitted), - Updates.set("results.total_vms_queued", result.totalVmsQueued), - Updates.set("results.total_vms_finished", result.totalVmsFinished), - Updates.set("results.total_vms_failed", result.totalVmsFailed) - ) - ) - } -} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt deleted file mode 100644 index f9b1c6c4..00000000 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt +++ /dev/null @@ -1,131 +0,0 @@ -package com.atlarge.opendc.runner.web - -import com.atlarge.opendc.compute.core.MemoryUnit -import com.atlarge.opendc.compute.core.ProcessingNode -import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.metal.NODE_CLUSTER -import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver -import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel -import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService -import com.atlarge.opendc.core.Environment -import com.atlarge.opendc.core.Platform -import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.format.environment.EnvironmentReader -import com.mongodb.client.AggregateIterable -import com.mongodb.client.MongoCollection -import com.mongodb.client.model.Aggregates -import com.mongodb.client.model.Field -import com.mongodb.client.model.Filters -import com.mongodb.client.model.Projections -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.launch -import org.bson.Document -import java.time.Clock -import java.util.* - -/** - * A helper class that converts the MongoDB topology into an OpenDC environment. - */ -class TopologyParser(private val collection: MongoCollection, private val id: String) : EnvironmentReader { - /** - * Parse the topology with the specified [id]. 
*/ - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { - val nodes = mutableListOf<SimpleBareMetalDriver>() - val random = Random(0) - - for (machine in fetchMachines(id)) { - val machineId = machine.getString("_id") - val clusterId = machine.getString("rack_id") - val position = machine.getInteger("position") - - val processors = machine.getList("cpus", Document::class.java).flatMap { cpu -> - val cores = cpu.getInteger("numberOfCores") - val speed = cpu.get("clockRateMhz", Number::class.java).toDouble() - // TODO Remove hardcoding of vendor - val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores) - List(cores) { coreId -> - ProcessingUnit(node, coreId, speed) - } - } - val memoryUnits = machine.getList("memories", Document::class.java).map { memory -> - MemoryUnit( - "Samsung", - memory.getString("name"), - memory.get("speedMbPerS", Number::class.java).toDouble(), - memory.get("sizeMb", Number::class.java).toLong() - ) - } - nodes.add( - SimpleBareMetalDriver( - coroutineScope, - clock, - UUID(random.nextLong(), random.nextLong()), - "node-$clusterId-$position", - mapOf(NODE_CLUSTER to clusterId), - processors, - memoryUnits, - // For now we assume a simple linear load model with an idle draw of ~200W and a maximum - // power draw of 350W. - // Source: https://stackoverflow.com/questions/6128960 - LinearLoadPowerModel(200.0, 350.0) - ) - ) - } - - val provisioningService = SimpleProvisioningService() - coroutineScope.launch { - for (node in nodes) { - provisioningService.create(node) - } - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - - val platform = Platform( - UUID.randomUUID(), - "opendc-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment(fetchName(id), null, listOf(platform)) - } - - override fun close() {} - - /** - * Fetch the metadata of the topology. - */ - private fun fetchName(id: String): String { - return collection.aggregate( - listOf( - Aggregates.match(Filters.eq("_id", id)), - Aggregates.project(Projections.include("name")) - ) - ) - .first()!! - .getString("name") - } - - /** - * Fetch a topology from the database with the specified [id]. - */ - private fun fetchMachines(id: String): AggregateIterable<Document> { - return collection.aggregate( - listOf( - Aggregates.match(Filters.eq("_id", id)), - Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack"))), - Aggregates.unwind("\$racks"), - Aggregates.unwind("\$racks"), - Aggregates.replaceRoot("\$racks"), - Aggregates.addFields(Field("machines.rack_id", "\$_id")), - Aggregates.unwind("\$machines"), - Aggregates.replaceRoot("\$machines") - ) - ) - } -} diff --git a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml deleted file mode 100644 index 1d873554..00000000 --- a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml +++ /dev/null @@ -1,52 +0,0 @@ [52 lines deleted: the module's Log4j 2 XML configuration; its markup did not survive extraction and is omitted here] -- cgit v1.2.3
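With the whole series applied, environment readers such as TopologyParser no longer create their own simulation Domain: the caller injects both the CoroutineScope and the Clock through the construct signature. The following sketch illustrates the resulting contract in isolation; Environment and StaticEnvironmentReader are simplified, hypothetical stand-ins for the real opendc-core and opendc-format types, not their actual definitions.

import java.time.Clock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope

// Simplified stand-ins for the opendc-core/opendc-format types.
data class Environment(val name: String)

interface EnvironmentReader : AutoCloseable {
    // Post-migration signature: the caller supplies the scope and the clock.
    suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment
}

class StaticEnvironmentReader(private val name: String) : EnvironmentReader {
    override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
        // Long-running work (e.g. registering nodes with a provisioner) is
        // launched on the injected scope instead of a private Domain.
        coroutineScope.launch {
            println("registering nodes at t=${clock.millis()} ms")
        }
        return Environment(name)
    }

    override fun close() {}
}

@OptIn(ExperimentalCoroutinesApi::class)
fun main() {
    val scope = TestCoroutineScope()
    val clock = Clock.systemUTC() // a virtual clock adapter would be used in simulation

    scope.launch {
        val environment = StaticEnvironmentReader("demo").construct(this, clock)
        println("constructed environment '${environment.name}'")
    }

    // Drive all launched work to completion.
    scope.advanceUntilIdle()
}

Because the reader now depends only on these two standard abstractions, the same code can be driven by a TestCoroutineScope in simulation or by an ordinary CoroutineScope with Clock.systemUTC() in real time, which is the emulation path the second commit message alludes to.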