diff options
Diffstat (limited to 'opendc-web/opendc-web-runner')
6 files changed, 911 insertions, 0 deletions
diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts new file mode 100644 index 00000000..fcc78a83 --- /dev/null +++ b/opendc-web/opendc-web-runner/build.gradle.kts @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Experiment runner for OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + application +} + +application { + mainClass.set("org.opendc.runner.web.MainKt") +} + +dependencies { + api(platform(project(":opendc-platform"))) + implementation(project(":opendc-compute:opendc-compute-simulator")) + implementation(project(":opendc-format")) + implementation(project(":opendc-experiments:opendc-experiments-capelin")) + implementation(project(":opendc-simulator:opendc-simulator-core")) + implementation(project(":opendc-simulator:opendc-simulator-compute")) + + implementation("io.github.microutils:kotlin-logging") + implementation("com.github.ajalt.clikt:clikt:${versions["clikt"]}") + implementation("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}") + implementation("io.sentry:sentry-log4j2:${versions["sentry-log4j2"]}") + implementation("org.mongodb:mongodb-driver-sync:${versions["mongodb-driver-sync"]}") + + runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}") + runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}") + + implementation(project(":opendc-telemetry:opendc-telemetry-sdk")) +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt new file mode 100644 index 00000000..09f7de35 --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -0,0 +1,380 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.runner.web + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.options.* +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.int +import com.github.ajalt.clikt.parameters.types.long +import com.mongodb.MongoClientSettings +import com.mongodb.MongoCredential +import com.mongodb.ServerAddress +import com.mongodb.client.MongoClients +import com.mongodb.client.MongoDatabase +import com.mongodb.client.model.Filters +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import mu.KotlinLogging +import org.bson.Document +import org.bson.types.ObjectId +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.weights.* +import org.opendc.experiments.capelin.* +import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader +import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader +import org.opendc.format.environment.EnvironmentReader +import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.sdk.toOtelClock +import java.io.File +import kotlin.random.Random + +private val logger = KotlinLogging.logger {} + +/** + * Represents the CLI command for starting the OpenDC web runner. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public class RunnerCli : CliktCommand(name = "runner") { + /** + * The name of the database to use. + */ + private val mongoDb by option( + "--mongo-db", + help = "name of the database to use", + envvar = "OPENDC_DB" + ) + .default("opendc") + + /** + * The database host to connect to. + */ + private val mongoHost by option( + "--mongo-host", + help = "database host to connect to", + envvar = "OPENDC_DB_HOST" + ) + .default("localhost") + + /** + * The database port to connect to. + */ + private val mongoPort by option( + "--mongo-port", + help = "database port to connect to", + envvar = "OPENDC_DB_PORT" + ) + .int() + .default(27017) + + /** + * The database user to connect with. + */ + private val mongoUser by option( + "--mongo-user", + help = "database user to connect with", + envvar = "OPENDC_DB_USER" + ) + .default("opendc") + + /** + * The database password to connect with. + */ + private val mongoPassword by option( + "--mongo-password", + help = "database password to connect with", + envvar = "OPENDC_DB_PASSWORD" + ) + .convert { it.toCharArray() } + .required() + + /** + * The path to the traces directory. + */ + private val tracePath by option( + "--traces", + help = "path to the directory containing the traces", + envvar = "OPENDC_TRACES" + ) + .file(canBeFile = false) + .defaultLazy { File("traces/") } + + /** + * The maximum duration of a single experiment run. + */ + private val runTimeout by option( + "--run-timeout", + help = "maximum duration of experiment in seconds", + envvar = "OPENDC_RUN_TIMEOUT" + ) + .long() + .default(60 * 3) // Experiment may run for a maximum of three minutes + + /** + * Connect to the user-specified database. + */ + private fun createDatabase(): MongoDatabase { + val credential = MongoCredential.createScramSha1Credential( + mongoUser, + mongoDb, + mongoPassword + ) + + val settings = MongoClientSettings.builder() + .credential(credential) + .applyToClusterSettings { it.hosts(listOf(ServerAddress(mongoHost, mongoPort))) } + .build() + val client = MongoClients.create(settings) + return client.getDatabase(mongoDb) + } + + /** + * Run a single scenario. + */ + private suspend fun runScenario(portfolio: Document, scenario: Document, topologyParser: TopologyParser): List<WebExperimentMonitor.Result> { + val id = scenario.getObjectId("_id") + + logger.info { "Constructing performance interference model" } + + val traceDir = File( + tracePath, + scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) + ) + val traceReader = Sc20RawParquetTraceReader(traceDir) + val performanceInterferenceReader = let { + val path = File(traceDir, "performance-interference-model.json") + val operational = scenario.get("operational", Document::class.java) + val enabled = operational.getBoolean("performanceInterferenceEnabled") + + if (!enabled || !path.exists()) { + return@let null + } + + path.inputStream().use { Sc20PerformanceInterferenceReader(it) } + } + + val targets = portfolio.get("targets", Document::class.java) + val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) + val environment = topologyParser.read(topologyId) + + val results = (0 until targets.getInteger("repeatsPerScenario")).map { + logger.info { "Starting repeat $it" } + withTimeout(runTimeout * 1000) { + runRepeat(scenario, it, environment, traceReader, performanceInterferenceReader) + } + } + + logger.info { "Finished simulation for scenario $id" } + + return results + } + + /** + * Run a single repeat. + */ + private suspend fun runRepeat( + scenario: Document, + repeat: Int, + environment: EnvironmentReader, + traceReader: Sc20RawParquetTraceReader, + performanceInterferenceReader: Sc20PerformanceInterferenceReader? + ): WebExperimentMonitor.Result { + val monitor = WebExperimentMonitor() + + try { + runBlockingSimulation { + val seed = repeat + val traceDocument = scenario.get("trace", Document::class.java) + val workloadName = traceDocument.getString("traceId") + val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() + + val seeder = Random(seed) + + val chan = Channel<Unit>(Channel.CONFLATED) + + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + val metricProducer = meterProvider as MetricProducer + + val operational = scenario.get("operational", Document::class.java) + val allocationPolicy = + when (val policyName = operational.getString("schedulerName")) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(MemoryWeigher() to -1.0) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(MemoryWeigher() to 1.0) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(CoreMemoryWeigher() to -1.0) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(CoreMemoryWeigher() to 1.0) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(ProvisionedCoresWeigher() to -1.0) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(InstanceCountWeigher() to 1.0) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(ProvisionedCoresWeigher() to -1.0) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(ProvisionedCoresWeigher() to 1.0) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(RandomWeigher(java.util.Random(seeder.nextLong())) to 1.0) + ) + else -> throw IllegalArgumentException("Unknown policy $policyName") + } + + val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() + val trace = Sc20ParquetTraceReader( + listOf(traceReader), + performanceInterferenceModel, + Workload(workloadName, workloadFraction), + seed + ) + val failureFrequency = if (operational.getBoolean("failuresEnabled", false)) 24.0 * 7 else 0.0 + + withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> + val failureDomain = if (failureFrequency > 0) { + logger.debug { "ENABLING failures" } + createFailureDomain( + this, + clock, + seeder.nextInt(), + failureFrequency, + scheduler, + chan + ) + } else { + null + } + + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + trace, + scheduler, + chan, + monitor + ) + } + + failureDomain?.cancel() + } + + val monitorResults = collectMetrics(metricProducer) + logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } + } + } catch (cause: Throwable) { + logger.warn(cause) { "Experiment failed" } + } + + return monitor.getResult() + } + + private val POLL_INTERVAL = 5000L // ms = 5 s + private val HEARTBEAT_INTERVAL = 60000L // ms = 1 min + + override fun run(): Unit = runBlocking(Dispatchers.Default) { + logger.info { "Starting OpenDC web runner" } + logger.info { "Connecting to MongoDB instance" } + val database = createDatabase() + val manager = ScenarioManager(database.getCollection("scenarios")) + val portfolios = database.getCollection("portfolios") + val topologies = database.getCollection("topologies") + val topologyParser = TopologyParser(topologies) + + logger.info { "Watching for queued scenarios" } + + while (true) { + val scenario = manager.findNext() + + if (scenario == null) { + delay(POLL_INTERVAL) + continue + } + + val id = scenario.getObjectId("_id") + + logger.info { "Found queued scenario $id: attempting to claim" } + + if (!manager.claim(id)) { + logger.info { "Failed to claim scenario" } + continue + } + + coroutineScope { + // Launch heartbeat process + val heartbeat = launch { + while (true) { + delay(HEARTBEAT_INTERVAL) + manager.heartbeat(id) + } + } + + try { + val portfolio = portfolios.find(Filters.eq("_id", scenario.getObjectId("portfolioId"))).first()!! + val results = runScenario(portfolio, scenario, topologyParser) + + logger.info { "Writing results to database" } + + manager.finish(id, results) + + logger.info { "Successfully finished scenario $id" } + } catch (e: Exception) { + logger.error(e) { "Scenario failed to finish" } + manager.fail(id) + } finally { + heartbeat.cancel() + } + } + } + } +} + +/** + * Main entry point of the runner. + */ +public fun main(args: Array<String>): Unit = RunnerCli().main(args) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt new file mode 100644 index 00000000..a3907051 --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.runner.web + +import com.mongodb.client.MongoCollection +import com.mongodb.client.model.Filters +import com.mongodb.client.model.Updates +import org.bson.Document +import org.bson.types.ObjectId +import java.time.Instant + +/** + * Manages the queue of scenarios that need to be processed. + */ +public class ScenarioManager(private val collection: MongoCollection<Document>) { + /** + * Find the next scenario that the simulator needs to process. + */ + public fun findNext(): Document? { + return collection + .find(Filters.eq("simulation.state", "QUEUED")) + .first() + } + + /** + * Claim the scenario in the database with the specified id. + */ + public fun claim(id: ObjectId): Boolean { + val res = collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "QUEUED") + ), + Updates.combine( + Updates.set("simulation.state", "RUNNING"), + Updates.set("simulation.heartbeat", Instant.now()) + ) + ) + return res != null + } + + /** + * Update the heartbeat of the specified scenario. + */ + public fun heartbeat(id: ObjectId) { + collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "RUNNING") + ), + Updates.set("simulation.heartbeat", Instant.now()) + ) + } + + /** + * Mark the scenario as failed. + */ + public fun fail(id: ObjectId) { + collection.findOneAndUpdate( + Filters.eq("_id", id), + Updates.combine( + Updates.set("simulation.state", "FAILED"), + Updates.set("simulation.heartbeat", Instant.now()) + ) + ) + } + + /** + * Persist the specified results. + */ + public fun finish(id: ObjectId, results: List<WebExperimentMonitor.Result>) { + collection.findOneAndUpdate( + Filters.eq("_id", id), + Updates.combine( + Updates.set("simulation.state", "FINISHED"), + Updates.unset("simulation.time"), + Updates.set("results.total_requested_burst", results.map { it.totalRequestedBurst }), + Updates.set("results.total_granted_burst", results.map { it.totalGrantedBurst }), + Updates.set("results.total_overcommitted_burst", results.map { it.totalOvercommittedBurst }), + Updates.set("results.total_interfered_burst", results.map { it.totalInterferedBurst }), + Updates.set("results.mean_cpu_usage", results.map { it.meanCpuUsage }), + Updates.set("results.mean_cpu_demand", results.map { it.meanCpuDemand }), + Updates.set("results.mean_num_deployed_images", results.map { it.meanNumDeployedImages }), + Updates.set("results.max_num_deployed_images", results.map { it.maxNumDeployedImages }), + Updates.set("results.total_power_draw", results.map { it.totalPowerDraw }), + Updates.set("results.total_failure_slices", results.map { it.totalFailureSlices }), + Updates.set("results.total_failure_vm_slices", results.map { it.totalFailureVmSlices }), + Updates.set("results.total_vms_submitted", results.map { it.totalVmsSubmitted }), + Updates.set("results.total_vms_queued", results.map { it.totalVmsQueued }), + Updates.set("results.total_vms_finished", results.map { it.totalVmsFinished }), + Updates.set("results.total_vms_failed", results.map { it.totalVmsFailed }) + ) + ) + } +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt new file mode 100644 index 00000000..2dd63340 --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.runner.web + +import com.mongodb.client.AggregateIterable +import com.mongodb.client.MongoCollection +import com.mongodb.client.model.Aggregates +import com.mongodb.client.model.Field +import com.mongodb.client.model.Filters +import com.mongodb.client.model.Projections +import org.bson.Document +import org.bson.types.ObjectId +import org.opendc.format.environment.EnvironmentReader +import org.opendc.format.environment.MachineDef +import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.LinearPowerModel +import java.util.* + +/** + * A helper class that converts the MongoDB topology into an OpenDC environment. + */ +public class TopologyParser(private val collection: MongoCollection<Document>) { + + /** + * Parse the topology from the specified [id]. + */ + public fun read(id: ObjectId): EnvironmentReader { + val nodes = mutableListOf<MachineDef>() + val random = Random(0) + + for (machine in fetchMachines(id)) { + val clusterId = machine.get("rack_id").toString() + val position = machine.getInteger("position") + + val processors = machine.getList("cpus", Document::class.java).flatMap { cpu -> + val cores = cpu.getInteger("numberOfCores") + val speed = cpu.get("clockRateMhz", Number::class.java).toDouble() + // TODO Remove hardcoding of vendor + val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores) + List(cores) { coreId -> + ProcessingUnit(node, coreId, speed) + } + } + val memoryUnits = machine.getList("memories", Document::class.java).map { memory -> + MemoryUnit( + "Samsung", + memory.getString("name"), + memory.get("speedMbPerS", Number::class.java).toDouble(), + memory.get("sizeMb", Number::class.java).toLong() + ) + } + + val energyConsumptionW = machine.getList("cpus", Document::class.java).sumBy { it.getInteger("energyConsumptionW") }.toDouble() + + nodes.add( + MachineDef( + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$position", + mapOf("cluster" to clusterId), + SimMachineModel(processors, memoryUnits), + LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5) + ) + ) + } + + return object : EnvironmentReader { + override fun read(): List<MachineDef> = nodes + override fun close() {} + } + } + + /** + * Fetch the metadata of the topology. + */ + private fun fetchName(id: ObjectId): String { + return collection.aggregate( + listOf( + Aggregates.match(Filters.eq("_id", id)), + Aggregates.project(Projections.include("name")) + ) + ) + .first()!! + .getString("name") + } + + /** + * Fetch a topology from the database with the specified [id]. + */ + private fun fetchMachines(id: ObjectId): AggregateIterable<Document> { + return collection.aggregate( + listOf( + Aggregates.match(Filters.eq("_id", id)), + Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack"))), + Aggregates.unwind("\$racks"), + Aggregates.unwind("\$racks"), + Aggregates.replaceRoot("\$racks"), + Aggregates.addFields(Field("machines.rack_id", "\$_id")), + Aggregates.unwind("\$machines"), + Aggregates.replaceRoot("\$machines") + ) + ) + } +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt new file mode 100644 index 00000000..c913f82f --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.runner.web + +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostState +import org.opendc.experiments.capelin.monitor.ExperimentMonitor +import org.opendc.experiments.capelin.telemetry.HostEvent +import kotlin.math.max + +/** + * An [ExperimentMonitor] that tracks the aggregate metrics for each repeat. + */ +public class WebExperimentMonitor : ExperimentMonitor { + private val logger = KotlinLogging.logger {} + + override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} + + override fun reportHostStateChange(time: Long, host: Host, newState: HostState) { + logger.debug { "Host ${host.uid} changed state $newState [$time]" } + } + + override fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + powerDraw: Double, + numberOfDeployedImages: Int, + host: Host, + ) { + processHostEvent( + HostEvent( + time, + 5 * 60 * 1000L, + host, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + powerDraw, + host.model.cpuCount + ) + ) + } + + private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() + private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf() + + private fun processHostEvent(event: HostEvent) { + val slices = event.duration / SLICE_LENGTH + + hostAggregateMetrics = AggregateHostMetrics( + hostAggregateMetrics.totalRequestedBurst + event.requestedBurst, + hostAggregateMetrics.totalGrantedBurst + event.grantedBurst, + hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst, + hostAggregateMetrics.totalInterferedBurst + event.interferedBurst, + hostAggregateMetrics.totalPowerDraw + (event.duration * event.powerDraw) / 3600, + hostAggregateMetrics.totalFailureSlices + if (event.host.state != HostState.UP) slices else 0, + hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != HostState.UP) event.vmCount * slices else 0 + ) + + hostMetrics.compute(event.host) { _, prev -> + HostMetrics( + (event.cpuUsage.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0), + (event.cpuDemand.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0), + event.vmCount + (prev?.vmCount ?: 0), + 1 + (prev?.count ?: 0) + ) + } + } + + private val SLICE_LENGTH: Long = 5 * 60 * 1000 + + public data class AggregateHostMetrics( + val totalRequestedBurst: Long = 0, + val totalGrantedBurst: Long = 0, + val totalOvercommittedBurst: Long = 0, + val totalInterferedBurst: Long = 0, + val totalPowerDraw: Double = 0.0, + val totalFailureSlices: Long = 0, + val totalFailureVmSlices: Long = 0, + ) + + public data class HostMetrics( + val cpuUsage: Double, + val cpuDemand: Double, + val vmCount: Long, + val count: Long + ) + + private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics() + + override fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) { + provisionerMetrics = AggregateProvisionerMetrics( + max(totalVmCount, provisionerMetrics.vmTotalCount), + max(waitingVmCount, provisionerMetrics.vmWaitingCount), + max(activeVmCount, provisionerMetrics.vmActiveCount), + max(inactiveVmCount, provisionerMetrics.vmInactiveCount), + max(failedVmCount, provisionerMetrics.vmFailedCount), + ) + } + + public data class AggregateProvisionerMetrics( + val vmTotalCount: Int = 0, + val vmWaitingCount: Int = 0, + val vmActiveCount: Int = 0, + val vmInactiveCount: Int = 0, + val vmFailedCount: Int = 0 + ) + + override fun close() {} + + public fun getResult(): Result { + return Result( + hostAggregateMetrics.totalRequestedBurst, + hostAggregateMetrics.totalGrantedBurst, + hostAggregateMetrics.totalOvercommittedBurst, + hostAggregateMetrics.totalInterferedBurst, + hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), + hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), + hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.average(), + hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, + hostAggregateMetrics.totalPowerDraw, + hostAggregateMetrics.totalFailureSlices, + hostAggregateMetrics.totalFailureVmSlices, + provisionerMetrics.vmTotalCount, + provisionerMetrics.vmWaitingCount, + provisionerMetrics.vmInactiveCount, + provisionerMetrics.vmFailedCount, + ) + } + + public data class Result( + public val totalRequestedBurst: Long, + public val totalGrantedBurst: Long, + public val totalOvercommittedBurst: Long, + public val totalInterferedBurst: Long, + public val meanCpuUsage: Double, + public val meanCpuDemand: Double, + public val meanNumDeployedImages: Double, + public val maxNumDeployedImages: Double, + public val totalPowerDraw: Double, + public val totalFailureSlices: Long, + public val totalFailureVmSlices: Long, + public val totalVmsSubmitted: Int, + public val totalVmsQueued: Int, + public val totalVmsFinished: Int, + public val totalVmsFailed: Int + ) +} diff --git a/opendc-web/opendc-web-runner/src/main/resources/log4j2.xml b/opendc-web/opendc-web-runner/src/main/resources/log4j2.xml new file mode 100644 index 00000000..503bc5dc --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/resources/log4j2.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ MIT License + ~ + ~ Copyright (c) 2020 atlarge-research + ~ + ~ Permission is hereby granted, free of charge, to any person obtaining a copy + ~ of this software and associated documentation files (the "Software"), to deal + ~ in the Software without restriction, including without limitation the rights + ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + ~ copies of the Software, and to permit persons to whom the Software is + ~ furnished to do so, subject to the following conditions: + ~ + ~ The above copyright notice and this permission notice shall be included in all + ~ copies or substantial portions of the Software. + ~ + ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + ~ SOFTWARE. + --> + +<Configuration status="WARN" packages="org.apache.logging.log4j.core,io.sentry.log4j2"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/> + </Console> + + <Sentry name="Sentry" /> + </Appenders> + <Loggers> + <Logger name="org.opendc" level="warn" additivity="false"> + <AppenderRef ref="Console"/> + <AppenderRef ref="Sentry"/> + </Logger> + <Logger name="org.opendc.runner" level="info" additivity="false"> + <AppenderRef ref="Console"/> + <AppenderRef ref="Sentry"/> + </Logger> + <Root level="info"> + <AppenderRef level="error" ref="Console"/> + <AppenderRef ref="Sentry"/> + </Root> + </Loggers> +</Configuration> |
