From 5864cbcbfe2eb8c36ca05c3a39c7e5916aeecaec Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 5 Mar 2024 13:23:57 +0100 Subject: Updated package versions, updated web server tests. (#207) * Updated all package versions including kotlin. Updated all web-server tests to run. * Changed the java version of the tests. OpenDC now only supports java 19. * small update * test update * new update * updated docker version to 19 * updated docker version to 19 --- .../src/cli/kotlin/org/opendc/web/runner/Main.kt | 14 +- .../kotlin/org/opendc/web/runner/JobManager.kt | 16 +- .../kotlin/org/opendc/web/runner/OpenDCRunner.kt | 196 +++++++++++---------- .../opendc/web/runner/internal/JobManagerImpl.kt | 16 +- .../web/runner/internal/WebComputeMonitor.kt | 33 ++-- 5 files changed, 152 insertions(+), 123 deletions(-) (limited to 'opendc-web/opendc-web-runner/src') diff --git a/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt index 299c4d09..5d35fd98 100644 --- a/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt @@ -48,7 +48,7 @@ class RunnerCli : CliktCommand(name = "opendc-runner") { private val apiUrl by option( "--api-url", help = "url to the OpenDC API", - envvar = "OPENDC_API_URL" + envvar = "OPENDC_API_URL", ) .convert { URI(it) } .default(URI("https://api.opendc.org/v2")) @@ -59,7 +59,7 @@ class RunnerCli : CliktCommand(name = "opendc-runner") { private val authDomain by option( "--auth-domain", help = "auth domain of the OpenDC API", - envvar = "AUTH0_DOMAIN" + envvar = "AUTH0_DOMAIN", ) .required() @@ -69,7 +69,7 @@ class RunnerCli : CliktCommand(name = "opendc-runner") { private val authAudience by option( "--auth-audience", help = "auth audience of the OpenDC API", - envvar = "AUTH0_AUDIENCE" + envvar = "AUTH0_AUDIENCE", ) .required() @@ -79,7 +79,7 @@ class RunnerCli : CliktCommand(name = "opendc-runner") { private val authClientId by option( "--auth-id", help = "auth client id of the OpenDC API", - envvar = "AUTH0_CLIENT_ID" + envvar = "AUTH0_CLIENT_ID", ) .required() @@ -89,7 +89,7 @@ class RunnerCli : CliktCommand(name = "opendc-runner") { private val authClientSecret by option( "--auth-secret", help = "auth client secret of the OpenDC API", - envvar = "AUTH0_CLIENT_SECRET" + envvar = "AUTH0_CLIENT_SECRET", ) .required() @@ -99,7 +99,7 @@ class RunnerCli : CliktCommand(name = "opendc-runner") { private val tracePath by option( "--traces", help = "path to the directory containing the traces", - envvar = "OPENDC_TRACES" + envvar = "OPENDC_TRACES", ) .file(canBeFile = false) .defaultLazy { File("traces/") } @@ -109,7 +109,7 @@ class RunnerCli : CliktCommand(name = "opendc-runner") { */ private val parallelism by option( "--parallelism", - help = "maximum number of threads for simulations" + help = "maximum number of threads for simulations", ) .int() .default(Runtime.getRuntime().availableProcessors() - 1) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt index d6c06889..a517f3b4 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt @@ -47,17 +47,27 @@ public interface JobManager { * @param runtime The total runtime of the job. * @return `true` if the job can continue, `false` if the job has been cancelled. */ - public fun heartbeat(id: Long, runtime: Int): Boolean + public fun heartbeat( + id: Long, + runtime: Int, + ): Boolean /** * Mark the job as failed. */ - public fun fail(id: Long, runtime: Int) + public fun fail( + id: Long, + runtime: Int, + ) /** * Persist the specified results for the specified job. */ - public fun finish(id: Long, runtime: Int, results: Map) + public fun finish( + id: Long, + runtime: Int, + results: Map, + ) public companion object { /** diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 4351f3c1..eee340cf 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -76,7 +76,7 @@ public class OpenDCRunner( parallelism: Int = Runtime.getRuntime().availableProcessors(), private val jobTimeout: Duration = Duration.ofMinutes(10), private val pollInterval: Duration = Duration.ofSeconds(30), - private val heartbeatInterval: Duration = Duration.ofMinutes(1) + private val heartbeatInterval: Duration = Duration.ofMinutes(1), ) : Runnable { /** * Logging instance for this runner. @@ -149,26 +149,28 @@ public class OpenDCRunner( val startTime = Instant.now() val currentThread = Thread.currentThread() - val heartbeat = scheduler.scheduleWithFixedDelay( - { - if (!manager.heartbeat(id, startTime.secondsSince())) { - currentThread.interrupt() - } - }, - 0, - heartbeatInterval.toMillis(), - TimeUnit.MILLISECONDS - ) + val heartbeat = + scheduler.scheduleWithFixedDelay( + { + if (!manager.heartbeat(id, startTime.secondsSince())) { + currentThread.interrupt() + } + }, + 0, + heartbeatInterval.toMillis(), + TimeUnit.MILLISECONDS, + ) try { val topology = convertTopology(scenario.topology) - val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> - SimulationTask( - scenario, - repeat, - topology - ) - } + val jobs = + (0 until scenario.portfolio.targets.repeats).map { repeat -> + SimulationTask( + scenario, + repeat, + topology, + ) + } val results = invokeAll(jobs).map { it.rawResult } heartbeat.cancel(true) @@ -194,8 +196,8 @@ public class OpenDCRunner( "total_vms_submitted" to results.map { it.totalVmsSubmitted }, "total_vms_queued" to results.map { it.totalVmsQueued }, "total_vms_finished" to results.map { it.totalVmsFinished }, - "total_vms_failed" to results.map { it.totalVmsFailed } - ) + "total_vms_failed" to results.map { it.totalVmsFailed }, + ), ) } catch (e: Exception) { // Check whether the job failed due to exceeding its time budget @@ -232,7 +234,7 @@ public class OpenDCRunner( private inner class SimulationTask( private val scenario: Scenario, private val repeat: Int, - private val topology: List + private val topology: List, ) : RecursiveTask() { override fun compute(): WebComputeMonitor.Results { val monitor = WebComputeMonitor() @@ -254,50 +256,51 @@ public class OpenDCRunner( /** * Run a single simulation of the scenario. */ - private fun runSimulation(monitor: WebComputeMonitor) = runSimulation { - val serviceDomain = "compute.opendc.org" - val seed = repeat.toLong() - - val scenario = scenario - - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService( - serviceDomain, - { createComputeScheduler(scenario.schedulerName, Random(it.seeder.nextLong())) } - ), - registerComputeMonitor(serviceDomain, monitor), - setupHosts(serviceDomain, topology) - ) - - val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! - - val workload = - trace(scenario.workload.trace.id).sampleByLoad(scenario.workload.samplingFraction) - val vms = workload.resolve(workloadLoader, Random(seed)) + private fun runSimulation(monitor: WebComputeMonitor) = + runSimulation { + val serviceDomain = "compute.opendc.org" + val seed = repeat.toLong() + + val scenario = scenario + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService( + serviceDomain, + { createComputeScheduler(scenario.schedulerName, Random(it.seeder.nextLong())) }, + ), + registerComputeMonitor(serviceDomain, monitor), + setupHosts(serviceDomain, topology), + ) - val phenomena = scenario.phenomena - val failureModel = - if (phenomena.failures) { - grid5000(Duration.ofDays(7)) - } else { - null + val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! + + val workload = + trace(scenario.workload.trace.id).sampleByLoad(scenario.workload.samplingFraction) + val vms = workload.resolve(workloadLoader, Random(seed)) + + val phenomena = scenario.phenomena + val failureModel = + if (phenomena.failures) { + grid5000(Duration.ofDays(7)) + } else { + null + } + + // Run workload trace + service.replay(timeSource, vms, seed, failureModel = failureModel, interference = phenomena.interference) + + val serviceMetrics = service.getSchedulerStats() + logger.debug { + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" } - - // Run workload trace - service.replay(timeSource, vms, seed, failureModel = failureModel, interference = phenomena.interference) - - val serviceMetrics = service.getSchedulerStats() - logger.debug { - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" } } - } } /** @@ -307,46 +310,50 @@ public class OpenDCRunner( val res = mutableListOf() val random = Random(0) - val machines = topology.rooms.asSequence() - .flatMap { room -> - room.tiles.flatMap { tile -> - val rack = tile.rack - rack?.machines?.map { machine -> rack to machine } ?: emptyList() + val machines = + topology.rooms.asSequence() + .flatMap { room -> + room.tiles.flatMap { tile -> + val rack = tile.rack + rack?.machines?.map { machine -> rack to machine } ?: emptyList() + } } - } for ((rack, machine) in machines) { val clusterId = rack.id val position = machine.position - val processors = machine.cpus.flatMap { cpu -> - val cores = cpu.numberOfCores - val speed = cpu.clockRateMhz - // TODO Remove hard coding of vendor - val node = ProcessingNode("Intel", "amd64", cpu.name, cores) - List(cores) { coreId -> - ProcessingUnit(node, coreId, speed) + val processors = + machine.cpus.flatMap { cpu -> + val cores = cpu.numberOfCores + val speed = cpu.clockRateMhz + // TODO Remove hard coding of vendor + val node = ProcessingNode("Intel", "amd64", cpu.name, cores) + List(cores) { coreId -> + ProcessingUnit(node, coreId, speed) + } + } + val memoryUnits = + machine.memory.map { memory -> + MemoryUnit( + "Samsung", + memory.name, + memory.speedMbPerS, + memory.sizeMb.toLong(), + ) } - } - val memoryUnits = machine.memory.map { memory -> - MemoryUnit( - "Samsung", - memory.name, - memory.speedMbPerS, - memory.sizeMb.toLong() - ) - } val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW } val powerModel = CpuPowerModels.linear(2 * energyConsumptionW, energyConsumptionW * 0.5) - val spec = HostSpec( - UUID(random.nextLong(), random.nextLong()), - "node-$clusterId-$position", - mapOf("cluster" to clusterId), - MachineModel(processors, memoryUnits), - SimPsuFactories.simple(powerModel) - ) + val spec = + HostSpec( + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$position", + mapOf("cluster" to clusterId), + MachineModel(processors, memoryUnits), + SimPsuFactories.simple(powerModel), + ) res += spec } @@ -358,10 +365,11 @@ public class OpenDCRunner( * A custom [ForkJoinWorkerThreadFactory] that uses the [ClassLoader] of specified by the runner. */ private class RunnerThreadFactory(private val classLoader: ClassLoader) : ForkJoinWorkerThreadFactory { - override fun newThread(pool: ForkJoinPool): ForkJoinWorkerThread = object : ForkJoinWorkerThread(pool) { - init { - contextClassLoader = classLoader + override fun newThread(pool: ForkJoinPool): ForkJoinWorkerThread = + object : ForkJoinWorkerThread(pool) { + init { + contextClassLoader = classLoader + } } - } } } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt index 5b1b7132..7081041c 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt @@ -44,16 +44,26 @@ internal class JobManagerImpl(private val client: OpenDCRunnerClient) : JobManag } } - override fun heartbeat(id: Long, runtime: Int): Boolean { + override fun heartbeat( + id: Long, + runtime: Int, + ): Boolean { val res = client.jobs.update(id, Job.Update(JobState.RUNNING, runtime)) return res?.state != JobState.FAILED } - override fun fail(id: Long, runtime: Int) { + override fun fail( + id: Long, + runtime: Int, + ) { client.jobs.update(id, Job.Update(JobState.FAILED, runtime)) } - override fun finish(id: Long, runtime: Int, results: Map) { + override fun finish( + id: Long, + runtime: Int, + results: Map, + ) { client.jobs.update(id, Job.Update(JobState.FINISHED, runtime)) } } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt index 774689c9..4576a463 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt @@ -34,31 +34,32 @@ import kotlin.math.roundToLong */ internal class WebComputeMonitor : ComputeMonitor { override fun record(reader: HostTableReader) { - val slices = reader.downtime / SLICE_LENGTH + val slices = reader.downtime / sliceLength - hostAggregateMetrics = AggregateHostMetrics( - hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime, - hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime, - hostAggregateMetrics.totalStealTime + reader.cpuStealTime, - hostAggregateMetrics.totalLostTime + reader.cpuLostTime, - hostAggregateMetrics.totalPowerDraw + reader.energyUsage, - hostAggregateMetrics.totalFailureSlices + slices, - hostAggregateMetrics.totalFailureVmSlices + reader.guestsRunning * slices - ) + hostAggregateMetrics = + AggregateHostMetrics( + hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime, + hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime, + hostAggregateMetrics.totalStealTime + reader.cpuStealTime, + hostAggregateMetrics.totalLostTime + reader.cpuLostTime, + hostAggregateMetrics.totalPowerDraw + reader.energyUsage, + hostAggregateMetrics.totalFailureSlices + slices, + hostAggregateMetrics.totalFailureVmSlices + reader.guestsRunning * slices, + ) hostMetrics.compute(reader.host.id) { _, prev -> HostMetrics( reader.cpuUsage + (prev?.cpuUsage ?: 0.0), reader.cpuDemand + (prev?.cpuDemand ?: 0.0), reader.guestsRunning + (prev?.instanceCount ?: 0), - 1 + (prev?.count ?: 0) + 1 + (prev?.count ?: 0), ) } } private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() private val hostMetrics: MutableMap = mutableMapOf() - private val SLICE_LENGTH: Long = 5 * 60L + private val sliceLength: Long = 5 * 60L private data class AggregateHostMetrics( val totalActiveTime: Long = 0L, @@ -67,14 +68,14 @@ internal class WebComputeMonitor : ComputeMonitor { val totalLostTime: Long = 0L, val totalPowerDraw: Double = 0.0, val totalFailureSlices: Double = 0.0, - val totalFailureVmSlices: Double = 0.0 + val totalFailureVmSlices: Double = 0.0, ) private data class HostMetrics( val cpuUsage: Double, val cpuDemand: Double, val instanceCount: Long, - val count: Long + val count: Long, ) private lateinit var serviceData: ServiceData @@ -106,7 +107,7 @@ internal class WebComputeMonitor : ComputeMonitor { serviceData.serversTotal, serviceData.serversPending, serviceData.serversTotal - serviceData.serversPending - serviceData.serversActive, - serviceData.attemptsError + serviceData.attemptsFailure + serviceData.attemptsError + serviceData.attemptsFailure, ) } @@ -128,6 +129,6 @@ internal class WebComputeMonitor : ComputeMonitor { val totalVmsSubmitted: Int, val totalVmsQueued: Int, val totalVmsFinished: Int, - val totalVmsFailed: Int + val totalVmsFailed: Int, ) } -- cgit v1.2.3