summaryrefslogtreecommitdiff
path: root/opendc-web
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-04 10:04:50 +0200
committerGitHub <noreply@github.com>2022-10-04 10:04:50 +0200
commit92cc0908b7ad6c94b08e6016f8815ab07cd1714d (patch)
treeb5edaff69212986265f9edc620e40bb8695f11eb /opendc-web
parent2d2a3854d355bd4b074ef651f291d34081e70d96 (diff)
parentbd476d11ab24fe745bb54e97a11133706bb96cb1 (diff)
merge: Add provisioning tool for experiments (#104)
This pull request implements a new tool to help provision and manage the experimental environment. ## Implementation Notes :hammer_and_pick: * Add service registry for cloud services * Add provisioning tool for experiments * Add provisioning step for workflow service * Add provisioners for FaaS service * Use experiment base for Capelin experiments * Use experiment base for web runner * Integrate compute workload classes * Remove Topology interface ## Breaking API Changes :warning: * Removal of the `opendc-compute-workload`, `opendc-faas-workload`, and `opendc-workflow-workload` modules. These are now located inside `opendc-experiments` * Removal of `ComputeServiceHelper`. Use `Provisioner` to provision a `ComputeService`.
Diffstat (limited to 'opendc-web')
-rw-r--r--opendc-web/opendc-web-runner/build.gradle.kts3
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt222
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt6
3 files changed, 120 insertions, 111 deletions
diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts
index a5723994..2679a97f 100644
--- a/opendc-web/opendc-web-runner/build.gradle.kts
+++ b/opendc-web/opendc-web-runner/build.gradle.kts
@@ -49,8 +49,7 @@ val cliJar by tasks.creating(Jar::class) {
dependencies {
api(projects.opendcWeb.opendcWebClient)
- implementation(projects.opendcCompute.opendcComputeSimulator)
- implementation(projects.opendcCompute.opendcComputeWorkload)
+ implementation(projects.opendcExperiments.opendcExperimentsCompute)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcTrace.opendcTraceApi)
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index 9a1319b6..74f7c8c1 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -23,11 +23,10 @@
package org.opendc.web.runner
import mu.KotlinLogging
-import org.opendc.compute.workload.*
-import org.opendc.compute.workload.telemetry.ComputeMetricReader
-import org.opendc.compute.workload.topology.HostSpec
-import org.opendc.compute.workload.topology.Topology
-import org.opendc.compute.workload.topology.apply
+import org.opendc.compute.service.ComputeService
+import org.opendc.experiments.compute.*
+import org.opendc.experiments.compute.topology.HostSpec
+import org.opendc.experiments.provisioner.Provisioner
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -37,6 +36,7 @@ import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.web.proto.runner.Job
import org.opendc.web.proto.runner.Scenario
+import org.opendc.web.proto.runner.Topology
import org.opendc.web.runner.internal.WebComputeMonitor
import java.io.File
import java.time.Duration
@@ -74,7 +74,8 @@ public class OpenDCRunner(
/**
* The [ForkJoinPool] that is used to execute the simulation jobs.
*/
- private val pool = ForkJoinPool(parallelism, RunnerThreadFactory(Thread.currentThread().contextClassLoader), null, false)
+ private val pool =
+ ForkJoinPool(parallelism, RunnerThreadFactory(Thread.currentThread().contextClassLoader), null, false)
/**
* A [ScheduledExecutorService] to manage the heartbeat of simulation jobs as well as tracking the deadline of
@@ -129,11 +130,22 @@ public class OpenDCRunner(
val id = job.id
val scenario = job.scenario
- val heartbeat = scheduler.scheduleWithFixedDelay({ manager.heartbeat(id) }, 0, heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS)
+ val heartbeat = scheduler.scheduleWithFixedDelay(
+ { manager.heartbeat(id) },
+ 0,
+ heartbeatInterval.toMillis(),
+ TimeUnit.MILLISECONDS
+ )
try {
val topology = convertTopology(scenario.topology)
- val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology) }
+ val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat ->
+ SimulationTask(
+ scenario,
+ repeat,
+ topology
+ )
+ }
val results = invokeAll(jobs).map { it.rawResult }
logger.info { "Finished simulation for job $id" }
@@ -188,128 +200,126 @@ public class OpenDCRunner(
private inner class SimulationTask(
private val scenario: Scenario,
private val repeat: Int,
- private val topology: Topology,
+ private val topology: List<HostSpec>,
) : RecursiveTask<WebComputeMonitor.Results>() {
override fun compute(): WebComputeMonitor.Results {
val monitor = WebComputeMonitor()
// Schedule task that interrupts the simulation if it runs for too long.
val currentThread = Thread.currentThread()
- val interruptTask = scheduler.schedule({ currentThread.interrupt() }, jobTimeout.toMillis(), TimeUnit.MILLISECONDS)
+ val interruptTask =
+ scheduler.schedule({ currentThread.interrupt() }, jobTimeout.toMillis(), TimeUnit.MILLISECONDS)
try {
- runBlockingSimulation {
- val workloadName = scenario.workload.trace.id
- val workloadFraction = scenario.workload.samplingFraction
-
- val seeder = Random(repeat.toLong())
-
- val phenomena = scenario.phenomena
- val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder)
- val workload = trace(workloadName).sampleByLoad(workloadFraction)
- val vms = workload.resolve(workloadLoader, seeder)
-
- val failureModel =
- if (phenomena.failures)
- grid5000(Duration.ofDays(7))
- else
- null
-
- val simulator = ComputeServiceHelper(
- coroutineContext,
- clock,
- computeScheduler,
- seed = 0L,
- )
- val reader = ComputeMetricReader(this, clock, simulator.service, monitor)
-
- try {
- // Instantiate the topology onto the simulator
- simulator.apply(topology)
- // Run workload trace
- simulator.run(vms, failureModel = failureModel, interference = phenomena.interference)
-
- val serviceMetrics = simulator.service.getSchedulerStats()
- logger.debug {
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Error=${serviceMetrics.attemptsError} " +
- "Pending=${serviceMetrics.serversPending} " +
- "Active=${serviceMetrics.serversActive}"
- }
- } finally {
- simulator.close()
- reader.close()
- }
- }
+ runSimulation(monitor)
} finally {
interruptTask.cancel(false)
}
return monitor.collectResults()
}
+
+ /**
+ * Run a single simulation of the scenario.
+ */
+ private fun runSimulation(monitor: WebComputeMonitor) = runBlockingSimulation {
+ val serviceDomain = "compute.opendc.org"
+ val seed = repeat.toLong()
+
+ val scenario = scenario
+
+ Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ provisioner.runSteps(
+ setupComputeService(
+ serviceDomain,
+ { createComputeScheduler(scenario.schedulerName, Random(it.seeder.nextLong())) }
+ ),
+ registerComputeMonitor(serviceDomain, monitor),
+ setupHosts(serviceDomain, topology)
+ )
+
+ val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
+
+ val workload =
+ trace(scenario.workload.trace.id).sampleByLoad(scenario.workload.samplingFraction)
+ val vms = workload.resolve(workloadLoader, Random(seed))
+
+ val phenomena = scenario.phenomena
+ val failureModel =
+ if (phenomena.failures)
+ grid5000(Duration.ofDays(7))
+ else
+ null
+
+ // Run workload trace
+ service.replay(clock, vms, seed, failureModel = failureModel, interference = phenomena.interference)
+
+ val serviceMetrics = service.getSchedulerStats()
+ logger.debug {
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ }
+ }
+ }
}
/**
* Convert the specified [topology] into an [Topology] understood by OpenDC.
*/
- private fun convertTopology(topology: org.opendc.web.proto.runner.Topology): Topology {
- return object : Topology {
-
- override fun resolve(): List<HostSpec> {
- val res = mutableListOf<HostSpec>()
- val random = Random(0)
-
- val machines = topology.rooms.asSequence()
- .flatMap { room ->
- room.tiles.flatMap { tile ->
- val rack = tile.rack
- rack?.machines?.map { machine -> rack to machine } ?: emptyList()
- }
- }
- for ((rack, machine) in machines) {
- val clusterId = rack.id
- val position = machine.position
-
- val processors = machine.cpus.flatMap { cpu ->
- val cores = cpu.numberOfCores
- val speed = cpu.clockRateMhz
- // TODO Remove hard coding of vendor
- val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
- List(cores) { coreId ->
- ProcessingUnit(node, coreId, speed)
- }
- }
- val memoryUnits = machine.memory.map { memory ->
- MemoryUnit(
- "Samsung",
- memory.name,
- memory.speedMbPerS,
- memory.sizeMb.toLong()
- )
- }
-
- val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
- val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
- val powerDriver = SimplePowerDriver(powerModel)
-
- val spec = HostSpec(
- UUID(random.nextLong(), random.nextLong()),
- "node-$clusterId-$position",
- mapOf("cluster" to clusterId),
- MachineModel(processors, memoryUnits),
- powerDriver
- )
-
- res += spec
+ private fun convertTopology(topology: Topology): List<HostSpec> {
+ val res = mutableListOf<HostSpec>()
+ val random = Random(0)
+
+ val machines = topology.rooms.asSequence()
+ .flatMap { room ->
+ room.tiles.flatMap { tile ->
+ val rack = tile.rack
+ rack?.machines?.map { machine -> rack to machine } ?: emptyList()
}
+ }
- return res
+ for ((rack, machine) in machines) {
+ val clusterId = rack.id
+ val position = machine.position
+
+ val processors = machine.cpus.flatMap { cpu ->
+ val cores = cpu.numberOfCores
+ val speed = cpu.clockRateMhz
+ // TODO Remove hard coding of vendor
+ val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
+ List(cores) { coreId ->
+ ProcessingUnit(node, coreId, speed)
+ }
}
+ val memoryUnits = machine.memory.map { memory ->
+ MemoryUnit(
+ "Samsung",
+ memory.name,
+ memory.speedMbPerS,
+ memory.sizeMb.toLong()
+ )
+ }
+
+ val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
+ val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
+ val powerDriver = SimplePowerDriver(powerModel)
- override fun toString(): String = "WebRunnerTopologyFactory"
+ val spec = HostSpec(
+ UUID(random.nextLong(), random.nextLong()),
+ "node-$clusterId-$position",
+ mapOf("cluster" to clusterId),
+ MachineModel(processors, memoryUnits),
+ powerDriver
+ )
+
+ res += spec
}
+
+ return res
}
/**
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
index 4c3d1cfa..76377c08 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
@@ -22,9 +22,9 @@
package org.opendc.web.runner.internal
-import org.opendc.compute.workload.telemetry.ComputeMonitor
-import org.opendc.compute.workload.telemetry.table.HostTableReader
-import org.opendc.compute.workload.telemetry.table.ServiceTableReader
+import org.opendc.experiments.compute.telemetry.ComputeMonitor
+import org.opendc.experiments.compute.telemetry.table.HostTableReader
+import org.opendc.experiments.compute.telemetry.table.ServiceTableReader
import kotlin.math.max
import kotlin.math.roundToLong