summaryrefslogtreecommitdiff
path: root/opendc-web/opendc-web-runner/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-web/opendc-web-runner/src')
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt380
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt115
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt126
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt189
-rw-r--r--opendc-web/opendc-web-runner/src/main/resources/log4j2.xml48
5 files changed, 858 insertions, 0 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt
new file mode 100644
index 00000000..09f7de35
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -0,0 +1,380 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.runner.web
+
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.options.*
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.int
+import com.github.ajalt.clikt.parameters.types.long
+import com.mongodb.MongoClientSettings
+import com.mongodb.MongoCredential
+import com.mongodb.ServerAddress
+import com.mongodb.client.MongoClients
+import com.mongodb.client.MongoDatabase
+import com.mongodb.client.model.Filters
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import mu.KotlinLogging
+import org.bson.Document
+import org.bson.types.ObjectId
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.weights.*
+import org.opendc.experiments.capelin.*
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
+import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
+import org.opendc.format.environment.EnvironmentReader
+import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.sdk.toOtelClock
+import java.io.File
+import kotlin.random.Random
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Represents the CLI command for starting the OpenDC web runner.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public class RunnerCli : CliktCommand(name = "runner") {
+ /**
+ * The name of the database to use.
+ */
+ private val mongoDb by option(
+ "--mongo-db",
+ help = "name of the database to use",
+ envvar = "OPENDC_DB"
+ )
+ .default("opendc")
+
+ /**
+ * The database host to connect to.
+ */
+ private val mongoHost by option(
+ "--mongo-host",
+ help = "database host to connect to",
+ envvar = "OPENDC_DB_HOST"
+ )
+ .default("localhost")
+
+ /**
+ * The database port to connect to.
+ */
+ private val mongoPort by option(
+ "--mongo-port",
+ help = "database port to connect to",
+ envvar = "OPENDC_DB_PORT"
+ )
+ .int()
+ .default(27017)
+
+ /**
+ * The database user to connect with.
+ */
+ private val mongoUser by option(
+ "--mongo-user",
+ help = "database user to connect with",
+ envvar = "OPENDC_DB_USER"
+ )
+ .default("opendc")
+
+ /**
+ * The database password to connect with.
+ */
+ private val mongoPassword by option(
+ "--mongo-password",
+ help = "database password to connect with",
+ envvar = "OPENDC_DB_PASSWORD"
+ )
+ .convert { it.toCharArray() }
+ .required()
+
+ /**
+ * The path to the traces directory.
+ */
+ private val tracePath by option(
+ "--traces",
+ help = "path to the directory containing the traces",
+ envvar = "OPENDC_TRACES"
+ )
+ .file(canBeFile = false)
+ .defaultLazy { File("traces/") }
+
+ /**
+ * The maximum duration of a single experiment run.
+ */
+ private val runTimeout by option(
+ "--run-timeout",
+ help = "maximum duration of experiment in seconds",
+ envvar = "OPENDC_RUN_TIMEOUT"
+ )
+ .long()
+ .default(60 * 3) // Experiment may run for a maximum of three minutes
+
+ /**
+     * Open a MongoDB client for the configured host and port and return the configured database.
+     *
+     * The client authenticates with SCRAM-SHA-1, using the configured user, database and password.
+     */
+    private fun createDatabase(): MongoDatabase {
+        val settings = MongoClientSettings.builder().apply {
+            credential(MongoCredential.createScramSha1Credential(mongoUser, mongoDb, mongoPassword))
+            applyToClusterSettings { cluster ->
+                cluster.hosts(listOf(ServerAddress(mongoHost, mongoPort)))
+            }
+        }.build()
+
+        // The client is not closed here; only the database handle obtained from it is handed out.
+        val client = MongoClients.create(settings)
+        return client.getDatabase(mongoDb)
+    }
+
+ /**
+     * Run every repeat of a single scenario, each bounded by the configured run timeout.
+     *
+     * @param portfolio The portfolio document; its `targets.repeatsPerScenario` field gives the number of repeats.
+     * @param scenario The scenario document to simulate.
+     * @param topologyParser The parser used to load the topology referenced by the scenario.
+     * @return The result of every repeat, in order of execution.
+     */
+    private suspend fun runScenario(portfolio: Document, scenario: Document, topologyParser: TopologyParser): List<WebExperimentMonitor.Result> {
+        val id = scenario.getObjectId("_id")
+        logger.info { "Constructing performance interference model" }
+
+        val traceDir = File(tracePath, scenario.getEmbedded(listOf("trace", "traceId"), String::class.java))
+        val traceReader = Sc20RawParquetTraceReader(traceDir)
+        val performanceInterferenceReader = let {
+            val path = File(traceDir, "performance-interference-model.json")
+            val operational = scenario.get("operational", Document::class.java)
+            // A scenario without the flag runs without interference instead of failing on a null Boolean.
+            val enabled = operational.getBoolean("performanceInterferenceEnabled", false)
+            if (!enabled || !path.exists()) {
+                return@let null
+            }
+
+            path.inputStream().use { Sc20PerformanceInterferenceReader(it) }
+        }
+
+        val targets = portfolio.get("targets", Document::class.java)
+        val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
+        val environment = topologyParser.read(topologyId)
+
+        val results = (0 until targets.getInteger("repeatsPerScenario")).map {
+            logger.info { "Starting repeat $it" }
+            withTimeout(runTimeout * 1000) {
+                runRepeat(scenario, it, environment, traceReader, performanceInterferenceReader)
+            }
+        }
+
+        logger.info { "Finished simulation for scenario $id" }
+        return results
+    }
+
+ /**
+     * Run a single repeat of [scenario] in a blocking simulation and return the metrics gathered by its monitor.
+     *
+     * The [repeat] index doubles as the random seed. Any failure inside the simulation (including an unknown
+     * scheduler name) is logged instead of rethrown, so the result holds whatever was recorded until then.
+     *
+     * @param scenario The scenario document, of which the `trace` and `operational` sub-documents are read.
+     * @param repeat The index of the repeat, which is also used as seed.
+     * @param environment The reader for the machines of the topology.
+     * @param traceReader The raw reader for the workload trace.
+     * @param performanceInterferenceReader The reader for the interference model, or `null` to run without one.
+     * @return The result reported by the [WebExperimentMonitor] of this repeat.
+     */
+    private suspend fun runRepeat(
+        scenario: Document,
+        repeat: Int,
+        environment: EnvironmentReader,
+        traceReader: Sc20RawParquetTraceReader,
+        performanceInterferenceReader: Sc20PerformanceInterferenceReader?
+    ): WebExperimentMonitor.Result {
+        val monitor = WebExperimentMonitor()
+
+        try {
+            runBlockingSimulation {
+                val seed = repeat
+                val traceDocument = scenario.get("trace", Document::class.java)
+                val workloadName = traceDocument.getString("traceId")
+                val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
+
+                val seeder = Random(seed)
+
+                // Conflated channel handed to both the failure domain and the trace processor.
+                val chan = Channel<Unit>(Channel.CONFLATED)
+
+                val meterProvider: MeterProvider = SdkMeterProvider
+                    .builder()
+                    .setClock(clock.toOtelClock())
+                    .build()
+                val metricProducer = meterProvider as MetricProducer
+
+                val operational = scenario.get("operational", Document::class.java)
+                val allocationPolicy =
+                    when (val policyName = operational.getString("schedulerName")) {
+                        "mem" -> FilterScheduler(
+                            filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+                            weighers = listOf(MemoryWeigher() to -1.0)
+                        )
+                        "mem-inv" -> FilterScheduler(
+                            filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+                            weighers = listOf(MemoryWeigher() to 1.0)
+                        )
+                        "core-mem" -> FilterScheduler(
+                            filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+                            weighers = listOf(CoreMemoryWeigher() to -1.0)
+                        )
+                        "core-mem-inv" -> FilterScheduler(
+                            filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+                            weighers = listOf(CoreMemoryWeigher() to 1.0)
+                        )
+                        // Same weigher as "active-servers-inv", with the opposite sign.
+                        "active-servers" -> FilterScheduler(
+                            filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+                            weighers = listOf(InstanceCountWeigher() to -1.0)
+                        )
+                        "active-servers-inv" -> FilterScheduler(
+                            filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+                            weighers = listOf(InstanceCountWeigher() to 1.0)
+                        )
+                        "provisioned-cores" -> FilterScheduler(
+                            filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+                            weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+                        )
+                        "provisioned-cores-inv" -> FilterScheduler(
+                            filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+                            weighers = listOf(ProvisionedCoresWeigher() to 1.0)
+                        )
+                        "random" -> FilterScheduler(
+                            filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+                            weighers = listOf(RandomWeigher(java.util.Random(seeder.nextLong())) to 1.0)
+                        )
+                        else -> throw IllegalArgumentException("Unknown policy $policyName")
+                    }
+
+                val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
+                val trace = Sc20ParquetTraceReader(
+                    listOf(traceReader),
+                    performanceInterferenceModel,
+                    Workload(workloadName, workloadFraction),
+                    seed
+                )
+                // NOTE(review): 24.0 * 7 is presumably one week in hours; confirm against createFailureDomain.
+                val failureFrequency = if (operational.getBoolean("failuresEnabled", false)) 24.0 * 7 else 0.0
+
+                withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
+                    val failureDomain = if (failureFrequency > 0) {
+                        logger.debug { "ENABLING failures" }
+                        createFailureDomain(this, clock, seeder.nextInt(), failureFrequency, scheduler, chan)
+                    } else {
+                        null
+                    }
+
+                    withMonitor(monitor, clock, metricProducer, scheduler) {
+                        processTrace(clock, trace, scheduler, chan, monitor)
+                    }
+
+                    failureDomain?.cancel()
+                }
+
+                val monitorResults = collectMetrics(metricProducer)
+                logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+            }
+        } catch (cause: Throwable) {
+            logger.warn(cause) { "Experiment failed" }
+        }
+
+        return monitor.getResult()
+    }
+
+ private val POLL_INTERVAL = 5000L // ms = 5 s
+ private val HEARTBEAT_INTERVAL = 60000L // ms = 1 min
+
+ /**
+     * Poll the database for queued scenarios and simulate them one at a time. The loop has no exit condition.
+     *
+     * While a claimed scenario runs, its heartbeat is refreshed every [HEARTBEAT_INTERVAL] milliseconds. The
+     * scenario is marked as finished once its results are stored, or as failed when an exception occurs.
+     */
+    override fun run(): Unit = runBlocking(Dispatchers.Default) {
+        logger.info { "Starting OpenDC web runner" }
+        logger.info { "Connecting to MongoDB instance" }
+        val database = createDatabase()
+        val manager = ScenarioManager(database.getCollection("scenarios"))
+        val portfolios = database.getCollection("portfolios")
+        val topologyParser = TopologyParser(database.getCollection("topologies"))
+        logger.info { "Watching for queued scenarios" }
+        while (true) {
+            val scenario = manager.findNext()
+            if (scenario == null) {
+                delay(POLL_INTERVAL)
+                continue
+            }
+
+            val id = scenario.getObjectId("_id")
+            logger.info { "Found queued scenario $id: attempting to claim" }
+            if (!manager.claim(id)) {
+                logger.info { "Failed to claim scenario" }
+                continue
+            }
+
+            coroutineScope {
+                // Launch heartbeat process
+                val heartbeat = launch {
+                    while (true) {
+                        delay(HEARTBEAT_INTERVAL)
+                        manager.heartbeat(id)
+                    }
+                }
+
+                try {
+                    val portfolioId = scenario.getObjectId("portfolioId")
+                    val portfolio = checkNotNull(portfolios.find(Filters.eq("_id", portfolioId)).first()) {
+                        "Portfolio $portfolioId of scenario $id does not exist"
+                    }
+                    val results = runScenario(portfolio, scenario, topologyParser)
+                    logger.info { "Writing results to database" }
+                    manager.finish(id, results)
+                    logger.info { "Successfully finished scenario $id" }
+                } catch (e: Exception) {
+                    logger.error(e) { "Scenario failed to finish" }
+                    manager.fail(id)
+                } finally {
+                    heartbeat.cancel()
+                }
+            }
+        }
+    }
+}
+
+/** Main entry point of the runner: hands [args] to the [RunnerCli] command. */
+public fun main(args: Array<String>) {
+    RunnerCli().main(args)
+}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
new file mode 100644
index 00000000..a3907051
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.runner.web
+
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.model.Filters
+import com.mongodb.client.model.Updates
+import org.bson.Document
+import org.bson.types.ObjectId
+import java.time.Instant
+
+/**
+ * Manages the queue of scenarios that need to be processed.
+ */
+public class ScenarioManager(private val collection: MongoCollection<Document>) {
+ /**
+     * Look up a scenario that is in the `QUEUED` state, without claiming it.
+     * @return The first such scenario returned by the database, or `null` if none is queued.
+     */
+    public fun findNext(): Document? {
+        val queued = Filters.eq("simulation.state", "QUEUED")
+        return collection.find(queued).first()
+    }
+
+ /**
+     * Try to move the scenario with the specified id from `QUEUED` to `RUNNING` and stamp its first heartbeat.
+     *
+     * @return `true` if a queued scenario with that id was found and updated, `false` otherwise.
+     */
+    public fun claim(id: ObjectId): Boolean {
+        val stillQueued = Filters.and(Filters.eq("_id", id), Filters.eq("simulation.state", "QUEUED"))
+        val markRunning = Updates.combine(
+            Updates.set("simulation.state", "RUNNING"),
+            Updates.set("simulation.heartbeat", Instant.now())
+        )
+
+        // findOneAndUpdate yields null when no document matched the filter.
+        val previous = collection.findOneAndUpdate(stillQueued, markRunning)
+        return previous != null
+    }
+
+ /**
+     * Refresh the heartbeat timestamp of the specified scenario, provided it is still in the `RUNNING` state.
+     */
+    public fun heartbeat(id: ObjectId) {
+        val running = Filters.and(
+            Filters.eq("_id", id),
+            Filters.eq("simulation.state", "RUNNING")
+        )
+        val now = Instant.now()
+
+        collection.findOneAndUpdate(running, Updates.set("simulation.heartbeat", now))
+    }
+
+ /**
+     * Mark the scenario with the specified id as `FAILED`, whatever its current state, and stamp the time.
+     */
+    public fun fail(id: ObjectId) {
+        val byId = Filters.eq("_id", id)
+        val markFailed = Updates.combine(
+            Updates.set("simulation.state", "FAILED"),
+            Updates.set("simulation.heartbeat", Instant.now())
+        )
+
+        collection.findOneAndUpdate(byId, markFailed)
+    }
+
+ /**
+     * Mark the scenario with the specified id as `FINISHED` and store its results.
+     * Every metric is written as a list with one entry per repeat, in the order of [results].
+     */
+    public fun finish(id: ObjectId, results: List<WebExperimentMonitor.Result>) {
+        fun <T> column(field: String, pick: (WebExperimentMonitor.Result) -> T) = Updates.set(field, results.map(pick))
+        val updates = listOf(
+            Updates.set("simulation.state", "FINISHED"),
+            Updates.unset("simulation.time"),
+            column("results.total_requested_burst") { it.totalRequestedBurst },
+            column("results.total_granted_burst") { it.totalGrantedBurst },
+            column("results.total_overcommitted_burst") { it.totalOvercommittedBurst },
+            column("results.total_interfered_burst") { it.totalInterferedBurst },
+            column("results.mean_cpu_usage") { it.meanCpuUsage },
+            column("results.mean_cpu_demand") { it.meanCpuDemand },
+            column("results.mean_num_deployed_images") { it.meanNumDeployedImages },
+            column("results.max_num_deployed_images") { it.maxNumDeployedImages },
+            column("results.total_power_draw") { it.totalPowerDraw },
+            column("results.total_failure_slices") { it.totalFailureSlices },
+            column("results.total_failure_vm_slices") { it.totalFailureVmSlices },
+            column("results.total_vms_submitted") { it.totalVmsSubmitted },
+            column("results.total_vms_queued") { it.totalVmsQueued },
+            column("results.total_vms_finished") { it.totalVmsFinished },
+            column("results.total_vms_failed") { it.totalVmsFailed }
+        )
+        collection.findOneAndUpdate(Filters.eq("_id", id), Updates.combine(updates))
+    }
+}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
new file mode 100644
index 00000000..2dd63340
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.runner.web
+
+import com.mongodb.client.AggregateIterable
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.model.Aggregates
+import com.mongodb.client.model.Field
+import com.mongodb.client.model.Filters
+import com.mongodb.client.model.Projections
+import org.bson.Document
+import org.bson.types.ObjectId
+import org.opendc.format.environment.EnvironmentReader
+import org.opendc.format.environment.MachineDef
+import org.opendc.simulator.compute.SimMachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.LinearPowerModel
+import java.util.*
+
+/**
+ * A helper class that converts the MongoDB topology into an OpenDC environment.
+ */
+public class TopologyParser(private val collection: MongoCollection<Document>) {
+
+ /**
+     * Build an [EnvironmentReader] for the machines of the topology with the specified [id].
+     *
+     * The machines are fetched eagerly. Their UUIDs are drawn from a generator with a fixed seed, so they
+     * repeat across calls as long as the database returns the machines in the same order.
+     *
+     * @param id The identifier of the topology document.
+     * @return A reader that serves the converted machines; closing it is a no-op.
+     */
+    public fun read(id: ObjectId): EnvironmentReader {
+        val uuidSource = Random(0)
+        val machines = fetchMachines(id).mapTo(mutableListOf<MachineDef>()) { machine ->
+            val rack = machine.get("rack_id").toString()
+            val slot = machine.getInteger("position")
+            val cpus = machine.getList("cpus", Document::class.java)
+
+            val processors = cpus.flatMap { cpu ->
+                val coreCount = cpu.getInteger("numberOfCores")
+                val frequency = cpu.get("clockRateMhz", Number::class.java).toDouble()
+                // TODO Remove hardcoding of vendor
+                val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), coreCount)
+                List(coreCount) { ProcessingUnit(node, it, frequency) }
+            }
+
+            val memory = machine.getList("memories", Document::class.java).map { dimm ->
+                val label = dimm.getString("name")
+                val speed = dimm.get("speedMbPerS", Number::class.java).toDouble()
+                MemoryUnit("Samsung", label, speed, dimm.get("sizeMb", Number::class.java).toLong())
+            }
+            val cpuPower = cpus.sumBy { it.getInteger("energyConsumptionW") }.toDouble()
+
+            // NOTE(review): the power model gets twice and half the summed CPU `energyConsumptionW`; confirm
+            // against LinearPowerModel which of the two arguments is the maximum and which the idle power.
+            MachineDef(
+                UUID(uuidSource.nextLong(), uuidSource.nextLong()),
+                "node-$rack-$slot",
+                mapOf("cluster" to rack),
+                SimMachineModel(processors, memory),
+                LinearPowerModel(2 * cpuPower, cpuPower * 0.5)
+            )
+        }
+
+        return object : EnvironmentReader {
+            override fun read(): List<MachineDef> = machines
+            override fun close() {}
+        }
+    }
+
+ /**
+     * Fetch the name of the topology with the specified [id].
+     *
+     * Fails with a [NullPointerException] if no topology has this id.
+     */
+    private fun fetchName(id: ObjectId): String {
+        val pipeline = listOf(
+            Aggregates.match(Filters.eq("_id", id)),
+            Aggregates.project(Projections.include("name"))
+        )
+        val topology = collection.aggregate(pipeline).first()!!
+        return topology.getString("name")
+    }
+
+ /**
+     * Fetch the machines of the topology with the specified [id], one document per machine.
+     * Each machine document is tagged with the `_id` of its rack under `rack_id`.
+     */
+    private fun fetchMachines(id: ObjectId): AggregateIterable<Document> {
+        val selectTopology = Aggregates.match(Filters.eq("_id", id))
+        val collectRacks = Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack")))
+        // Unwound twice: presumably `racks` is a list (per room) of lists (per tile); confirm against the schema.
+        val flattenRacks = listOf(Aggregates.unwind("\$racks"), Aggregates.unwind("\$racks"))
+        val perMachine = listOf(
+            Aggregates.replaceRoot("\$racks"),
+            Aggregates.addFields(Field("machines.rack_id", "\$_id")),
+            Aggregates.unwind("\$machines"),
+            Aggregates.replaceRoot("\$machines")
+        )
+        return collection.aggregate(listOf(selectTopology, collectRacks) + flattenRacks + perMachine)
+    }
+}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
new file mode 100644
index 00000000..c913f82f
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.runner.web
+
+import mu.KotlinLogging
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostState
+import org.opendc.experiments.capelin.monitor.ExperimentMonitor
+import org.opendc.experiments.capelin.telemetry.HostEvent
+import kotlin.math.max
+
+/**
+ * An [ExperimentMonitor] that tracks the aggregate metrics for each repeat.
+ */
+public class WebExperimentMonitor : ExperimentMonitor {
+ private val logger = KotlinLogging.logger {}
+
+ override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
+
+ /** Log a host state transition at debug level; nothing is recorded for the result. */
+    override fun reportHostStateChange(time: Long, host: Host, newState: HostState): Unit =
+        logger.debug { "Host ${host.uid} changed state $newState [$time]" }
+
+ /** Wrap one reported host slice, with a fixed duration of [SLICE_LENGTH], in a [HostEvent] and aggregate it. */
+    override fun reportHostSlice(
+        time: Long,
+        requestedBurst: Long,
+        grantedBurst: Long,
+        overcommissionedBurst: Long,
+        interferedBurst: Long,
+        cpuUsage: Double,
+        cpuDemand: Double,
+        powerDraw: Double,
+        numberOfDeployedImages: Int,
+        host: Host,
+    ) {
+        val slice = HostEvent(
+            time,
+            SLICE_LENGTH,
+            host,
+            numberOfDeployedImages,
+            requestedBurst,
+            grantedBurst,
+            overcommissionedBurst,
+            interferedBurst,
+            cpuUsage,
+            cpuDemand,
+            powerDraw,
+            host.model.cpuCount
+        )
+        processHostEvent(slice)
+    }
+
+ private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
+ private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf()
+
+ /** Fold a single host event into the aggregate totals and into the running sums kept for its host. */
+    private fun processHostEvent(event: HostEvent) {
+        val up = event.host.state == HostState.UP
+        val failedSlices = if (up) 0L else event.duration / SLICE_LENGTH // only a host that is not up adds any
+        hostAggregateMetrics = AggregateHostMetrics(
+            hostAggregateMetrics.totalRequestedBurst + event.requestedBurst,
+            hostAggregateMetrics.totalGrantedBurst + event.grantedBurst,
+            hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst,
+            hostAggregateMetrics.totalInterferedBurst + event.interferedBurst,
+            hostAggregateMetrics.totalPowerDraw + (event.duration * event.powerDraw) / 3600,
+            hostAggregateMetrics.totalFailureSlices + failedSlices,
+            hostAggregateMetrics.totalFailureVmSlices + event.vmCount * failedSlices
+        )
+        hostMetrics.compute(event.host) { _, prev ->
+            HostMetrics(
+                (if (up) event.cpuUsage else 0.0) + (prev?.cpuUsage ?: 0.0),
+                (if (up) event.cpuDemand else 0.0) + (prev?.cpuDemand ?: 0.0),
+                event.vmCount + (prev?.vmCount ?: 0L),
+                1 + (prev?.count ?: 0L)
+            )
+        }
+    }
+
+ private val SLICE_LENGTH: Long = 5 * 60 * 1000
+
+ public data class AggregateHostMetrics(
+ val totalRequestedBurst: Long = 0,
+ val totalGrantedBurst: Long = 0,
+ val totalOvercommittedBurst: Long = 0,
+ val totalInterferedBurst: Long = 0,
+ val totalPowerDraw: Double = 0.0,
+ val totalFailureSlices: Long = 0,
+ val totalFailureVmSlices: Long = 0,
+ )
+
+ public data class HostMetrics(
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val vmCount: Long,
+ val count: Long
+ )
+
+ private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
+
+ /** Keep, for every VM count of the provisioner, the highest value reported so far; host counts are ignored. */
+    override fun reportProvisionerMetrics(
+        time: Long,
+        totalHostCount: Int, availableHostCount: Int,
+        totalVmCount: Int,
+        activeVmCount: Int,
+        inactiveVmCount: Int,
+        waitingVmCount: Int,
+        failedVmCount: Int
+    ) {
+        provisionerMetrics = AggregateProvisionerMetrics(
+            vmTotalCount = max(provisionerMetrics.vmTotalCount, totalVmCount),
+            vmWaitingCount = max(provisionerMetrics.vmWaitingCount, waitingVmCount),
+            vmActiveCount = max(provisionerMetrics.vmActiveCount, activeVmCount),
+            vmInactiveCount = max(provisionerMetrics.vmInactiveCount, inactiveVmCount),
+            vmFailedCount = max(provisionerMetrics.vmFailedCount, failedVmCount)
+        )
+    }
+
+ public data class AggregateProvisionerMetrics(
+ val vmTotalCount: Int = 0,
+ val vmWaitingCount: Int = 0,
+ val vmActiveCount: Int = 0,
+ val vmInactiveCount: Int = 0,
+ val vmFailedCount: Int = 0
+ )
+
+ override fun close() {}
+
+ /** Summarise the metrics recorded so far into a [Result].
+     *  The means are averages over hosts of per-host averages, and NaN if no host slice was reported. */
+    public fun getResult(): Result = Result(
+        totalRequestedBurst = hostAggregateMetrics.totalRequestedBurst,
+        totalGrantedBurst = hostAggregateMetrics.totalGrantedBurst,
+        totalOvercommittedBurst = hostAggregateMetrics.totalOvercommittedBurst,
+        totalInterferedBurst = hostAggregateMetrics.totalInterferedBurst,
+        meanCpuUsage = hostMetrics.values.map { it.cpuUsage / it.count }.average(),
+        meanCpuDemand = hostMetrics.values.map { it.cpuDemand / it.count }.average(),
+        meanNumDeployedImages = hostMetrics.values.map { it.vmCount.toDouble() / it.count }.average(),
+        maxNumDeployedImages = hostMetrics.values.map { it.vmCount.toDouble() / it.count }.maxOrNull() ?: 0.0,
+        totalPowerDraw = hostAggregateMetrics.totalPowerDraw,
+        totalFailureSlices = hostAggregateMetrics.totalFailureSlices,
+        totalFailureVmSlices = hostAggregateMetrics.totalFailureVmSlices,
+        totalVmsSubmitted = provisionerMetrics.vmTotalCount,
+        totalVmsQueued = provisionerMetrics.vmWaitingCount,
+        totalVmsFinished = provisionerMetrics.vmInactiveCount,
+        totalVmsFailed = provisionerMetrics.vmFailedCount
+    )
+
+ public data class Result(
+ public val totalRequestedBurst: Long,
+ public val totalGrantedBurst: Long,
+ public val totalOvercommittedBurst: Long,
+ public val totalInterferedBurst: Long,
+ public val meanCpuUsage: Double, // mean over hosts of each host's average CPU usage
+ public val meanCpuDemand: Double, // mean over hosts of each host's average CPU demand
+ public val meanNumDeployedImages: Double, // mean over hosts of each host's average VM count
+ public val maxNumDeployedImages: Double, // highest per-host average VM count, 0.0 without hosts
+ public val totalPowerDraw: Double,
+ public val totalFailureSlices: Long,
+ public val totalFailureVmSlices: Long,
+ public val totalVmsSubmitted: Int, // peak total VM count reported by the provisioner
+ public val totalVmsQueued: Int, // peak waiting VM count reported by the provisioner
+ public val totalVmsFinished: Int, // peak inactive VM count reported by the provisioner
+ public val totalVmsFailed: Int // peak failed VM count reported by the provisioner
+ )
+}
diff --git a/opendc-web/opendc-web-runner/src/main/resources/log4j2.xml b/opendc-web/opendc-web-runner/src/main/resources/log4j2.xml
new file mode 100644
index 00000000..503bc5dc
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/resources/log4j2.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ MIT License
+ ~
+ ~ Copyright (c) 2020 atlarge-research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN" packages="org.apache.logging.log4j.core,io.sentry.log4j2">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
+ </Console>
+
+ <Sentry name="Sentry" />
+ </Appenders>
+ <Loggers>
+ <Logger name="org.opendc" level="warn" additivity="false">
+ <AppenderRef ref="Console"/>
+ <AppenderRef ref="Sentry"/>
+ </Logger>
+ <Logger name="org.opendc.runner" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ <AppenderRef ref="Sentry"/>
+ </Logger>
+ <Root level="info">
+ <AppenderRef level="error" ref="Console"/>
+ <AppenderRef ref="Sentry"/>
+ </Root>
+ </Loggers>
+</Configuration>