diff options
Diffstat (limited to 'opendc-web/opendc-web-runner/src')
| -rw-r--r-- | opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt | 119 |
1 files changed, 68 insertions, 51 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 9a1319b6..cf887f54 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -23,11 +23,12 @@ package org.opendc.web.runner import mu.KotlinLogging +import org.opendc.compute.service.ComputeService import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.ComputeMetricReader import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology -import org.opendc.compute.workload.topology.apply +import org.opendc.experiments.compute.* +import org.opendc.experiments.provisioner.Provisioner import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -74,7 +75,8 @@ public class OpenDCRunner( /** * The [ForkJoinPool] that is used to execute the simulation jobs. */ - private val pool = ForkJoinPool(parallelism, RunnerThreadFactory(Thread.currentThread().contextClassLoader), null, false) + private val pool = + ForkJoinPool(parallelism, RunnerThreadFactory(Thread.currentThread().contextClassLoader), null, false) /** * A [ScheduledExecutorService] to manage the heartbeat of simulation jobs as well as tracking the deadline of @@ -129,11 +131,22 @@ public class OpenDCRunner( val id = job.id val scenario = job.scenario - val heartbeat = scheduler.scheduleWithFixedDelay({ manager.heartbeat(id) }, 0, heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS) + val heartbeat = scheduler.scheduleWithFixedDelay( + { manager.heartbeat(id) }, + 0, + heartbeatInterval.toMillis(), + TimeUnit.MILLISECONDS + ) try { val topology = convertTopology(scenario.topology) - val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology) } + val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> + SimulationTask( + scenario, + repeat, + topology + ) + } val results = invokeAll(jobs).map { it.rawResult } logger.info { "Finished simulation for job $id" } @@ -195,60 +208,64 @@ public class OpenDCRunner( // Schedule task that interrupts the simulation if it runs for too long. val currentThread = Thread.currentThread() - val interruptTask = scheduler.schedule({ currentThread.interrupt() }, jobTimeout.toMillis(), TimeUnit.MILLISECONDS) + val interruptTask = + scheduler.schedule({ currentThread.interrupt() }, jobTimeout.toMillis(), TimeUnit.MILLISECONDS) try { - runBlockingSimulation { - val workloadName = scenario.workload.trace.id - val workloadFraction = scenario.workload.samplingFraction - - val seeder = Random(repeat.toLong()) - - val phenomena = scenario.phenomena - val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder) - val workload = trace(workloadName).sampleByLoad(workloadFraction) - val vms = workload.resolve(workloadLoader, seeder) - - val failureModel = - if (phenomena.failures) - grid5000(Duration.ofDays(7)) - else - null - - val simulator = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed = 0L, - ) - val reader = ComputeMetricReader(this, clock, simulator.service, monitor) - - try { - // Instantiate the topology onto the simulator - simulator.apply(topology) - // Run workload trace - simulator.run(vms, failureModel = failureModel, interference = phenomena.interference) - - val serviceMetrics = simulator.service.getSchedulerStats() - logger.debug { - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - } - } finally { - simulator.close() - reader.close() - } - } + runSimulation(monitor) } finally { interruptTask.cancel(false) } return monitor.collectResults() } + + /** + * Run a single simulation of the scenario. + */ + private fun runSimulation(monitor: WebComputeMonitor) = runBlockingSimulation { + val serviceDomain = "compute.opendc.org" + val seed = repeat.toLong() + + val scenario = scenario + + Provisioner(coroutineContext, clock, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService( + serviceDomain, + { createComputeScheduler(scenario.schedulerName, Random(it.seeder.nextLong())) } + ), + registerComputeMonitor(serviceDomain, monitor), + setupHosts(serviceDomain, topology.resolve()) + ) + + val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! + + val workload = + trace(scenario.workload.trace.id).sampleByLoad(scenario.workload.samplingFraction) + val vms = workload.resolve(workloadLoader, Random(seed)) + + val phenomena = scenario.phenomena + val failureModel = + if (phenomena.failures) + grid5000(Duration.ofDays(7)) + else + null + + // Run workload trace + service.replay(clock, vms, seed, failureModel = failureModel, interference = phenomena.interference) + + val serviceMetrics = service.getSchedulerStats() + logger.debug { + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" + } + } + } } /** |
