summary refs log tree commit diff
path: root/opendc-web/opendc-web-runner
diff options
context:
space:
mode:
author Fabian Mastenbroek <mail.fabianm@gmail.com> 2022-09-30 20:27:43 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com> 2022-10-03 20:47:12 +0200
commit 07743e75891e8b3ebcefe4771f92af8003ef0b1f (patch)
tree 21f42395c56c1378a48a17592d229e516cbd8d78 /opendc-web/opendc-web-runner
parent ab0ae4779a674dd07d85ded4a812332d93888bc1 (diff)
refactor(web/runner): Use experiment base for web runner
This change updates the OpenDC web runner to use the new `opendc-experiments-base` module for setting up the experimental environment and simulating the workload.
Diffstat (limited to 'opendc-web/opendc-web-runner')
-rw-r--r-- opendc-web/opendc-web-runner/build.gradle.kts 1
-rw-r--r-- opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt 119
2 files changed, 69 insertions, 51 deletions
diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts
index a5723994..6a10267a 100644
--- a/opendc-web/opendc-web-runner/build.gradle.kts
+++ b/opendc-web/opendc-web-runner/build.gradle.kts
@@ -51,6 +51,7 @@ dependencies {
api(projects.opendcWeb.opendcWebClient)
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcCompute.opendcComputeWorkload)
+ implementation(projects.opendcExperiments.opendcExperimentsCompute)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcTrace.opendcTraceApi)
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index 9a1319b6..cf887f54 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -23,11 +23,12 @@
package org.opendc.web.runner
import mu.KotlinLogging
+import org.opendc.compute.service.ComputeService
import org.opendc.compute.workload.*
-import org.opendc.compute.workload.telemetry.ComputeMetricReader
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.compute.workload.topology.Topology
-import org.opendc.compute.workload.topology.apply
+import org.opendc.experiments.compute.*
+import org.opendc.experiments.provisioner.Provisioner
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -74,7 +75,8 @@ public class OpenDCRunner(
/**
* The [ForkJoinPool] that is used to execute the simulation jobs.
*/
- private val pool = ForkJoinPool(parallelism, RunnerThreadFactory(Thread.currentThread().contextClassLoader), null, false)
+ private val pool =
+ ForkJoinPool(parallelism, RunnerThreadFactory(Thread.currentThread().contextClassLoader), null, false)
/**
* A [ScheduledExecutorService] to manage the heartbeat of simulation jobs as well as tracking the deadline of
@@ -129,11 +131,22 @@ public class OpenDCRunner(
val id = job.id
val scenario = job.scenario
- val heartbeat = scheduler.scheduleWithFixedDelay({ manager.heartbeat(id) }, 0, heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS)
+ val heartbeat = scheduler.scheduleWithFixedDelay(
+ { manager.heartbeat(id) },
+ 0,
+ heartbeatInterval.toMillis(),
+ TimeUnit.MILLISECONDS
+ )
try {
val topology = convertTopology(scenario.topology)
- val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology) }
+ val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat ->
+ SimulationTask(
+ scenario,
+ repeat,
+ topology
+ )
+ }
val results = invokeAll(jobs).map { it.rawResult }
logger.info { "Finished simulation for job $id" }
@@ -195,60 +208,64 @@ public class OpenDCRunner(
// Schedule task that interrupts the simulation if it runs for too long.
val currentThread = Thread.currentThread()
- val interruptTask = scheduler.schedule({ currentThread.interrupt() }, jobTimeout.toMillis(), TimeUnit.MILLISECONDS)
+ val interruptTask =
+ scheduler.schedule({ currentThread.interrupt() }, jobTimeout.toMillis(), TimeUnit.MILLISECONDS)
try {
- runBlockingSimulation {
- val workloadName = scenario.workload.trace.id
- val workloadFraction = scenario.workload.samplingFraction
-
- val seeder = Random(repeat.toLong())
-
- val phenomena = scenario.phenomena
- val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder)
- val workload = trace(workloadName).sampleByLoad(workloadFraction)
- val vms = workload.resolve(workloadLoader, seeder)
-
- val failureModel =
- if (phenomena.failures)
- grid5000(Duration.ofDays(7))
- else
- null
-
- val simulator = ComputeServiceHelper(
- coroutineContext,
- clock,
- computeScheduler,
- seed = 0L,
- )
- val reader = ComputeMetricReader(this, clock, simulator.service, monitor)
-
- try {
- // Instantiate the topology onto the simulator
- simulator.apply(topology)
- // Run workload trace
- simulator.run(vms, failureModel = failureModel, interference = phenomena.interference)
-
- val serviceMetrics = simulator.service.getSchedulerStats()
- logger.debug {
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Error=${serviceMetrics.attemptsError} " +
- "Pending=${serviceMetrics.serversPending} " +
- "Active=${serviceMetrics.serversActive}"
- }
- } finally {
- simulator.close()
- reader.close()
- }
- }
+ runSimulation(monitor)
} finally {
interruptTask.cancel(false)
}
return monitor.collectResults()
}
+
+ /**
+ * Run a single simulation of the scenario.
+ */
+ private fun runSimulation(monitor: WebComputeMonitor) = runBlockingSimulation {
+ val serviceDomain = "compute.opendc.org"
+ val seed = repeat.toLong()
+
+ val scenario = scenario
+
+ Provisioner(coroutineContext, clock, seed).use { provisioner ->
+ provisioner.runSteps(
+ setupComputeService(
+ serviceDomain,
+ { createComputeScheduler(scenario.schedulerName, Random(it.seeder.nextLong())) }
+ ),
+ registerComputeMonitor(serviceDomain, monitor),
+ setupHosts(serviceDomain, topology.resolve())
+ )
+
+ val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
+
+ val workload =
+ trace(scenario.workload.trace.id).sampleByLoad(scenario.workload.samplingFraction)
+ val vms = workload.resolve(workloadLoader, Random(seed))
+
+ val phenomena = scenario.phenomena
+ val failureModel =
+ if (phenomena.failures)
+ grid5000(Duration.ofDays(7))
+ else
+ null
+
+ // Run workload trace
+ service.replay(clock, vms, seed, failureModel = failureModel, interference = phenomena.interference)
+
+ val serviceMetrics = service.getSchedulerStats()
+ logger.debug {
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ }
+ }
+ }
}
/**