summaryrefslogtreecommitdiff
path: root/opendc-web/opendc-web-runner/src
diff options
context:
space:
mode:
authormjkwiatkowski <mati.rewa@gmail.com>2026-02-16 15:18:21 +0100
committermjkwiatkowski <mati.rewa@gmail.com>2026-02-16 15:18:21 +0100
commit2f16cb0f48eca4453e3e894b3d45a3aa09e6dcc0 (patch)
tree672d98baa2ac071f2c30de06d613254d0d8cd105 /opendc-web/opendc-web-runner/src
parent86d35fcec83057e346e4982b5a6908f25342a392 (diff)
feat: opendc -> kafka -> postgresql works; added protobuf encodingHEADmaster
Diffstat (limited to 'opendc-web/opendc-web-runner/src')
-rw-r--r--opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt132
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt82
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt385
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt69
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt134
-rw-r--r--opendc-web/opendc-web-runner/src/main/resources/log4j2.xml48
6 files changed, 0 insertions, 850 deletions
diff --git a/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt
deleted file mode 100644
index 5d35fd98..00000000
--- a/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.web.runner
-
-import com.github.ajalt.clikt.core.CliktCommand
-import com.github.ajalt.clikt.parameters.options.convert
-import com.github.ajalt.clikt.parameters.options.default
-import com.github.ajalt.clikt.parameters.options.defaultLazy
-import com.github.ajalt.clikt.parameters.options.option
-import com.github.ajalt.clikt.parameters.options.required
-import com.github.ajalt.clikt.parameters.types.file
-import com.github.ajalt.clikt.parameters.types.int
-import mu.KotlinLogging
-import org.opendc.web.client.auth.OpenIdAuthController
-import org.opendc.web.client.runner.OpenDCRunnerClient
-import java.io.File
-import java.net.URI
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * Represents the CLI command for starting the OpenDC web runner.
- */
-class RunnerCli : CliktCommand(name = "opendc-runner") {
- /**
- * The URL to the OpenDC API.
- */
- private val apiUrl by option(
- "--api-url",
- help = "url to the OpenDC API",
- envvar = "OPENDC_API_URL",
- )
- .convert { URI(it) }
- .default(URI("https://api.opendc.org/v2"))
-
- /**
- * The auth domain to use.
- */
- private val authDomain by option(
- "--auth-domain",
- help = "auth domain of the OpenDC API",
- envvar = "AUTH0_DOMAIN",
- )
- .required()
-
- /**
- * The auth domain to use.
- */
- private val authAudience by option(
- "--auth-audience",
- help = "auth audience of the OpenDC API",
- envvar = "AUTH0_AUDIENCE",
- )
- .required()
-
- /**
- * The auth client ID to use.
- */
- private val authClientId by option(
- "--auth-id",
- help = "auth client id of the OpenDC API",
- envvar = "AUTH0_CLIENT_ID",
- )
- .required()
-
- /**
- * The auth client secret to use.
- */
- private val authClientSecret by option(
- "--auth-secret",
- help = "auth client secret of the OpenDC API",
- envvar = "AUTH0_CLIENT_SECRET",
- )
- .required()
-
- /**
- * The path to the traces directory.
- */
- private val tracePath by option(
- "--traces",
- help = "path to the directory containing the traces",
- envvar = "OPENDC_TRACES",
- )
- .file(canBeFile = false)
- .defaultLazy { File("traces/") }
-
- /**
- * The number of threads used for simulations..
- */
- private val parallelism by option(
- "--parallelism",
- help = "maximum number of threads for simulations",
- )
- .int()
- .default(Runtime.getRuntime().availableProcessors() - 1)
-
- override fun run() {
- logger.info { "Starting OpenDC web runner" }
-
- val client = OpenDCRunnerClient(baseUrl = apiUrl, OpenIdAuthController(authDomain, authClientId, authClientSecret, authAudience))
- val manager = JobManager(client)
- val runner = OpenDCRunner(manager, tracePath, parallelism = parallelism)
-
- logger.info { "Watching for queued scenarios" }
- runner.run()
- }
-}
-
-/**
- * Main entry point of the runner.
- */
-fun main(args: Array<String>): Unit = RunnerCli().main(args)
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt
deleted file mode 100644
index a517f3b4..00000000
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.web.runner
-
-import org.opendc.web.client.runner.OpenDCRunnerClient
-import org.opendc.web.proto.runner.Job
-import org.opendc.web.runner.internal.JobManagerImpl
-
-/**
- * Interface used by the [OpenDCRunner] to manage the available jobs to be processed.
- */
-public interface JobManager {
- /**
- * Find the next job that the simulator needs to process.
- */
- public fun findNext(): Job?
-
- /**
- * Claim the simulation job with the specified id.
- */
- public fun claim(id: Long): Boolean
-
- /**
- * Update the heartbeat of the specified job.
- *
- * @param id The identifier of the job.
- * @param runtime The total runtime of the job.
- * @return `true` if the job can continue, `false` if the job has been cancelled.
- */
- public fun heartbeat(
- id: Long,
- runtime: Int,
- ): Boolean
-
- /**
- * Mark the job as failed.
- */
- public fun fail(
- id: Long,
- runtime: Int,
- )
-
- /**
- * Persist the specified results for the specified job.
- */
- public fun finish(
- id: Long,
- runtime: Int,
- results: Map<String, Any>,
- )
-
- public companion object {
- /**
- * Create a [JobManager] given the specified [client].
- */
- @JvmStatic
- @JvmName("create")
- public operator fun invoke(client: OpenDCRunnerClient): JobManager {
- return JobManagerImpl(client)
- }
- }
-}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
deleted file mode 100644
index 83583eab..00000000
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.web.runner
-
-import mu.KotlinLogging
-import org.opendc.compute.failure.prefab.FailurePrefab
-import org.opendc.compute.failure.prefab.createFailureModelPrefab
-import org.opendc.compute.simulator.provisioner.Provisioner
-import org.opendc.compute.simulator.provisioner.registerComputeMonitor
-import org.opendc.compute.simulator.provisioner.setupComputeService
-import org.opendc.compute.simulator.provisioner.setupHosts
-import org.opendc.compute.simulator.scheduler.createPrefabComputeScheduler
-import org.opendc.compute.simulator.service.ComputeService
-import org.opendc.compute.topology.specs.ClusterSpec
-import org.opendc.compute.topology.specs.HostSpec
-import org.opendc.compute.topology.specs.PowerSourceSpec
-import org.opendc.compute.workload.ComputeWorkloadLoader
-import org.opendc.experiments.base.runner.replay
-import org.opendc.simulator.compute.models.CpuModel
-import org.opendc.simulator.compute.models.MachineModel
-import org.opendc.simulator.compute.models.MemoryUnit
-import org.opendc.simulator.compute.power.PowerModels
-import org.opendc.simulator.kotlin.runSimulation
-import org.opendc.web.proto.runner.Job
-import org.opendc.web.proto.runner.Scenario
-import org.opendc.web.proto.runner.Topology
-import org.opendc.web.runner.internal.WebComputeMonitor
-import java.io.File
-import java.time.Duration
-import java.time.Instant
-import java.time.temporal.ChronoUnit
-import java.util.Random
-import java.util.concurrent.Executors
-import java.util.concurrent.ForkJoinPool
-import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory
-import java.util.concurrent.ForkJoinWorkerThread
-import java.util.concurrent.RecursiveAction
-import java.util.concurrent.RecursiveTask
-import java.util.concurrent.ScheduledExecutorService
-import java.util.concurrent.TimeUnit
-
-/**
- * Class to execute the pending jobs via the OpenDC web API.
- *
- * @param manager The underlying [JobManager] to manage the available jobs.
- * @param tracePath The directory where the traces are located.
- * @param jobTimeout The maximum duration of a simulation job.
- * @param pollInterval The interval to poll the API with.
- * @param heartbeatInterval The interval to send a heartbeat to the API server.
- */
-public class OpenDCRunner(
- private val manager: JobManager,
- private val tracePath: File,
- parallelism: Int = Runtime.getRuntime().availableProcessors(),
- private val jobTimeout: Duration = Duration.ofMinutes(10),
- private val pollInterval: Duration = Duration.ofSeconds(30),
- private val heartbeatInterval: Duration = Duration.ofMinutes(1),
-) : Runnable {
- /**
- * Logging instance for this runner.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * Helper class to load the workloads.
- */
- private val workloadLoader = ComputeWorkloadLoader(tracePath)
-
- /**
- * The [ForkJoinPool] that is used to execute the simulation jobs.
- */
- private val pool =
- ForkJoinPool(parallelism, RunnerThreadFactory(Thread.currentThread().contextClassLoader), null, false)
-
- /**
- * A [ScheduledExecutorService] to manage the heartbeat of simulation jobs as well as tracking the deadline of
- * individual simulations.
- */
- private val scheduler = Executors.newSingleThreadScheduledExecutor()
-
- /**
- * Start the runner process.
- *
- * This method will block until interrupted and poll the OpenDC API for new jobs to execute.
- */
- override fun run() {
- try {
- while (true) {
- val job = manager.findNext()
- if (job == null) {
- Thread.sleep(pollInterval.toMillis())
- continue
- }
-
- val id = job.id
-
- logger.info { "Found queued job $id: attempting to claim" }
-
- if (!manager.claim(id)) {
- logger.info { "Failed to claim scenario" }
- continue
- }
-
- pool.submit(JobAction(job))
- }
- } catch (_: InterruptedException) {
- // Gracefully exit when the thread is interrupted
- } finally {
- workloadLoader.reset()
-
- pool.shutdown()
- scheduler.shutdown()
-
- pool.awaitTermination(5, TimeUnit.SECONDS)
- }
- }
-
- /**
- * A [RecursiveAction] that runs a simulation job (consisting of possible multiple simulations).
- *
- * @param job The job to simulate.
- */
- private inner class JobAction(private val job: Job) : RecursiveAction() {
- override fun compute() {
- val id = job.id
- val scenario = job.scenario
- val startTime = Instant.now()
- val currentThread = Thread.currentThread()
-
- val heartbeat =
- scheduler.scheduleWithFixedDelay(
- {
- if (!manager.heartbeat(id, startTime.secondsSince())) {
- currentThread.interrupt()
- }
- },
- 0,
- heartbeatInterval.toMillis(),
- TimeUnit.MILLISECONDS,
- )
-
- try {
- val topology = convertTopology(scenario.topology)
- val jobs =
- (0 until scenario.portfolio.targets.repeats).map { repeat ->
- SimulationTask(
- scenario,
- repeat,
- topology,
- )
- }
- val results = invokeAll(jobs).map { it.rawResult }
-
- heartbeat.cancel(true)
-
- val duration = startTime.secondsSince()
- logger.info { "Finished simulation for job $id (in $duration seconds)" }
-
- manager.finish(
- id,
- duration,
- mapOf(
- "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime },
- "total_granted_burst" to results.map { it.totalActiveTime },
- "total_overcommitted_burst" to results.map { it.totalStealTime },
- "total_interfered_burst" to results.map { it.totalLostTime },
- "mean_cpu_usage" to results.map { it.meanCpuUsage },
- "mean_cpu_demand" to results.map { it.meanCpuDemand },
- "mean_num_deployed_images" to results.map { it.meanNumDeployedImages },
- "max_num_deployed_images" to results.map { it.maxNumDeployedImages },
- "total_power_draw" to results.map { it.totalPowerDraw },
- "total_failure_slices" to results.map { it.totalFailureSlices },
- "total_failure_vm_slices" to results.map { it.totalFailureVmSlices },
- "total_vms_submitted" to results.map { it.totalVmsSubmitted },
- "total_vms_queued" to results.map { it.totalVmsQueued },
- "total_vms_finished" to results.map { it.totalVmsFinished },
- "total_vms_failed" to results.map { it.totalVmsFailed },
- ),
- )
- } catch (e: Exception) {
- // Check whether the job failed due to exceeding its time budget
- if (Thread.interrupted()) {
- logger.info { "Simulation job $id exceeded time limit (${startTime.secondsSince()} seconds)" }
- } else {
- logger.info(e) { "Simulation job $id failed" }
- }
-
- try {
- heartbeat.cancel(true)
- manager.fail(id, startTime.secondsSince())
- } catch (e: Throwable) {
- logger.error(e) { "Failed to update job" }
- }
- }
- }
-
- /**
- * Calculate the seconds since the specified instant.
- */
- private fun Instant.secondsSince(): Int {
- return ChronoUnit.SECONDS.between(this, Instant.now()).toInt()
- }
- }
-
- /**
- * A [RecursiveTask] that simulates a single scenario.
- *
- * @param scenario The scenario to simulate.
- * @param repeat The repeat number used to seed the simulation.
- * @param topologyHosts The topology to simulate.
- */
- private inner class SimulationTask(
- private val scenario: Scenario,
- private val repeat: Int,
- private val topologyHosts: List<HostSpec>,
- ) : RecursiveTask<WebComputeMonitor.Results>() {
- override fun compute(): WebComputeMonitor.Results {
- val monitor = WebComputeMonitor()
-
- // Schedule task that interrupts the simulation if it runs for too long.
- val currentThread = Thread.currentThread()
- val interruptTask =
- scheduler.schedule({ currentThread.interrupt() }, jobTimeout.toMillis(), TimeUnit.MILLISECONDS)
-
- try {
- runSimulation(monitor)
- } finally {
- interruptTask.cancel(false)
- }
-
- return monitor.collectResults()
- }
-
- /**
- * Run a single simulation of the scenario.
- */
- private fun runSimulation(monitor: WebComputeMonitor) =
- runSimulation {
- val serviceDomain = "compute.opendc.org"
- val seed = repeat.toLong()
-
- val scenario = scenario
-
- val powerSourceSpec =
- PowerSourceSpec(
- totalPower = Long.MAX_VALUE,
- )
- val topology = listOf(ClusterSpec("cluster", topologyHosts, powerSourceSpec))
-
- Provisioner(dispatcher, seed).use { provisioner ->
-
-// val workload =
-// trace(scenario.workload.trace.id).sampleByLoad(scenario.workload.samplingFraction)
-// val vms = workload.resolve(workloadLoader, Random(seed))
-
- val vms = workloadLoader.sampleByLoad(scenario.workload.samplingFraction)
- val startTime = vms.minOf { it.submittedAt }
-
- provisioner.runSteps(
- setupComputeService(
- serviceDomain,
- { createPrefabComputeScheduler(scenario.schedulerName, Random(it.seeder.nextLong()), timeSource) },
- ),
- registerComputeMonitor(serviceDomain, monitor),
- setupHosts(serviceDomain, topology, startTime),
- )
-
- val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
-
- val phenomena = scenario.phenomena
- val failureModel =
- if (phenomena.failures) {
- createFailureModelPrefab(coroutineContext, timeSource, service, Random(seed), FailurePrefab.G5k06Exp)
- } else {
- null
- }
-
- // Run workload trace
- service.replay(timeSource, vms, seed = seed)
-
- val serviceMetrics = service.getSchedulerStats()
- logger.debug {
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Pending=${serviceMetrics.tasksPending} " +
- "Active=${serviceMetrics.tasksActive}"
- }
- }
- }
- }
-
- /**
- * Convert the specified [topology] into an [Topology] understood by OpenDC.
- */
- private fun convertTopology(topology: Topology): List<HostSpec> {
- val res = mutableListOf<HostSpec>()
- val random = Random(0)
-
- val machines =
- topology.rooms.asSequence()
- .flatMap { room ->
- room.tiles.flatMap { tile ->
- val rack = tile.rack
- rack?.machines?.map { machine -> rack to machine } ?: emptyList()
- }
- }
-
- for ((rack, machine) in machines) {
- val clusterId = rack.id
- val position = machine.position
-
- val processors =
- machine.cpus.map { cpu ->
- CpuModel(
- 0,
- cpu.numberOfCores,
- cpu.clockRateMhz,
- "Intel",
- "amd64",
- cpu.name,
- )
- }
-
- val memoryUnits =
- machine.memory.map { memory ->
- MemoryUnit(
- "Samsung",
- memory.name,
- memory.speedMbPerS,
- memory.sizeMb.toLong(),
- )
- }
-
- val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
- val cpuPowerModel = PowerModels.linear(2 * energyConsumptionW, energyConsumptionW * 0.5)
-
- val spec =
- HostSpec(
- "node-$clusterId-$position",
- "node-$clusterId",
- clusterId,
- MachineModel(processors, memoryUnits[0]),
- cpuPowerModel,
- null,
- )
-
- res += spec
- }
-
- return res
- }
-
- /**
- * A custom [ForkJoinWorkerThreadFactory] that uses the [ClassLoader] of specified by the runner.
- */
- private class RunnerThreadFactory(private val classLoader: ClassLoader) : ForkJoinWorkerThreadFactory {
- override fun newThread(pool: ForkJoinPool): ForkJoinWorkerThread =
- object : ForkJoinWorkerThread(pool) {
- init {
- contextClassLoader = classLoader
- }
- }
- }
-}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt
deleted file mode 100644
index 7081041c..00000000
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.web.runner.internal
-
-import org.opendc.web.client.runner.OpenDCRunnerClient
-import org.opendc.web.proto.JobState
-import org.opendc.web.proto.runner.Job
-import org.opendc.web.runner.JobManager
-
-/**
- * Default implementation of [JobManager] that uses the OpenDC client to receive jobs.
- */
-internal class JobManagerImpl(private val client: OpenDCRunnerClient) : JobManager {
- override fun findNext(): Job? {
- return client.jobs.queryPending().firstOrNull()
- }
-
- override fun claim(id: Long): Boolean {
- return try {
- client.jobs.update(id, Job.Update(JobState.CLAIMED, 0))
- true
- } catch (e: IllegalStateException) {
- false
- }
- }
-
- override fun heartbeat(
- id: Long,
- runtime: Int,
- ): Boolean {
- val res = client.jobs.update(id, Job.Update(JobState.RUNNING, runtime))
- return res?.state != JobState.FAILED
- }
-
- override fun fail(
- id: Long,
- runtime: Int,
- ) {
- client.jobs.update(id, Job.Update(JobState.FAILED, runtime))
- }
-
- override fun finish(
- id: Long,
- runtime: Int,
- results: Map<String, Any>,
- ) {
- client.jobs.update(id, Job.Update(JobState.FINISHED, runtime))
- }
-}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
deleted file mode 100644
index 9288b403..00000000
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.web.runner.internal
-
-import org.opendc.compute.simulator.telemetry.ComputeMonitor
-import org.opendc.compute.simulator.telemetry.table.host.HostTableReader
-import org.opendc.compute.simulator.telemetry.table.service.ServiceData
-import org.opendc.compute.simulator.telemetry.table.service.ServiceTableReader
-import org.opendc.compute.simulator.telemetry.table.service.toServiceData
-import kotlin.math.roundToLong
-
-/**
- * A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
- */
-internal class WebComputeMonitor : ComputeMonitor {
- override fun record(reader: HostTableReader) {
- val slices = reader.downtime / sliceLength
-
- hostAggregateMetrics =
- AggregateHostMetrics(
- hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime,
- hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime,
- hostAggregateMetrics.totalStealTime + reader.cpuStealTime,
- hostAggregateMetrics.totalLostTime + reader.cpuLostTime,
- hostAggregateMetrics.totalPowerDraw + reader.energyUsage,
- hostAggregateMetrics.totalFailureSlices + slices,
- hostAggregateMetrics.totalFailureVmSlices + reader.tasksActive * slices,
- )
-
- hostMetrics.compute(reader.hostInfo.name) { _, prev ->
- HostMetrics(
- reader.cpuUsage + (prev?.cpuUsage ?: 0.0),
- reader.cpuDemand + (prev?.cpuDemand ?: 0.0),
- reader.tasksActive + (prev?.instanceCount ?: 0),
- 1 + (prev?.count ?: 0),
- )
- }
- }
-
- private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
- private val hostMetrics: MutableMap<String, HostMetrics> = mutableMapOf()
- private val sliceLength: Long = 5 * 60L
-
- private data class AggregateHostMetrics(
- val totalActiveTime: Long = 0L,
- val totalIdleTime: Long = 0L,
- val totalStealTime: Long = 0L,
- val totalLostTime: Long = 0L,
- val totalPowerDraw: Double = 0.0,
- val totalFailureSlices: Double = 0.0,
- val totalFailureVmSlices: Double = 0.0,
- )
-
- private data class HostMetrics(
- val cpuUsage: Double,
- val cpuDemand: Double,
- val instanceCount: Long,
- val count: Long,
- )
-
- private lateinit var serviceData: ServiceData
-
- override fun record(reader: ServiceTableReader) {
- serviceData = reader.toServiceData()
- }
-
- /**
- * Collect the results of the simulation.
- */
- fun collectResults(): Results {
- val hostAggregateMetrics = hostAggregateMetrics
- val hostMetrics = hostMetrics
- val serviceData = serviceData
-
- return Results(
- hostAggregateMetrics.totalActiveTime,
- hostAggregateMetrics.totalIdleTime,
- hostAggregateMetrics.totalStealTime,
- hostAggregateMetrics.totalLostTime,
- hostMetrics.map { it.value.cpuUsage / it.value.count }.average().let { if (it.isNaN()) 0.0 else it },
- hostMetrics.map { it.value.cpuDemand / it.value.count }.average().let { if (it.isNaN()) 0.0 else it },
- hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average().let { if (it.isNaN()) 0.0 else it },
- hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
- hostAggregateMetrics.totalPowerDraw,
- hostAggregateMetrics.totalFailureSlices.roundToLong(),
- hostAggregateMetrics.totalFailureVmSlices.roundToLong(),
- serviceData.tasksTotal,
- serviceData.tasksPending,
- serviceData.tasksTotal - serviceData.tasksPending - serviceData.tasksActive,
- serviceData.attemptsTerminated,
- )
- }
-
- /**
- * Structure of the results of a single simulation.
- */
- data class Results(
- val totalActiveTime: Long,
- val totalIdleTime: Long,
- val totalStealTime: Long,
- val totalLostTime: Long,
- val meanCpuUsage: Double,
- val meanCpuDemand: Double,
- val meanNumDeployedImages: Double,
- val maxNumDeployedImages: Double,
- val totalPowerDraw: Double,
- val totalFailureSlices: Long,
- val totalFailureVmSlices: Long,
- val totalVmsSubmitted: Int,
- val totalVmsQueued: Int,
- val totalVmsFinished: Int,
- val totalVmsFailed: Int,
- )
-}
diff --git a/opendc-web/opendc-web-runner/src/main/resources/log4j2.xml b/opendc-web/opendc-web-runner/src/main/resources/log4j2.xml
deleted file mode 100644
index ad99cc00..00000000
--- a/opendc-web/opendc-web-runner/src/main/resources/log4j2.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ MIT License
- ~
- ~ Copyright (c) 2020 atlarge-research
- ~
- ~ Permission is hereby granted, free of charge, to any person obtaining a copy
- ~ of this software and associated documentation files (the "Software"), to deal
- ~ in the Software without restriction, including without limitation the rights
- ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- ~ copies of the Software, and to permit persons to whom the Software is
- ~ furnished to do so, subject to the following conditions:
- ~
- ~ The above copyright notice and this permission notice shall be included in all
- ~ copies or substantial portions of the Software.
- ~
- ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- ~ SOFTWARE.
- -->
-
-<Configuration status="WARN" packages="org.apache.logging.log4j.core,io.sentry.log4j2">
- <Appenders>
- <Console name="Console" target="SYSTEM_OUT">
- <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
- </Console>
-
- <Sentry name="Sentry" />
- </Appenders>
- <Loggers>
- <Logger name="org.opendc" level="warn" additivity="false">
- <AppenderRef ref="Console"/>
- <AppenderRef ref="Sentry"/>
- </Logger>
- <Logger name="org.opendc.web.runner" level="info" additivity="false">
- <AppenderRef ref="Console"/>
- <AppenderRef ref="Sentry"/>
- </Logger>
- <Root level="info">
- <AppenderRef level="error" ref="Console"/>
- <AppenderRef ref="Sentry"/>
- </Root>
- </Loggers>
-</Configuration>