summaryrefslogtreecommitdiff
path: root/opendc-web/opendc-web-runner/src/main
diff options
context:
space:
mode:
author Dante Niewenhuis <d.niewenhuis@hotmail.com> 2024-03-05 13:23:57 +0100
committer GitHub <noreply@github.com> 2024-03-05 13:23:57 +0100
commit 5864cbcbfe2eb8c36ca05c3a39c7e5916aeecaec (patch)
tree 5b2773b8dc21c2e1b526fb70f829c376dd80532a /opendc-web/opendc-web-runner/src/main
parent d28002a3c151d198298574312f32f1cb43f3a660 (diff)
Updated package versions, updated web server tests. (#207)
* Updated all package versions, including Kotlin. Updated all web-server tests to run. * Changed the Java version of the tests. OpenDC now only supports Java 19. * small update * test update * new update * updated Docker version to 19 * updated Docker version to 19
Diffstat (limited to 'opendc-web/opendc-web-runner/src/main')
-rw-r--r-- opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt 16
-rw-r--r-- opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt 196
-rw-r--r-- opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt 16
-rw-r--r-- opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt 33
4 files changed, 145 insertions, 116 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt
index d6c06889..a517f3b4 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt
@@ -47,17 +47,27 @@ public interface JobManager {
* @param runtime The total runtime of the job.
* @return `true` if the job can continue, `false` if the job has been cancelled.
*/
- public fun heartbeat(id: Long, runtime: Int): Boolean
+ public fun heartbeat(
+ id: Long,
+ runtime: Int,
+ ): Boolean
/**
* Mark the job as failed.
*/
- public fun fail(id: Long, runtime: Int)
+ public fun fail(
+ id: Long,
+ runtime: Int,
+ )
/**
* Persist the specified results for the specified job.
*/
- public fun finish(id: Long, runtime: Int, results: Map<String, Any>)
+ public fun finish(
+ id: Long,
+ runtime: Int,
+ results: Map<String, Any>,
+ )
public companion object {
/**
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index 4351f3c1..eee340cf 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -76,7 +76,7 @@ public class OpenDCRunner(
parallelism: Int = Runtime.getRuntime().availableProcessors(),
private val jobTimeout: Duration = Duration.ofMinutes(10),
private val pollInterval: Duration = Duration.ofSeconds(30),
- private val heartbeatInterval: Duration = Duration.ofMinutes(1)
+ private val heartbeatInterval: Duration = Duration.ofMinutes(1),
) : Runnable {
/**
* Logging instance for this runner.
@@ -149,26 +149,28 @@ public class OpenDCRunner(
val startTime = Instant.now()
val currentThread = Thread.currentThread()
- val heartbeat = scheduler.scheduleWithFixedDelay(
- {
- if (!manager.heartbeat(id, startTime.secondsSince())) {
- currentThread.interrupt()
- }
- },
- 0,
- heartbeatInterval.toMillis(),
- TimeUnit.MILLISECONDS
- )
+ val heartbeat =
+ scheduler.scheduleWithFixedDelay(
+ {
+ if (!manager.heartbeat(id, startTime.secondsSince())) {
+ currentThread.interrupt()
+ }
+ },
+ 0,
+ heartbeatInterval.toMillis(),
+ TimeUnit.MILLISECONDS,
+ )
try {
val topology = convertTopology(scenario.topology)
- val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat ->
- SimulationTask(
- scenario,
- repeat,
- topology
- )
- }
+ val jobs =
+ (0 until scenario.portfolio.targets.repeats).map { repeat ->
+ SimulationTask(
+ scenario,
+ repeat,
+ topology,
+ )
+ }
val results = invokeAll(jobs).map { it.rawResult }
heartbeat.cancel(true)
@@ -194,8 +196,8 @@ public class OpenDCRunner(
"total_vms_submitted" to results.map { it.totalVmsSubmitted },
"total_vms_queued" to results.map { it.totalVmsQueued },
"total_vms_finished" to results.map { it.totalVmsFinished },
- "total_vms_failed" to results.map { it.totalVmsFailed }
- )
+ "total_vms_failed" to results.map { it.totalVmsFailed },
+ ),
)
} catch (e: Exception) {
// Check whether the job failed due to exceeding its time budget
@@ -232,7 +234,7 @@ public class OpenDCRunner(
private inner class SimulationTask(
private val scenario: Scenario,
private val repeat: Int,
- private val topology: List<HostSpec>
+ private val topology: List<HostSpec>,
) : RecursiveTask<WebComputeMonitor.Results>() {
override fun compute(): WebComputeMonitor.Results {
val monitor = WebComputeMonitor()
@@ -254,50 +256,51 @@ public class OpenDCRunner(
/**
* Run a single simulation of the scenario.
*/
- private fun runSimulation(monitor: WebComputeMonitor) = runSimulation {
- val serviceDomain = "compute.opendc.org"
- val seed = repeat.toLong()
-
- val scenario = scenario
-
- Provisioner(dispatcher, seed).use { provisioner ->
- provisioner.runSteps(
- setupComputeService(
- serviceDomain,
- { createComputeScheduler(scenario.schedulerName, Random(it.seeder.nextLong())) }
- ),
- registerComputeMonitor(serviceDomain, monitor),
- setupHosts(serviceDomain, topology)
- )
-
- val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
-
- val workload =
- trace(scenario.workload.trace.id).sampleByLoad(scenario.workload.samplingFraction)
- val vms = workload.resolve(workloadLoader, Random(seed))
+ private fun runSimulation(monitor: WebComputeMonitor) =
+ runSimulation {
+ val serviceDomain = "compute.opendc.org"
+ val seed = repeat.toLong()
+
+ val scenario = scenario
+
+ Provisioner(dispatcher, seed).use { provisioner ->
+ provisioner.runSteps(
+ setupComputeService(
+ serviceDomain,
+ { createComputeScheduler(scenario.schedulerName, Random(it.seeder.nextLong())) },
+ ),
+ registerComputeMonitor(serviceDomain, monitor),
+ setupHosts(serviceDomain, topology),
+ )
- val phenomena = scenario.phenomena
- val failureModel =
- if (phenomena.failures) {
- grid5000(Duration.ofDays(7))
- } else {
- null
+ val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
+
+ val workload =
+ trace(scenario.workload.trace.id).sampleByLoad(scenario.workload.samplingFraction)
+ val vms = workload.resolve(workloadLoader, Random(seed))
+
+ val phenomena = scenario.phenomena
+ val failureModel =
+ if (phenomena.failures) {
+ grid5000(Duration.ofDays(7))
+ } else {
+ null
+ }
+
+ // Run workload trace
+ service.replay(timeSource, vms, seed, failureModel = failureModel, interference = phenomena.interference)
+
+ val serviceMetrics = service.getSchedulerStats()
+ logger.debug {
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
}
-
- // Run workload trace
- service.replay(timeSource, vms, seed, failureModel = failureModel, interference = phenomena.interference)
-
- val serviceMetrics = service.getSchedulerStats()
- logger.debug {
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Error=${serviceMetrics.attemptsError} " +
- "Pending=${serviceMetrics.serversPending} " +
- "Active=${serviceMetrics.serversActive}"
}
}
- }
}
/**
@@ -307,46 +310,50 @@ public class OpenDCRunner(
val res = mutableListOf<HostSpec>()
val random = Random(0)
- val machines = topology.rooms.asSequence()
- .flatMap { room ->
- room.tiles.flatMap { tile ->
- val rack = tile.rack
- rack?.machines?.map { machine -> rack to machine } ?: emptyList()
+ val machines =
+ topology.rooms.asSequence()
+ .flatMap { room ->
+ room.tiles.flatMap { tile ->
+ val rack = tile.rack
+ rack?.machines?.map { machine -> rack to machine } ?: emptyList()
+ }
}
- }
for ((rack, machine) in machines) {
val clusterId = rack.id
val position = machine.position
- val processors = machine.cpus.flatMap { cpu ->
- val cores = cpu.numberOfCores
- val speed = cpu.clockRateMhz
- // TODO Remove hard coding of vendor
- val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
- List(cores) { coreId ->
- ProcessingUnit(node, coreId, speed)
+ val processors =
+ machine.cpus.flatMap { cpu ->
+ val cores = cpu.numberOfCores
+ val speed = cpu.clockRateMhz
+ // TODO Remove hard coding of vendor
+ val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
+ List(cores) { coreId ->
+ ProcessingUnit(node, coreId, speed)
+ }
+ }
+ val memoryUnits =
+ machine.memory.map { memory ->
+ MemoryUnit(
+ "Samsung",
+ memory.name,
+ memory.speedMbPerS,
+ memory.sizeMb.toLong(),
+ )
}
- }
- val memoryUnits = machine.memory.map { memory ->
- MemoryUnit(
- "Samsung",
- memory.name,
- memory.speedMbPerS,
- memory.sizeMb.toLong()
- )
- }
val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
val powerModel = CpuPowerModels.linear(2 * energyConsumptionW, energyConsumptionW * 0.5)
- val spec = HostSpec(
- UUID(random.nextLong(), random.nextLong()),
- "node-$clusterId-$position",
- mapOf("cluster" to clusterId),
- MachineModel(processors, memoryUnits),
- SimPsuFactories.simple(powerModel)
- )
+ val spec =
+ HostSpec(
+ UUID(random.nextLong(), random.nextLong()),
+ "node-$clusterId-$position",
+ mapOf("cluster" to clusterId),
+ MachineModel(processors, memoryUnits),
+ SimPsuFactories.simple(powerModel),
+ )
res += spec
}
@@ -358,10 +365,11 @@ public class OpenDCRunner(
* A custom [ForkJoinWorkerThreadFactory] that uses the [ClassLoader] specified by the runner.
*/
private class RunnerThreadFactory(private val classLoader: ClassLoader) : ForkJoinWorkerThreadFactory {
- override fun newThread(pool: ForkJoinPool): ForkJoinWorkerThread = object : ForkJoinWorkerThread(pool) {
- init {
- contextClassLoader = classLoader
+ override fun newThread(pool: ForkJoinPool): ForkJoinWorkerThread =
+ object : ForkJoinWorkerThread(pool) {
+ init {
+ contextClassLoader = classLoader
+ }
}
- }
}
}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt
index 5b1b7132..7081041c 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt
@@ -44,16 +44,26 @@ internal class JobManagerImpl(private val client: OpenDCRunnerClient) : JobManag
}
}
- override fun heartbeat(id: Long, runtime: Int): Boolean {
+ override fun heartbeat(
+ id: Long,
+ runtime: Int,
+ ): Boolean {
val res = client.jobs.update(id, Job.Update(JobState.RUNNING, runtime))
return res?.state != JobState.FAILED
}
- override fun fail(id: Long, runtime: Int) {
+ override fun fail(
+ id: Long,
+ runtime: Int,
+ ) {
client.jobs.update(id, Job.Update(JobState.FAILED, runtime))
}
- override fun finish(id: Long, runtime: Int, results: Map<String, Any>) {
+ override fun finish(
+ id: Long,
+ runtime: Int,
+ results: Map<String, Any>,
+ ) {
client.jobs.update(id, Job.Update(JobState.FINISHED, runtime))
}
}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
index 774689c9..4576a463 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
@@ -34,31 +34,32 @@ import kotlin.math.roundToLong
*/
internal class WebComputeMonitor : ComputeMonitor {
override fun record(reader: HostTableReader) {
- val slices = reader.downtime / SLICE_LENGTH
+ val slices = reader.downtime / sliceLength
- hostAggregateMetrics = AggregateHostMetrics(
- hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime,
- hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime,
- hostAggregateMetrics.totalStealTime + reader.cpuStealTime,
- hostAggregateMetrics.totalLostTime + reader.cpuLostTime,
- hostAggregateMetrics.totalPowerDraw + reader.energyUsage,
- hostAggregateMetrics.totalFailureSlices + slices,
- hostAggregateMetrics.totalFailureVmSlices + reader.guestsRunning * slices
- )
+ hostAggregateMetrics =
+ AggregateHostMetrics(
+ hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime,
+ hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime,
+ hostAggregateMetrics.totalStealTime + reader.cpuStealTime,
+ hostAggregateMetrics.totalLostTime + reader.cpuLostTime,
+ hostAggregateMetrics.totalPowerDraw + reader.energyUsage,
+ hostAggregateMetrics.totalFailureSlices + slices,
+ hostAggregateMetrics.totalFailureVmSlices + reader.guestsRunning * slices,
+ )
hostMetrics.compute(reader.host.id) { _, prev ->
HostMetrics(
reader.cpuUsage + (prev?.cpuUsage ?: 0.0),
reader.cpuDemand + (prev?.cpuDemand ?: 0.0),
reader.guestsRunning + (prev?.instanceCount ?: 0),
- 1 + (prev?.count ?: 0)
+ 1 + (prev?.count ?: 0),
)
}
}
private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
private val hostMetrics: MutableMap<String, HostMetrics> = mutableMapOf()
- private val SLICE_LENGTH: Long = 5 * 60L
+ private val sliceLength: Long = 5 * 60L
private data class AggregateHostMetrics(
val totalActiveTime: Long = 0L,
@@ -67,14 +68,14 @@ internal class WebComputeMonitor : ComputeMonitor {
val totalLostTime: Long = 0L,
val totalPowerDraw: Double = 0.0,
val totalFailureSlices: Double = 0.0,
- val totalFailureVmSlices: Double = 0.0
+ val totalFailureVmSlices: Double = 0.0,
)
private data class HostMetrics(
val cpuUsage: Double,
val cpuDemand: Double,
val instanceCount: Long,
- val count: Long
+ val count: Long,
)
private lateinit var serviceData: ServiceData
@@ -106,7 +107,7 @@ internal class WebComputeMonitor : ComputeMonitor {
serviceData.serversTotal,
serviceData.serversPending,
serviceData.serversTotal - serviceData.serversPending - serviceData.serversActive,
- serviceData.attemptsError + serviceData.attemptsFailure
+ serviceData.attemptsError + serviceData.attemptsFailure,
)
}
@@ -128,6 +129,6 @@ internal class WebComputeMonitor : ComputeMonitor {
val totalVmsSubmitted: Int,
val totalVmsQueued: Int,
val totalVmsFinished: Int,
- val totalVmsFailed: Int
+ val totalVmsFailed: Int,
)
}