| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-04 10:04:50 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-10-04 10:04:50 +0200 |
| commit | 92cc0908b7ad6c94b08e6016f8815ab07cd1714d (patch) | |
| tree | b5edaff69212986265f9edc620e40bb8695f11eb /opendc-web | |
| parent | 2d2a3854d355bd4b074ef651f291d34081e70d96 (diff) | |
| parent | bd476d11ab24fe745bb54e97a11133706bb96cb1 (diff) | |
merge: Add provisioning tool for experiments (#104)
This pull request implements a new tool to help provision and manage the
experimental environment.
## Implementation Notes :hammer_and_pick:
* Add service registry for cloud services
* Add provisioning tool for experiments (see the provisioning sketch after this list)
* Add provisioning step for workflow service
* Add provisioners for FaaS service
* Use experiment base for Capelin experiments
* Use experiment base for web runner
* Integrate compute workload classes
* Remove Topology interface
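
The new flow ties these pieces together: a `Provisioner` executes a list of provisioning steps against a service registry, and experiment code then resolves the provisioned services from that registry. Below is a minimal sketch of that flow, using only the step functions that appear in the web-runner diff further down (`setupComputeService`, `registerComputeMonitor`, `setupHosts`); the service domain, seed, and function name are illustrative placeholders, not part of this change.

```kotlin
import org.opendc.compute.service.ComputeService
import org.opendc.experiments.compute.*
import org.opendc.experiments.compute.telemetry.ComputeMonitor
import org.opendc.experiments.compute.topology.HostSpec
import org.opendc.experiments.provisioner.Provisioner
import org.opendc.simulator.core.runBlockingSimulation
import java.util.Random

fun provisionEnvironment(topology: List<HostSpec>, monitor: ComputeMonitor, schedulerName: String) =
    runBlockingSimulation {
        // Domain under which the compute service is registered; illustrative value.
        val serviceDomain = "compute.opendc.org"
        val seed = 0L

        Provisioner(coroutineContext, clock, seed).use { provisioner ->
            // Each step provisions one part of the environment and registers it in the service registry.
            provisioner.runSteps(
                setupComputeService(
                    serviceDomain,
                    { createComputeScheduler(schedulerName, Random(it.seeder.nextLong())) }
                ),
                registerComputeMonitor(serviceDomain, monitor),
                setupHosts(serviceDomain, topology)
            )

            // Cloud services are looked up through the registry by domain and type.
            val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
            println(service.getSchedulerStats())
        }
    }
```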
## Breaking API Changes :warning:
* Removal of the `opendc-compute-workload`, `opendc-faas-workload`,
and `opendc-workflow-workload` modules. These are now located
inside `opendc-experiments`.
* Removal of `ComputeServiceHelper`. Use `Provisioner` to provision
a `ComputeService` (see the migration sketch after this list).
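
For code that previously constructed a `ComputeServiceHelper` directly, the replacement pattern (as used by the updated web runner in the diff below) is to provision the `ComputeService` through a `Provisioner`, resolve it from the registry, and replay the workload trace against it. A minimal migration sketch, assuming the workload loader type `ComputeWorkloadLoader` also moved into `org.opendc.experiments.compute`; the trace id, sampling fraction, scheduler name, and failure window are illustrative.

```kotlin
import org.opendc.compute.service.ComputeService
import org.opendc.experiments.compute.*
import org.opendc.experiments.compute.topology.HostSpec
import org.opendc.experiments.provisioner.Provisioner
import org.opendc.simulator.core.runBlockingSimulation
import java.time.Duration
import java.util.Random

fun replayTrace(
    topology: List<HostSpec>,
    workloadLoader: ComputeWorkloadLoader, // assumed to live in org.opendc.experiments.compute after the move
    traceId: String,
    schedulerName: String,
    seed: Long
) = runBlockingSimulation {
    val serviceDomain = "compute.opendc.org"

    // Previously: ComputeServiceHelper(...) + simulator.apply(topology) + simulator.run(vms)
    Provisioner(coroutineContext, clock, seed).use { provisioner ->
        provisioner.runSteps(
            setupComputeService(
                serviceDomain,
                { createComputeScheduler(schedulerName, Random(it.seeder.nextLong())) }
            ),
            setupHosts(serviceDomain, topology)
        )

        val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!

        // Resolve the workload trace; sampling at full load here is purely illustrative.
        val vms = trace(traceId).sampleByLoad(1.0).resolve(workloadLoader, Random(seed))

        // Replay the trace against the provisioned service; the grid5000 failure model is optional.
        service.replay(clock, vms, seed, failureModel = grid5000(Duration.ofDays(7)), interference = false)
    }
}
```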
Diffstat (limited to 'opendc-web')
3 files changed, 120 insertions, 111 deletions
diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts
index a5723994..2679a97f 100644
--- a/opendc-web/opendc-web-runner/build.gradle.kts
+++ b/opendc-web/opendc-web-runner/build.gradle.kts
@@ -49,8 +49,7 @@ val cliJar by tasks.creating(Jar::class) {
 dependencies {
     api(projects.opendcWeb.opendcWebClient)
-    implementation(projects.opendcCompute.opendcComputeSimulator)
-    implementation(projects.opendcCompute.opendcComputeWorkload)
+    implementation(projects.opendcExperiments.opendcExperimentsCompute)
 
     implementation(projects.opendcSimulator.opendcSimulatorCore)
     implementation(projects.opendcTrace.opendcTraceApi)
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index 9a1319b6..74f7c8c1 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -23,11 +23,10 @@
 package org.opendc.web.runner
 
 import mu.KotlinLogging
-import org.opendc.compute.workload.*
-import org.opendc.compute.workload.telemetry.ComputeMetricReader
-import org.opendc.compute.workload.topology.HostSpec
-import org.opendc.compute.workload.topology.Topology
-import org.opendc.compute.workload.topology.apply
+import org.opendc.compute.service.ComputeService
+import org.opendc.experiments.compute.*
+import org.opendc.experiments.compute.topology.HostSpec
+import org.opendc.experiments.provisioner.Provisioner
 import org.opendc.simulator.compute.model.MachineModel
 import org.opendc.simulator.compute.model.MemoryUnit
 import org.opendc.simulator.compute.model.ProcessingNode
@@ -37,6 +36,7 @@ import org.opendc.simulator.compute.power.SimplePowerDriver
 import org.opendc.simulator.core.runBlockingSimulation
 import org.opendc.web.proto.runner.Job
 import org.opendc.web.proto.runner.Scenario
+import org.opendc.web.proto.runner.Topology
 import org.opendc.web.runner.internal.WebComputeMonitor
 import java.io.File
 import java.time.Duration
@@ -74,7 +74,8 @@ public class OpenDCRunner(
     /**
      * The [ForkJoinPool] that is used to execute the simulation jobs.
      */
-    private val pool = ForkJoinPool(parallelism, RunnerThreadFactory(Thread.currentThread().contextClassLoader), null, false)
+    private val pool =
+        ForkJoinPool(parallelism, RunnerThreadFactory(Thread.currentThread().contextClassLoader), null, false)
 
     /**
     * A [ScheduledExecutorService] to manage the heartbeat of simulation jobs as well as tracking the deadline of
@@ -129,11 +130,22 @@ public class OpenDCRunner(
             val id = job.id
             val scenario = job.scenario
 
-            val heartbeat = scheduler.scheduleWithFixedDelay({ manager.heartbeat(id) }, 0, heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS)
+            val heartbeat = scheduler.scheduleWithFixedDelay(
+                { manager.heartbeat(id) },
+                0,
+                heartbeatInterval.toMillis(),
+                TimeUnit.MILLISECONDS
+            )
 
             try {
                 val topology = convertTopology(scenario.topology)
-                val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology) }
+                val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat ->
+                    SimulationTask(
+                        scenario,
+                        repeat,
+                        topology
+                    )
+                }
                 val results = invokeAll(jobs).map { it.rawResult }
 
                 logger.info { "Finished simulation for job $id" }
@@ -188,128 +200,126 @@ public class OpenDCRunner(
     private inner class SimulationTask(
         private val scenario: Scenario,
         private val repeat: Int,
-        private val topology: Topology,
+        private val topology: List<HostSpec>,
     ) : RecursiveTask<WebComputeMonitor.Results>() {
         override fun compute(): WebComputeMonitor.Results {
             val monitor = WebComputeMonitor()
 
             // Schedule task that interrupts the simulation if it runs for too long.
             val currentThread = Thread.currentThread()
-            val interruptTask = scheduler.schedule({ currentThread.interrupt() }, jobTimeout.toMillis(), TimeUnit.MILLISECONDS)
+            val interruptTask =
+                scheduler.schedule({ currentThread.interrupt() }, jobTimeout.toMillis(), TimeUnit.MILLISECONDS)
 
             try {
-                runBlockingSimulation {
-                    val workloadName = scenario.workload.trace.id
-                    val workloadFraction = scenario.workload.samplingFraction
-
-                    val seeder = Random(repeat.toLong())
-
-                    val phenomena = scenario.phenomena
-                    val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder)
-                    val workload = trace(workloadName).sampleByLoad(workloadFraction)
-                    val vms = workload.resolve(workloadLoader, seeder)
-
-                    val failureModel =
-                        if (phenomena.failures)
-                            grid5000(Duration.ofDays(7))
-                        else
-                            null
-
-                    val simulator = ComputeServiceHelper(
-                        coroutineContext,
-                        clock,
-                        computeScheduler,
-                        seed = 0L,
-                    )
-                    val reader = ComputeMetricReader(this, clock, simulator.service, monitor)
-
-                    try {
-                        // Instantiate the topology onto the simulator
-                        simulator.apply(topology)
-                        // Run workload trace
-                        simulator.run(vms, failureModel = failureModel, interference = phenomena.interference)
-
-                        val serviceMetrics = simulator.service.getSchedulerStats()
-                        logger.debug {
-                            "Scheduler " +
-                                "Success=${serviceMetrics.attemptsSuccess} " +
-                                "Failure=${serviceMetrics.attemptsFailure} " +
-                                "Error=${serviceMetrics.attemptsError} " +
-                                "Pending=${serviceMetrics.serversPending} " +
-                                "Active=${serviceMetrics.serversActive}"
-                        }
-                    } finally {
-                        simulator.close()
-                        reader.close()
-                    }
-                }
+                runSimulation(monitor)
             } finally {
                 interruptTask.cancel(false)
             }
 
             return monitor.collectResults()
         }
+
+        /**
+         * Run a single simulation of the scenario.
+         */
+        private fun runSimulation(monitor: WebComputeMonitor) = runBlockingSimulation {
+            val serviceDomain = "compute.opendc.org"
+            val seed = repeat.toLong()
+
+            val scenario = scenario
+
+            Provisioner(coroutineContext, clock, seed).use { provisioner ->
+                provisioner.runSteps(
+                    setupComputeService(
+                        serviceDomain,
+                        { createComputeScheduler(scenario.schedulerName, Random(it.seeder.nextLong())) }
+                    ),
+                    registerComputeMonitor(serviceDomain, monitor),
+                    setupHosts(serviceDomain, topology)
+                )
+
+                val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
+
+                val workload =
+                    trace(scenario.workload.trace.id).sampleByLoad(scenario.workload.samplingFraction)
+                val vms = workload.resolve(workloadLoader, Random(seed))
+
+                val phenomena = scenario.phenomena
+                val failureModel =
+                    if (phenomena.failures)
+                        grid5000(Duration.ofDays(7))
+                    else
+                        null
+
+                // Run workload trace
+                service.replay(clock, vms, seed, failureModel = failureModel, interference = phenomena.interference)
+
+                val serviceMetrics = service.getSchedulerStats()
+                logger.debug {
+                    "Scheduler " +
+                        "Success=${serviceMetrics.attemptsSuccess} " +
+                        "Failure=${serviceMetrics.attemptsFailure} " +
+                        "Error=${serviceMetrics.attemptsError} " +
+                        "Pending=${serviceMetrics.serversPending} " +
+                        "Active=${serviceMetrics.serversActive}"
+                }
+            }
+        }
     }
 
     /**
      * Convert the specified [topology] into an [Topology] understood by OpenDC.
     */
-    private fun convertTopology(topology: org.opendc.web.proto.runner.Topology): Topology {
-        return object : Topology {
-
-            override fun resolve(): List<HostSpec> {
-                val res = mutableListOf<HostSpec>()
-                val random = Random(0)
-
-                val machines = topology.rooms.asSequence()
-                    .flatMap { room ->
-                        room.tiles.flatMap { tile ->
-                            val rack = tile.rack
-                            rack?.machines?.map { machine -> rack to machine } ?: emptyList()
-                        }
-                    }
-                for ((rack, machine) in machines) {
-                    val clusterId = rack.id
-                    val position = machine.position
-
-                    val processors = machine.cpus.flatMap { cpu ->
-                        val cores = cpu.numberOfCores
-                        val speed = cpu.clockRateMhz
-                        // TODO Remove hard coding of vendor
-                        val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
-                        List(cores) { coreId ->
-                            ProcessingUnit(node, coreId, speed)
-                        }
-                    }
-                    val memoryUnits = machine.memory.map { memory ->
-                        MemoryUnit(
-                            "Samsung",
-                            memory.name,
-                            memory.speedMbPerS,
-                            memory.sizeMb.toLong()
-                        )
-                    }
-
-                    val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
-                    val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
-                    val powerDriver = SimplePowerDriver(powerModel)
-
-                    val spec = HostSpec(
-                        UUID(random.nextLong(), random.nextLong()),
-                        "node-$clusterId-$position",
-                        mapOf("cluster" to clusterId),
-                        MachineModel(processors, memoryUnits),
-                        powerDriver
-                    )
-
-                    res += spec
+    private fun convertTopology(topology: Topology): List<HostSpec> {
+        val res = mutableListOf<HostSpec>()
+        val random = Random(0)
+
+        val machines = topology.rooms.asSequence()
+            .flatMap { room ->
+                room.tiles.flatMap { tile ->
+                    val rack = tile.rack
+                    rack?.machines?.map { machine -> rack to machine } ?: emptyList()
                 }
+            }
-                return res
+        for ((rack, machine) in machines) {
+            val clusterId = rack.id
+            val position = machine.position
+
+            val processors = machine.cpus.flatMap { cpu ->
+                val cores = cpu.numberOfCores
+                val speed = cpu.clockRateMhz
+                // TODO Remove hard coding of vendor
+                val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
+                List(cores) { coreId ->
+                    ProcessingUnit(node, coreId, speed)
+                }
             }
+            val memoryUnits = machine.memory.map { memory ->
+                MemoryUnit(
+                    "Samsung",
+                    memory.name,
+                    memory.speedMbPerS,
+                    memory.sizeMb.toLong()
+                )
+            }
+
+            val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
+            val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
+            val powerDriver = SimplePowerDriver(powerModel)
-            override fun toString(): String = "WebRunnerTopologyFactory"
+            val spec = HostSpec(
+                UUID(random.nextLong(), random.nextLong()),
+                "node-$clusterId-$position",
+                mapOf("cluster" to clusterId),
+                MachineModel(processors, memoryUnits),
+                powerDriver
+            )
+
+            res += spec
         }
+
+        return res
     }
 
     /**
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
index 4c3d1cfa..76377c08 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
@@ -22,9 +22,9 @@
 package org.opendc.web.runner.internal
 
-import org.opendc.compute.workload.telemetry.ComputeMonitor
-import org.opendc.compute.workload.telemetry.table.HostTableReader
-import org.opendc.compute.workload.telemetry.table.ServiceTableReader
+import org.opendc.experiments.compute.telemetry.ComputeMonitor
+import org.opendc.experiments.compute.telemetry.table.HostTableReader
+import org.opendc.experiments.compute.telemetry.table.ServiceTableReader
 import kotlin.math.max
 import kotlin.math.roundToLong
