Diffstat (limited to 'opendc-experiments')
58 files changed, 1003 insertions, 884 deletions
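Most of this change set is mechanical reformatting of the opendc-experiments modules: trailing commas added, multiline expressions moved onto the line after `=`, and `/* ... */` build-script comments switched to `//`, consistent with ktlint-style formatting rules. The one substantive build-script change visible below is the move away from the deprecated `project.buildDir` accessor in the Capelin and Greenifier Gradle scripts. The sketch that follows is illustrative only and is not taken from this change set; the lazy `Provider`-based variant at the end is an assumption about idiomatic Gradle usage, not something this diff introduces.

    // build.gradle.kts (Kotlin DSL) -- illustrative sketch only, not part of this diff.
    // Deprecated in recent Gradle versions and removed from these scripts by this change:
    //   val scripts: java.io.File = project.buildDir.resolve("scripts")
    // Replacement used in the diff below (eager resolution, same java.io.File result):
    val scripts: java.io.File = project.layout.buildDirectory.get().asFile.resolve("scripts")
    // Assumption: a lazier alternative keeps the Provider and defers resolution to execution time:
    val scriptsDir: org.gradle.api.provider.Provider<org.gradle.api.file.Directory> =
        project.layout.buildDirectory.dir("scripts")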
diff --git a/opendc-experiments/opendc-experiments-base/build.gradle.kts b/opendc-experiments/opendc-experiments-base/build.gradle.kts index b30e468a..8aa82b67 100644 --- a/opendc-experiments/opendc-experiments-base/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-base/build.gradle.kts @@ -22,7 +22,7 @@ description = "Support library for simulating VM-based workloads with OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` `testing-conventions` diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/portfolio/model/Scenario.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/portfolio/model/Scenario.kt index 66fc76e4..cf0f5320 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/portfolio/model/Scenario.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/portfolio/model/Scenario.kt @@ -36,5 +36,5 @@ public data class Scenario( val workload: Workload, val operationalPhenomena: OperationalPhenomena, val allocationPolicy: String, - val partitions: Map<String, String> = emptyMap() + val partitions: Map<String, String> = emptyMap(), ) diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/TraceHelpers.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/TraceHelpers.kt index 2afbd8a5..ddfa35cc 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/TraceHelpers.kt @@ -49,19 +49,22 @@ public class RunningServerWatcher : ServerWatcher { // TODO: make this changeable private val unlockStates: List<ServerState> = listOf(ServerState.TERMINATED, ServerState.ERROR, ServerState.DELETED) - private val _mutex: Mutex = Mutex() + private val mutex: Mutex = Mutex() public suspend fun lock() { - _mutex.lock() + mutex.lock() } public suspend fun wait() { this.lock() } - override fun onStateChanged(server: Server, newState: ServerState) { + override fun onStateChanged( + server: Server, + newState: ServerState, + ) { if (unlockStates.contains(newState)) { - _mutex.unlock() + mutex.unlock() } } } @@ -82,7 +85,7 @@ public suspend fun ComputeService.replay( seed: Long, submitImmediately: Boolean = false, failureModel: FailureModel? 
= null, - interference: Boolean = false + interference: Boolean = false, ) { val injector = failureModel?.createInjector(coroutineContext, clock, this, Random(seed)) val client = newClient() @@ -123,17 +126,18 @@ public suspend fun ComputeService.replay( } launch { - val server = client.newServer( - entry.name, - image, - client.newFlavor( + val server = + client.newServer( entry.name, - entry.cpuCount, - entry.memCapacity, - meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap() - ), - meta = meta - ) + image, + client.newFlavor( + entry.name, + entry.cpuCount, + entry.memCapacity, + meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap(), + ), + meta = meta, + ) val serverWatcher = RunningServerWatcher() serverWatcher.lock() diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 64230387..af37e352 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -22,7 +22,7 @@ description = "Experiments for the Capelin work" -/* Build configuration */ +// Build configuration plugins { `kotlin-conventions` `testing-conventions` @@ -56,10 +56,10 @@ val createCapelinApp by tasks.creating(CreateStartScripts::class) { applicationName = "capelin" mainClass.set("org.opendc.experiments.capelin.CapelinCli") classpath = tasks.jar.get().outputs.files + configurations["runtimeClasspath"] - outputDir = project.buildDir.resolve("scripts") + outputDir = project.layout.buildDirectory.get().asFile.resolve("scripts") } -/* Create custom Capelin distribution */ +// Create custom Capelin distribution distributions { main { distributionBaseName.set("capelin") diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index 06f8265c..f0084ec5 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -72,22 +72,24 @@ class CapelinBenchmarks { } @Benchmark - fun benchmarkCapelin() = runSimulation { - val serviceDomain = "compute.opendc.org" + fun benchmarkCapelin() = + runSimulation { + val serviceDomain = "compute.opendc.org" - Provisioner(dispatcher, seed = 0).use { provisioner -> - val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) + Provisioner(dispatcher, seed = 0).use { provisioner -> + val computeScheduler = + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)), + ) - provisioner.runSteps( - setupComputeService(serviceDomain, { computeScheduler }), - setupHosts(serviceDomain, topology, optimize = isOptimized) - ) + provisioner.runSteps( + setupComputeService(serviceDomain, { computeScheduler }), + setupHosts(serviceDomain, topology, optimize = isOptimized), + ) - val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! 
- service.replay(timeSource, vms, 0L, interference = true) + val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! + service.replay(timeSource, vms, 0L, interference = true) + } } - } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinCli.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinCli.kt index ac0bd506..5bec8c6d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinCli.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinCli.kt @@ -114,7 +114,7 @@ internal class CapelinCommand : CliktCommand(name = "capelin") { "hor-ver" to { HorVerPortfolio() }, "more-hpc" to { MoreHpcPortfolio() }, "more-velocity" to { MoreVelocityPortfolio() }, - "op-phen" to { OperationalPhenomenaPortfolio() } + "op-phen" to { OperationalPhenomenaPortfolio() }, ) /** @@ -140,12 +140,17 @@ internal class CapelinCommand : CliktCommand(name = "capelin") { /** * Run a single scenario. */ - private fun runScenario(runner: CapelinRunner, pool: ForkJoinPool, scenario: Scenario) { - val pb = ProgressBarBuilder() - .setInitialMax(repeats.toLong()) - .setStyle(ProgressBarStyle.ASCII) - .setTaskName("Simulating...") - .build() + private fun runScenario( + runner: CapelinRunner, + pool: ForkJoinPool, + scenario: Scenario, + ) { + val pb = + ProgressBarBuilder() + .setInitialMax(repeats.toLong()) + .setStyle(ProgressBarStyle.ASCII) + .setTaskName("Simulating...") + .build() pool.submit { LongStream.range(0, repeats.toLong()) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt index b97b7f94..0de72afa 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt @@ -50,7 +50,7 @@ import kotlin.math.roundToLong public class CapelinRunner( private val envPath: File, tracePath: File, - private val outputPath: File? + private val outputPath: File?, ) { /** * The [ComputeWorkloadLoader] to use for loading the traces. @@ -60,14 +60,17 @@ public class CapelinRunner( /** * Run a single [scenario] with the specified seed. 
*/ - fun runScenario(scenario: Scenario, seed: Long) = runSimulation { + fun runScenario( + scenario: Scenario, + seed: Long, + ) = runSimulation { val serviceDomain = "compute.opendc.org" val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt")) Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain, { createComputeScheduler(scenario.allocationPolicy, Random(it.seeder.nextLong())) }), - setupHosts(serviceDomain, topology, optimize = true) + setupHosts(serviceDomain, topology, optimize = true), ) if (outputPath != null) { @@ -80,9 +83,9 @@ public class CapelinRunner( ParquetComputeMonitor( outputPath, partition, - bufferSize = 4096 - ) - ) + bufferSize = 4096, + ), + ), ) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/CompositeWorkloadPortfolio.kt index 40e3801f..140f0480 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/CompositeWorkloadPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/CompositeWorkloadPortfolio.kt @@ -34,46 +34,49 @@ import org.opendc.experiments.base.portfolio.model.Workload * A [Portfolio] that explores the effect of a composite workload. */ public class CompositeWorkloadPortfolio : Portfolio { - private val topologies = listOf( - Topology("base"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-ver-hom"), - Topology("exp-vel-ver-hom") - ) - private val workloads = listOf( - Workload( - "all-azure", - composite(trace("solvinity-short") to 0.0, trace("azure") to 1.0) - ), - Workload( - "solvinity-25-azure-75", - composite(trace("solvinity-short") to 0.25, trace("azure") to 0.75) - ), - Workload( - "solvinity-50-azure-50", - composite(trace("solvinity-short") to 0.5, trace("azure") to 0.5) - ), - Workload( - "solvinity-75-azure-25", - composite(trace("solvinity-short") to 0.75, trace("azure") to 0.25) - ), - Workload( - "all-solvinity", - composite(trace("solvinity-short") to 1.0, trace("azure") to 0.0) + private val topologies = + listOf( + Topology("base"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-ver-hom"), + Topology("exp-vel-ver-hom"), + ) + private val workloads = + listOf( + Workload( + "all-azure", + composite(trace("solvinity-short") to 0.0, trace("azure") to 1.0), + ), + Workload( + "solvinity-25-azure-75", + composite(trace("solvinity-short") to 0.25, trace("azure") to 0.75), + ), + Workload( + "solvinity-50-azure-50", + composite(trace("solvinity-short") to 0.5, trace("azure") to 0.5), + ), + Workload( + "solvinity-75-azure-25", + composite(trace("solvinity-short") to 0.75, trace("azure") to 0.25), + ), + Workload( + "all-solvinity", + composite(trace("solvinity-short") to 1.0, trace("azure") to 0.0), + ), ) - ) private val operationalPhenomena = OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false) private val allocationPolicy = "active-servers" - override val scenarios: Iterable<Scenario> = topologies.flatMap { topology -> - workloads.map { workload -> - Scenario( - topology, - workload, - operationalPhenomena, - allocationPolicy, - mapOf("topology" to topology.name, "workload" to workload.name) - ) + override val scenarios: Iterable<Scenario> = + topologies.flatMap { topology -> + workloads.map { workload -> + 
Scenario( + topology, + workload, + operationalPhenomena, + allocationPolicy, + mapOf("topology" to topology.name, "workload" to workload.name), + ) + } } - } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/HorVerPortfolio.kt index 1d68836c..da884f35 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/HorVerPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/HorVerPortfolio.kt @@ -34,36 +34,39 @@ import org.opendc.experiments.base.portfolio.model.Workload * A [Portfolio] that explores the difference between horizontal and vertical scaling. */ public class HorVerPortfolio : Portfolio { - private val topologies = listOf( - Topology("base"), - Topology("rep-vol-hor-hom"), - Topology("rep-vol-hor-het"), - Topology("rep-vol-ver-hom"), - Topology("rep-vol-ver-het"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-hor-het"), - Topology("exp-vol-ver-hom"), - Topology("exp-vol-ver-het") - ) + private val topologies = + listOf( + Topology("base"), + Topology("rep-vol-hor-hom"), + Topology("rep-vol-hor-het"), + Topology("rep-vol-ver-hom"), + Topology("rep-vol-ver-het"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-hor-het"), + Topology("exp-vol-ver-hom"), + Topology("exp-vol-ver-het"), + ) - private val workloads = listOf( - Workload("solvinity-10%", trace("solvinity").sampleByLoad(0.1)), - Workload("solvinity-25%", trace("solvinity").sampleByLoad(0.25)), - Workload("solvinity-50%", trace("solvinity").sampleByLoad(0.5)), - Workload("solvinity-100%", trace("solvinity").sampleByLoad(1.0)) - ) + private val workloads = + listOf( + Workload("solvinity-10%", trace("solvinity").sampleByLoad(0.1)), + Workload("solvinity-25%", trace("solvinity").sampleByLoad(0.25)), + Workload("solvinity-50%", trace("solvinity").sampleByLoad(0.5)), + Workload("solvinity-100%", trace("solvinity").sampleByLoad(1.0)), + ) private val operationalPhenomena = OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) private val allocationPolicy = "active-servers" - override val scenarios: Iterable<Scenario> = topologies.flatMap { topology -> - workloads.map { workload -> - Scenario( - topology, - workload, - operationalPhenomena, - allocationPolicy, - mapOf("topology" to topology.name, "workload" to workload.name) - ) + override val scenarios: Iterable<Scenario> = + topologies.flatMap { topology -> + workloads.map { workload -> + Scenario( + topology, + workload, + operationalPhenomena, + allocationPolicy, + mapOf("topology" to topology.name, "workload" to workload.name), + ) + } } - } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/MoreHpcPortfolio.kt index 1c222ae8..e060ff14 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/MoreHpcPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/MoreHpcPortfolio.kt @@ -35,34 +35,37 @@ import org.opendc.experiments.base.portfolio.model.Workload * A [Portfolio] to explore the effect of HPC workloads. 
*/ public class MoreHpcPortfolio : Portfolio { - private val topologies = listOf( - Topology("base"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-ver-hom"), - Topology("exp-vel-ver-hom") - ) - private val workloads = listOf( - Workload("hpc-0%", trace("solvinity").sampleByHpc(0.0)), - Workload("hpc-25%", trace("solvinity").sampleByHpc(0.25)), - Workload("hpc-50%", trace("solvinity").sampleByHpc(0.5)), - Workload("hpc-100%", trace("solvinity").sampleByHpc(1.0)), - Workload("hpc-load-25%", trace("solvinity").sampleByHpcLoad(0.25)), - Workload("hpc-load-50%", trace("solvinity").sampleByHpcLoad(0.5)), - Workload("hpc-load-100%", trace("solvinity").sampleByHpcLoad(1.0)) - ) + private val topologies = + listOf( + Topology("base"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-ver-hom"), + Topology("exp-vel-ver-hom"), + ) + private val workloads = + listOf( + Workload("hpc-0%", trace("solvinity").sampleByHpc(0.0)), + Workload("hpc-25%", trace("solvinity").sampleByHpc(0.25)), + Workload("hpc-50%", trace("solvinity").sampleByHpc(0.5)), + Workload("hpc-100%", trace("solvinity").sampleByHpc(1.0)), + Workload("hpc-load-25%", trace("solvinity").sampleByHpcLoad(0.25)), + Workload("hpc-load-50%", trace("solvinity").sampleByHpcLoad(0.5)), + Workload("hpc-load-100%", trace("solvinity").sampleByHpcLoad(1.0)), + ) private val operationalPhenomena = OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) private val allocationPolicy: String = "active-servers" - override val scenarios: Iterable<Scenario> = topologies.flatMap { topology -> - workloads.map { workload -> - Scenario( - topology, - workload, - operationalPhenomena, - allocationPolicy, - mapOf("topology" to topology.name, "workload" to workload.name) - ) + override val scenarios: Iterable<Scenario> = + topologies.flatMap { topology -> + workloads.map { workload -> + Scenario( + topology, + workload, + operationalPhenomena, + allocationPolicy, + mapOf("topology" to topology.name, "workload" to workload.name), + ) + } } - } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/MoreVelocityPortfolio.kt index b2a751a3..0d6e190c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/MoreVelocityPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/MoreVelocityPortfolio.kt @@ -34,33 +34,36 @@ import org.opendc.experiments.base.portfolio.model.Workload * A [Portfolio] that explores the effect of adding more velocity to a cluster (e.g., faster machines). 
*/ public class MoreVelocityPortfolio : Portfolio { - private val topologies = listOf( - Topology("base"), - Topology("rep-vel-ver-hom"), - Topology("rep-vel-ver-het"), - Topology("exp-vel-ver-hom"), - Topology("exp-vel-ver-het") - ) + private val topologies = + listOf( + Topology("base"), + Topology("rep-vel-ver-hom"), + Topology("rep-vel-ver-het"), + Topology("exp-vel-ver-hom"), + Topology("exp-vel-ver-het"), + ) - private val workloads = listOf( - Workload("solvinity-10%", trace("solvinity").sampleByLoad(0.1)), - Workload("solvinity-25%", trace("solvinity").sampleByLoad(0.25)), - Workload("solvinity-50%", trace("solvinity").sampleByLoad(0.5)), - Workload("solvinity-100%", trace("solvinity").sampleByLoad(1.0)) - ) + private val workloads = + listOf( + Workload("solvinity-10%", trace("solvinity").sampleByLoad(0.1)), + Workload("solvinity-25%", trace("solvinity").sampleByLoad(0.25)), + Workload("solvinity-50%", trace("solvinity").sampleByLoad(0.5)), + Workload("solvinity-100%", trace("solvinity").sampleByLoad(1.0)), + ) private val operationalPhenomena = OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) private val allocationPolicy = "active-servers" - override val scenarios: Iterable<Scenario> = topologies.flatMap { topology -> - workloads.map { workload -> - Scenario( - topology, - workload, - operationalPhenomena, - allocationPolicy, - mapOf("topology" to topology.name, "workload" to workload.name) - ) + override val scenarios: Iterable<Scenario> = + topologies.flatMap { topology -> + workloads.map { workload -> + Scenario( + topology, + workload, + operationalPhenomena, + allocationPolicy, + mapOf("topology" to topology.name, "workload" to workload.name), + ) + } } - } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/OperationalPhenomenaPortfolio.kt index b8c60b67..17c8bb48 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/OperationalPhenomenaPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/OperationalPhenomenaPortfolio.kt @@ -35,29 +35,32 @@ import org.opendc.experiments.base.portfolio.model.Workload */ public class OperationalPhenomenaPortfolio : Portfolio { private val topology = Topology("base") - private val workloads = listOf( - Workload("solvinity-10%", trace("solvinity").sampleByLoad(0.1)), - Workload("solvinity-25%", trace("solvinity").sampleByLoad(0.25)), - Workload("solvinity-50%", trace("solvinity").sampleByLoad(0.5)), - Workload("solvinity-100%", trace("solvinity").sampleByLoad(1.0)) - ) + private val workloads = + listOf( + Workload("solvinity-10%", trace("solvinity").sampleByLoad(0.1)), + Workload("solvinity-25%", trace("solvinity").sampleByLoad(0.25)), + Workload("solvinity-50%", trace("solvinity").sampleByLoad(0.5)), + Workload("solvinity-100%", trace("solvinity").sampleByLoad(1.0)), + ) - private val phenomenas = listOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), - OperationalPhenomena(failureFrequency = 0.0, hasInterference = true), - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false), - OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) - ) + private val phenomenas = + listOf( + OperationalPhenomena(failureFrequency = 24.0 
* 7, hasInterference = true), + OperationalPhenomena(failureFrequency = 0.0, hasInterference = true), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false), + OperationalPhenomena(failureFrequency = 0.0, hasInterference = false), + ) - private val allocationPolicies = listOf( - "mem", - "mem-inv", - "core-mem", - "core-mem-inv", - "active-servers", - "active-servers-inv", - "random" - ) + private val allocationPolicies = + listOf( + "mem", + "mem-inv", + "core-mem", + "core-mem-inv", + "active-servers", + "active-servers-inv", + "random", + ) override val scenarios: Iterable<Scenario> = workloads.flatMap { workload -> @@ -68,7 +71,7 @@ public class OperationalPhenomenaPortfolio : Portfolio { workload, operationalPhenomena, allocationPolicy, - mapOf("workload" to workload.name, "scheduler" to allocationPolicy, "phenomena" to index.toString()) + mapOf("workload" to workload.name, "scheduler" to allocationPolicy, "phenomena" to index.toString()), ) } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/TestPortfolio.kt index f7314802..729fb017 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/TestPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolios/TestPortfolio.kt @@ -34,12 +34,13 @@ import org.opendc.experiments.base.portfolio.model.Workload * A [Portfolio] to perform a simple test run. */ public class TestPortfolio : Portfolio { - override val scenarios: Iterable<Scenario> = listOf( - Scenario( - Topology("single"), - Workload("bitbrains-small", trace("trace").sampleByLoad(1.0)), - OperationalPhenomena(failureFrequency = 0.0, hasInterference = true), - "active-servers" + override val scenarios: Iterable<Scenario> = + listOf( + Scenario( + Topology("single"), + Workload("bitbrains-small", trace("trace").sampleByLoad(1.0)), + OperationalPhenomena(failureFrequency = 0.0, hasInterference = true), + "active-servers", + ), ) - ) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 26cdb36e..6b538240 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -77,10 +77,11 @@ class CapelinIntegrationTest { @BeforeEach fun setUp() { monitor = TestComputeMonitor() - computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) + computeScheduler = + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)), + ) workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace")) } @@ -88,159 +89,166 @@ class CapelinIntegrationTest { * Test a large simulation setup. 
*/ @Test - fun testLarge() = runSimulation { - val seed = 0L - val workload = createTestWorkload(1.0, seed) - val topology = createTopology() - val monitor = monitor + fun testLarge() = + runSimulation { + val seed = 0L + val workload = createTestWorkload(1.0, seed) + val topology = createTopology() + val monitor = monitor - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology) - ) + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload, seed) - } + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, seed) + } - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.serversPending} " + - "Active=${monitor.serversActive}" - ) + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.serversPending} " + + "Active=${monitor.serversActive}", + ) - // Note that these values have been verified beforehand - assertAll( - { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, - { assertEquals(0, monitor.serversActive, "All VMs should finish after a run") }, - { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, - { assertEquals(0, monitor.serversPending, "No VM should not be in the queue") }, - { assertEquals(223379991650, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(66977091124, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(3160267873, monitor.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.8407E9, monitor.energyUsage, 1E4) { "Incorrect power draw" } } - ) - } + // Note that these values have been verified beforehand + assertAll( + { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, monitor.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, monitor.serversPending, "No VM should not be in the queue") }, + { assertEquals(223379991650, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977091124, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(3160267873, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.8407E9, monitor.energyUsage, 1E4) { "Incorrect power draw" } }, + ) + } /** * Test a small simulation setup. 
*/ @Test - fun testSmall() = runSimulation { - val seed = 1L - val workload = createTestWorkload(0.25, seed) - val topology = createTopology("single") - val monitor = monitor + fun testSmall() = + runSimulation { + val seed = 1L + val workload = createTestWorkload(0.25, seed) + val topology = createTopology("single") + val monitor = monitor - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology) - ) + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload, seed) - } + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, seed) + } - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.serversPending} " + - "Active=${monitor.serversActive}" - ) + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.serversPending} " + + "Active=${monitor.serversActive}", + ) - // Note that these values have been verified beforehand - assertAll( - { assertEquals(10996730092, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9741285381, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(152, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(7.0109E8, monitor.energyUsage, 1E4) { "Incorrect power draw" } } - ) - } + // Note that these values have been verified beforehand + assertAll( + { assertEquals(10996730092, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9741285381, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(152, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(7.0109E8, monitor.energyUsage, 1E4) { "Incorrect power draw" } }, + ) + } /** * Test a small simulation setup with interference. 
*/ @Test - fun testInterference() = runSimulation { - val seed = 0L - val workload = createTestWorkload(1.0, seed) - val topology = createTopology("single") + fun testInterference() = + runSimulation { + val seed = 0L + val workload = createTestWorkload(1.0, seed) + val topology = createTopology("single") - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology) - ) + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload, seed, interference = true) - } + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, seed, interference = true) + } - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.serversPending} " + - "Active=${monitor.serversActive}" - ) + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.serversPending} " + + "Active=${monitor.serversActive}", + ) - // Note that these values have been verified beforehand - assertAll( - { assertEquals(42814948316, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(40138266225, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(23489356981, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(424267131, monitor.lostTime) { "Lost time incorrect" } } - ) - } + // Note that these values have been verified beforehand + assertAll( + { assertEquals(42814948316, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(40138266225, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(23489356981, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(424267131, monitor.lostTime) { "Lost time incorrect" } }, + ) + } /** * Test a small simulation setup with failures. 
*/ @Test - fun testFailures() = runSimulation { - val seed = 0L - val topology = createTopology("single") - val workload = createTestWorkload(0.25, seed) - val monitor = monitor + fun testFailures() = + runSimulation { + val seed = 0L + val topology = createTopology("single") + val workload = createTestWorkload(0.25, seed) + val monitor = monitor - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology) - ) + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload, seed, failureModel = grid5000(Duration.ofDays(7))) - } + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, seed, failureModel = grid5000(Duration.ofDays(7))) + } - // Note that these values have been verified beforehand - assertAll( - { assertEquals(1404277711, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(1478675712, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(152, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(360369187, monitor.uptime) { "Uptime incorrect" } } - ) - } + // Note that these values have been verified beforehand + assertAll( + { assertEquals(1404277711, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(1478675712, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(152, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(360369187, monitor.uptime) { "Uptime incorrect" } }, + ) + } /** * Obtain the trace reader for the test. */ - private fun createTestWorkload(fraction: Double, seed: Long): List<VirtualMachine> { + private fun createTestWorkload( + fraction: Double, + seed: Long, + ): List<VirtualMachine> { val source = trace("bitbrains-small").sampleByLoad(fraction) return source.resolve(workloadLoader, Random(seed)) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinRunnerTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinRunnerTest.kt index 7354e7a5..32d53aee 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinRunnerTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinRunnerTest.kt @@ -46,20 +46,20 @@ class CapelinRunnerTest { private val tracePath = File("src/test/resources/trace") /** - * Smoke test with output. + * Smoke test with output. 
fixme: Fix failures and enable Test */ -// @Test // fixme: Fix failures and enable fun testSmoke() { val outputPath = Files.createTempDirectory("output").toFile() try { val runner = CapelinRunner(envPath, tracePath, outputPath) - val scenario = Scenario( - Topology("topology"), - Workload("bitbrains-small", trace("bitbrains-small")), - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), - "active-servers" - ) + val scenario = + Scenario( + Topology("topology"), + Workload("bitbrains-small", trace("bitbrains-small")), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), + "active-servers", + ) assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) } } finally { @@ -68,17 +68,17 @@ class CapelinRunnerTest { } /** - * Smoke test without output. + * Smoke test without output. fixme: Fix failures and enable Test */ -// @Test // fixme: Fix failures and enable fun testSmokeNoOutput() { val runner = CapelinRunner(envPath, tracePath, null) - val scenario = Scenario( - Topology("topology"), - Workload("bitbrains-small", trace("bitbrains-small")), - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), - "active-servers" - ) + val scenario = + Scenario( + Topology("topology"), + Workload("bitbrains-small", trace("bitbrains-small")), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), + "active-servers", + ) assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) } } diff --git a/opendc-experiments/opendc-experiments-faas/build.gradle.kts b/opendc-experiments/opendc-experiments-faas/build.gradle.kts index 3cabbbf2..d217f320 100644 --- a/opendc-experiments/opendc-experiments-faas/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-faas/build.gradle.kts @@ -22,7 +22,7 @@ description = "Support library for simulating FaaS workloads with OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` `testing-conventions` diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt index 548abc9a..6c8cc0a2 100644 --- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt @@ -48,21 +48,23 @@ public class FaaSServiceProvisioningStep internal constructor( private val routingPolicy: (ProvisioningContext) -> RoutingPolicy, private val terminationPolicy: (ProvisioningContext) -> FunctionTerminationPolicy, private val machineModel: MachineModel, - private val coldStartModel: ColdStartModel? 
+ private val coldStartModel: ColdStartModel?, ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { - val delayInjector = if (coldStartModel != null) { - StochasticDelayInjector(coldStartModel, Random(ctx.seeder.nextLong())) - } else { - ZeroDelayInjector - } + val delayInjector = + if (coldStartModel != null) { + StochasticDelayInjector(coldStartModel, Random(ctx.seeder.nextLong())) + } else { + ZeroDelayInjector + } val deployer = SimFunctionDeployer(ctx.dispatcher, machineModel, delayInjector) - val service = FaaSService( - ctx.dispatcher, - deployer, - routingPolicy(ctx), - terminationPolicy(ctx) - ) + val service = + FaaSService( + ctx.dispatcher, + deployer, + routingPolicy(ctx), + terminationPolicy(ctx), + ) ctx.registry.register(serviceDomain, FaaSService::class.java, service) diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSSteps.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSSteps.kt index ce76da0d..a84fe092 100644 --- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSSteps.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSSteps.kt @@ -46,7 +46,7 @@ public fun setupFaaSService( routingPolicy: (ProvisioningContext) -> RoutingPolicy, terminationPolicy: (ProvisioningContext) -> FunctionTerminationPolicy, machineModel: MachineModel, - coldStartModel: ColdStartModel? = null + coldStartModel: ColdStartModel? = null, ): ProvisioningStep { return FaaSServiceProvisioningStep(serviceDomain, routingPolicy, terminationPolicy, machineModel, coldStartModel) } diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionSample.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionSample.kt index 4ce2b136..4ca84da7 100644 --- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionSample.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionSample.kt @@ -40,5 +40,5 @@ public data class FunctionSample( val provisionedCpu: Int, val provisionedMem: Int, val cpuUsage: Double, - val memUsage: Double + val memUsage: Double, ) diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionTraceWorkload.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionTraceWorkload.kt index 71a2536c..1592e629 100644 --- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionTraceWorkload.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionTraceWorkload.kt @@ -31,6 +31,11 @@ import org.opendc.simulator.compute.workload.SimWorkload * A [SimFaaSWorkload] for a [FunctionTrace]. 
*/ public class FunctionTraceWorkload(trace: FunctionTrace) : - SimFaaSWorkload, SimWorkload by SimTrace.ofFragments(trace.samples.map { SimTraceFragment(it.timestamp, it.duration, it.cpuUsage, 1) }).createWorkload(0) { + SimFaaSWorkload, + SimWorkload by SimTrace.ofFragments( + trace.samples.map { + SimTraceFragment(it.timestamp, it.duration, it.cpuUsage, 1) + }, + ).createWorkload(0) { override suspend fun invoke() {} } diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/ServerlessTraceReader.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/ServerlessTraceReader.kt index 7b6b3ef7..09412961 100644 --- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/ServerlessTraceReader.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/ServerlessTraceReader.kt @@ -42,9 +42,10 @@ public class ServerlessTraceReader { /** * The [CsvFactory] used to create the parser. */ - private val factory = CsvFactory() - .enable(CsvParser.Feature.ALLOW_COMMENTS) - .enable(CsvParser.Feature.TRIM_SPACES) + private val factory = + CsvFactory() + .enable(CsvParser.Feature.ALLOW_COMMENTS) + .enable(CsvParser.Feature.TRIM_SPACES) /** * Parse the traces at the specified [path]. @@ -120,17 +121,18 @@ public class ServerlessTraceReader { /** * The [CsvSchema] that is used to parse the trace. */ - val schema = CsvSchema.builder() - .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) - .addColumn("Invocations", CsvSchema.ColumnType.NUMBER) - .addColumn("Avg Exec time per Invocation", CsvSchema.ColumnType.NUMBER) - .addColumn("Provisioned CPU [Mhz]", CsvSchema.ColumnType.NUMBER) - .addColumn("Provisioned Memory [mb]", CsvSchema.ColumnType.NUMBER) - .addColumn("Avg cpu usage per Invocation [Mhz]", CsvSchema.ColumnType.NUMBER) - .addColumn("Avg mem usage per Invocation [mb]", CsvSchema.ColumnType.NUMBER) - .addColumn("name", CsvSchema.ColumnType.STRING) - .setAllowComments(true) - .setUseHeader(true) - .build() + val schema = + CsvSchema.builder() + .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) + .addColumn("Invocations", CsvSchema.ColumnType.NUMBER) + .addColumn("Avg Exec time per Invocation", CsvSchema.ColumnType.NUMBER) + .addColumn("Provisioned CPU [Mhz]", CsvSchema.ColumnType.NUMBER) + .addColumn("Provisioned Memory [mb]", CsvSchema.ColumnType.NUMBER) + .addColumn("Avg cpu usage per Invocation [Mhz]", CsvSchema.ColumnType.NUMBER) + .addColumn("Avg mem usage per Invocation [mb]", CsvSchema.ColumnType.NUMBER) + .addColumn("name", CsvSchema.ColumnType.STRING) + .setAllowComments(true) + .setUseHeader(true) + .build() } } diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt index 7a354d69..faa13fa2 100644 --- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt @@ -37,7 +37,10 @@ import kotlin.math.max * @param clock An [InstantSource] instance tracking simulation time. * @param trace The trace to simulate. 
*/ -public suspend fun FaaSService.replay(clock: InstantSource, trace: List<FunctionTrace>) { +public suspend fun FaaSService.replay( + clock: InstantSource, + trace: List<FunctionTrace>, +) { val client = newClient() try { coroutineScope { diff --git a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt index 9a3dba13..346059a8 100644 --- a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt +++ b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt @@ -46,34 +46,35 @@ class FaaSExperiment { * Smoke test that simulates a small trace. */ @Test - fun testSmoke() = runSimulation { - val faasService = "faas.opendc.org" + fun testSmoke() = + runSimulation { + val faasService = "faas.opendc.org" - Provisioner(dispatcher, seed = 0L).use { provisioner -> - provisioner.runStep( - setupFaaSService( - faasService, - { RandomRoutingPolicy() }, - { FunctionTerminationPolicyFixed(it.dispatcher, timeout = Duration.ofMinutes(10)) }, - createMachineModel(), - coldStartModel = ColdStartModel.GOOGLE + Provisioner(dispatcher, seed = 0L).use { provisioner -> + provisioner.runStep( + setupFaaSService( + faasService, + { RandomRoutingPolicy() }, + { FunctionTerminationPolicyFixed(it.dispatcher, timeout = Duration.ofMinutes(10)) }, + createMachineModel(), + coldStartModel = ColdStartModel.GOOGLE, + ), ) - ) - val service = provisioner.registry.resolve(faasService, FaaSService::class.java)!! + val service = provisioner.registry.resolve(faasService, FaaSService::class.java)!! - val trace = ServerlessTraceReader().parse(File("src/test/resources/trace")) - service.replay(timeSource, trace) + val trace = ServerlessTraceReader().parse(File("src/test/resources/trace")) + service.replay(timeSource, trace) - val stats = service.getSchedulerStats() + val stats = service.getSchedulerStats() - assertAll( - { assertEquals(14, stats.totalInvocations) }, - { assertEquals(2, stats.timelyInvocations) }, - { assertEquals(12, stats.delayedInvocations) } - ) + assertAll( + { assertEquals(14, stats.totalInvocations) }, + { assertEquals(2, stats.timelyInvocations) }, + { assertEquals(12, stats.delayedInvocations) }, + ) + } } - } /** * Construct the machine model to test with. 
@@ -82,8 +83,10 @@ class FaaSExperiment { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) return MachineModel( - /*cpus*/ List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, - /*memory*/ List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + // cpus + List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + // memory + List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }, ) } } diff --git a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/ServerlessTraceReaderTest.kt b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/ServerlessTraceReaderTest.kt index 54071791..bc4f5457 100644 --- a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/ServerlessTraceReaderTest.kt +++ b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/ServerlessTraceReaderTest.kt @@ -39,7 +39,7 @@ class ServerlessTraceReaderTest { assertAll( { assertEquals(2, trace.size) }, { assertEquals("004c1ea5eb15978682b00ab659aed21e2835d5287668da8d5267f751fdfbdd78", trace[0].id) }, - { assertEquals(256, trace[0].maxMemory) } + { assertEquals(256, trace[0].maxMemory) }, ) } } diff --git a/opendc-experiments/opendc-experiments-greenifier/build.gradle.kts b/opendc-experiments/opendc-experiments-greenifier/build.gradle.kts index 74fa249c..45672545 100644 --- a/opendc-experiments/opendc-experiments-greenifier/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-greenifier/build.gradle.kts @@ -22,7 +22,7 @@ description = "Experiments for the Greenifier work" -/* Build configuration */ +// Build configuration plugins { `kotlin-conventions` `testing-conventions` @@ -56,10 +56,10 @@ val createGreenifierApp by tasks.creating(CreateStartScripts::class) { applicationName = "greenifier" mainClass.set("org.opendc.experiments.greenifier.GreenifierCli") classpath = tasks.jar.get().outputs.files + configurations["runtimeClasspath"] - outputDir = project.buildDir.resolve("scripts") + outputDir = project.layout.buildDirectory.get().asFile.resolve("scripts") } -/* Create custom Greenifier distribution */ +// Create custom Greenifier distribution distributions { main { distributionBaseName.set("greenifier") diff --git a/opendc-experiments/opendc-experiments-greenifier/src/jmh/kotlin/org/opendc/experiments/greenifier/GreenifierBenchmarks.kt b/opendc-experiments/opendc-experiments-greenifier/src/jmh/kotlin/org/opendc/experiments/greenifier/GreenifierBenchmarks.kt index 7997d01c..6cc6df36 100644 --- a/opendc-experiments/opendc-experiments-greenifier/src/jmh/kotlin/org/opendc/experiments/greenifier/GreenifierBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-greenifier/src/jmh/kotlin/org/opendc/experiments/greenifier/GreenifierBenchmarks.kt @@ -72,22 +72,24 @@ class GreenifierBenchmarks { } @Benchmark - fun benchmarkGreenifier() = runSimulation { - val serviceDomain = "compute.opendc.org" + fun benchmarkGreenifier() = + runSimulation { + val serviceDomain = "compute.opendc.org" - Provisioner(dispatcher, seed = 0).use { provisioner -> - val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) + Provisioner(dispatcher, seed = 0).use { provisioner -> + val computeScheduler = + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)), + 
) - provisioner.runSteps( - setupComputeService(serviceDomain, { computeScheduler }), - setupHosts(serviceDomain, topology, optimize = isOptimized) - ) + provisioner.runSteps( + setupComputeService(serviceDomain, { computeScheduler }), + setupHosts(serviceDomain, topology, optimize = isOptimized), + ) - val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! - service.replay(timeSource, vms, 0L, interference = true) + val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! + service.replay(timeSource, vms, 0L, interference = true) + } } - } } diff --git a/opendc-experiments/opendc-experiments-greenifier/src/main/kotlin/org/opendc/experiments/greenifier/GreenifierCli.kt b/opendc-experiments/opendc-experiments-greenifier/src/main/kotlin/org/opendc/experiments/greenifier/GreenifierCli.kt index efdc96cd..93557500 100644 --- a/opendc-experiments/opendc-experiments-greenifier/src/main/kotlin/org/opendc/experiments/greenifier/GreenifierCli.kt +++ b/opendc-experiments/opendc-experiments-greenifier/src/main/kotlin/org/opendc/experiments/greenifier/GreenifierCli.kt @@ -104,7 +104,7 @@ internal class GreenifierCommand : CliktCommand(name = "greenifier") { */ private val portfolio by argument(help = "portfolio to replay") .choice( - "greenifier" to { GreenifierPortfolio() } + "greenifier" to { GreenifierPortfolio() }, ) .default({ GreenifierPortfolio() }) @@ -131,12 +131,17 @@ internal class GreenifierCommand : CliktCommand(name = "greenifier") { /** * Run a single scenario. */ - private fun runScenario(runner: GreenifierRunner, pool: ForkJoinPool, scenario: Scenario) { - val pb = ProgressBarBuilder() - .setInitialMax(repeats.toLong()) - .setStyle(ProgressBarStyle.ASCII) - .setTaskName("Simulating...") - .build() + private fun runScenario( + runner: GreenifierRunner, + pool: ForkJoinPool, + scenario: Scenario, + ) { + val pb = + ProgressBarBuilder() + .setInitialMax(repeats.toLong()) + .setStyle(ProgressBarStyle.ASCII) + .setTaskName("Simulating...") + .build() pool.submit { LongStream.range(0, repeats.toLong()) diff --git a/opendc-experiments/opendc-experiments-greenifier/src/main/kotlin/org/opendc/experiments/greenifier/GreenifierPortfolio.kt b/opendc-experiments/opendc-experiments-greenifier/src/main/kotlin/org/opendc/experiments/greenifier/GreenifierPortfolio.kt index eee30b81..f7fd204f 100644 --- a/opendc-experiments/opendc-experiments-greenifier/src/main/kotlin/org/opendc/experiments/greenifier/GreenifierPortfolio.kt +++ b/opendc-experiments/opendc-experiments-greenifier/src/main/kotlin/org/opendc/experiments/greenifier/GreenifierPortfolio.kt @@ -34,26 +34,29 @@ import org.opendc.experiments.base.portfolio.model.Workload * A [Portfolio] that explores the difference between horizontal and vertical scaling. 
*/ public class GreenifierPortfolio : Portfolio { - private val topologies = listOf( - Topology("single"), - Topology("multi") - ) + private val topologies = + listOf( + Topology("single"), + Topology("multi"), + ) - private val workloads = listOf( - Workload("bitbrains-small", trace("trace").sampleByLoad(1.0)) - ) + private val workloads = + listOf( + Workload("bitbrains-small", trace("trace").sampleByLoad(1.0)), + ) private val operationalPhenomena = OperationalPhenomena(0.0, false) private val allocationPolicy = "active-servers" - override val scenarios: Iterable<Scenario> = topologies.flatMap { topology -> - workloads.map { workload -> - Scenario( - topology, - workload, - operationalPhenomena, - allocationPolicy, - mapOf("topology" to topology.name, "workload" to workload.name) - ) + override val scenarios: Iterable<Scenario> = + topologies.flatMap { topology -> + workloads.map { workload -> + Scenario( + topology, + workload, + operationalPhenomena, + allocationPolicy, + mapOf("topology" to topology.name, "workload" to workload.name), + ) + } } - } } diff --git a/opendc-experiments/opendc-experiments-greenifier/src/main/kotlin/org/opendc/experiments/greenifier/GreenifierRunner.kt b/opendc-experiments/opendc-experiments-greenifier/src/main/kotlin/org/opendc/experiments/greenifier/GreenifierRunner.kt index 2c2962f3..6da35cd1 100644 --- a/opendc-experiments/opendc-experiments-greenifier/src/main/kotlin/org/opendc/experiments/greenifier/GreenifierRunner.kt +++ b/opendc-experiments/opendc-experiments-greenifier/src/main/kotlin/org/opendc/experiments/greenifier/GreenifierRunner.kt @@ -50,7 +50,7 @@ import kotlin.math.roundToLong public class GreenifierRunner( private val envPath: File, tracePath: File, - private val outputPath: File? + private val outputPath: File?, ) { /** * The [ComputeWorkloadLoader] to use for loading the traces. @@ -60,14 +60,17 @@ public class GreenifierRunner( /** * Run a single [scenario] with the specified seed. 
*/ - fun runScenario(scenario: Scenario, seed: Long) = runSimulation { + fun runScenario( + scenario: Scenario, + seed: Long, + ) = runSimulation { val serviceDomain = "compute.opendc.org" val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt")) Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain, { createComputeScheduler(scenario.allocationPolicy, Random(it.seeder.nextLong())) }), - setupHosts(serviceDomain, topology, optimize = true) + setupHosts(serviceDomain, topology, optimize = true), ) if (outputPath != null) { @@ -80,9 +83,9 @@ public class GreenifierRunner( ParquetComputeMonitor( outputPath, partition, - bufferSize = 4096 - ) - ) + bufferSize = 4096, + ), + ), ) } diff --git a/opendc-experiments/opendc-experiments-greenifier/src/test/kotlin/org/opendc/experiments/greenifier/GreenifierIntegrationTest.kt b/opendc-experiments/opendc-experiments-greenifier/src/test/kotlin/org/opendc/experiments/greenifier/GreenifierIntegrationTest.kt index dbf840a4..36b15ee0 100644 --- a/opendc-experiments/opendc-experiments-greenifier/src/test/kotlin/org/opendc/experiments/greenifier/GreenifierIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-greenifier/src/test/kotlin/org/opendc/experiments/greenifier/GreenifierIntegrationTest.kt @@ -77,10 +77,11 @@ class GreenifierIntegrationTest { @BeforeEach fun setUp() { monitor = TestComputeMonitor() - computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) + computeScheduler = + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)), + ) workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace")) } @@ -88,159 +89,166 @@ class GreenifierIntegrationTest { * Test a large simulation setup. */ @Test - fun testLarge() = runSimulation { - val seed = 0L - val workload = createTestWorkload(1.0, seed) - val topology = createTopology() - val monitor = monitor + fun testLarge() = + runSimulation { + val seed = 0L + val workload = createTestWorkload(1.0, seed) + val topology = createTopology() + val monitor = monitor - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology) - ) + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload, seed) - } + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
+ service.replay(timeSource, workload, seed) + } - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.serversPending} " + - "Active=${monitor.serversActive}" - ) + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.serversPending} " + + "Active=${monitor.serversActive}", + ) - // Note that these values have been verified beforehand - assertAll( - { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, - { assertEquals(0, monitor.serversActive, "All VMs should finish after a run") }, - { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, - { assertEquals(0, monitor.serversPending, "No VM should not be in the queue") }, - { assertEquals(223379991650, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(66977091124, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(3160267873, monitor.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.8407E9, monitor.energyUsage, 1E4) { "Incorrect power draw" } } - ) - } + // Note that these values have been verified beforehand + assertAll( + { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, monitor.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, monitor.serversPending, "No VM should not be in the queue") }, + { assertEquals(223379991650, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977091124, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(3160267873, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.8407E9, monitor.energyUsage, 1E4) { "Incorrect power draw" } }, + ) + } /** * Test a small simulation setup. */ @Test - fun testSmall() = runSimulation { - val seed = 1L - val workload = createTestWorkload(0.25, seed) - val topology = createTopology("single") - val monitor = monitor + fun testSmall() = + runSimulation { + val seed = 1L + val workload = createTestWorkload(0.25, seed) + val topology = createTopology("single") + val monitor = monitor - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology) - ) + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload, seed) - } + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
+ service.replay(timeSource, workload, seed) + } - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.serversPending} " + - "Active=${monitor.serversActive}" - ) + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.serversPending} " + + "Active=${monitor.serversActive}", + ) - // Note that these values have been verified beforehand - assertAll( - { assertEquals(10996730092, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9741285381, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(152, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(7.0109E8, monitor.energyUsage, 1E4) { "Incorrect power draw" } } - ) - } + // Note that these values have been verified beforehand + assertAll( + { assertEquals(10996730092, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9741285381, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(152, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(7.0109E8, monitor.energyUsage, 1E4) { "Incorrect power draw" } }, + ) + } /** * Test a small simulation setup with interference. */ @Test - fun testInterference() = runSimulation { - val seed = 0L - val workload = createTestWorkload(1.0, seed) - val topology = createTopology("single") + fun testInterference() = + runSimulation { + val seed = 0L + val workload = createTestWorkload(1.0, seed) + val topology = createTopology("single") - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology) - ) + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload, seed, interference = true) - } + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
+ service.replay(timeSource, workload, seed, interference = true) + } - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.serversPending} " + - "Active=${monitor.serversActive}" - ) + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.serversPending} " + + "Active=${monitor.serversActive}", + ) - // Note that these values have been verified beforehand - assertAll( - { assertEquals(42814948316, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(40138266225, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(23489356981, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(424267131, monitor.lostTime) { "Lost time incorrect" } } - ) - } + // Note that these values have been verified beforehand + assertAll( + { assertEquals(42814948316, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(40138266225, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(23489356981, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(424267131, monitor.lostTime) { "Lost time incorrect" } }, + ) + } /** * Test a small simulation setup with failures. */ @Test - fun testFailures() = runSimulation { - val seed = 0L - val topology = createTopology("single") - val workload = createTestWorkload(0.25, seed) - val monitor = monitor + fun testFailures() = + runSimulation { + val seed = 0L + val topology = createTopology("single") + val workload = createTestWorkload(0.25, seed) + val monitor = monitor - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology) - ) + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload, seed, failureModel = grid5000(Duration.ofDays(7))) - } + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
+ service.replay(timeSource, workload, seed, failureModel = grid5000(Duration.ofDays(7))) + } - // Note that these values have been verified beforehand - assertAll( - { assertEquals(1404277711, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(1478675712, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(152, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(360369187, monitor.uptime) { "Uptime incorrect" } } - ) - } + // Note that these values have been verified beforehand + assertAll( + { assertEquals(1404277711, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(1478675712, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(152, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(360369187, monitor.uptime) { "Uptime incorrect" } }, + ) + } /** * Obtain the trace reader for the test. */ - private fun createTestWorkload(fraction: Double, seed: Long): List<VirtualMachine> { + private fun createTestWorkload( + fraction: Double, + seed: Long, + ): List<VirtualMachine> { val source = trace("bitbrains-small").sampleByLoad(fraction) return source.resolve(workloadLoader, Random(seed)) } diff --git a/opendc-experiments/opendc-experiments-greenifier/src/test/kotlin/org/opendc/experiments/greenifier/GreenifierRunnerTest.kt b/opendc-experiments/opendc-experiments-greenifier/src/test/kotlin/org/opendc/experiments/greenifier/GreenifierRunnerTest.kt index ad3113e1..b6d6a6e9 100644 --- a/opendc-experiments/opendc-experiments-greenifier/src/test/kotlin/org/opendc/experiments/greenifier/GreenifierRunnerTest.kt +++ b/opendc-experiments/opendc-experiments-greenifier/src/test/kotlin/org/opendc/experiments/greenifier/GreenifierRunnerTest.kt @@ -46,20 +46,20 @@ class GreenifierRunnerTest { private val tracePath = File("src/test/resources/trace") /** - * Smoke test with output. + * Smoke test with output. fixme: Fix failures and enable Test */ -// @Test // fixme: Fix failures and enable fun testSmoke() { val outputPath = Files.createTempDirectory("output").toFile() try { val runner = GreenifierRunner(envPath, tracePath, outputPath) - val scenario = Scenario( - Topology("topology"), - Workload("bitbrains-small", trace("bitbrains-small")), - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), - "active-servers" - ) + val scenario = + Scenario( + Topology("topology"), + Workload("bitbrains-small", trace("bitbrains-small")), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), + "active-servers", + ) assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) } } finally { @@ -68,17 +68,17 @@ class GreenifierRunnerTest { } /** - * Smoke test without output. + * Smoke test without output. 
fixme: Fix failures and enable Test */ -// @Test // fixme: Fix failures and enable fun testSmokeNoOutput() { val runner = GreenifierRunner(envPath, tracePath, null) - val scenario = Scenario( - Topology("topology"), - Workload("bitbrains-small", trace("bitbrains-small")), - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), - "active-servers" - ) + val scenario = + Scenario( + Topology("topology"), + Workload("bitbrains-small", trace("bitbrains-small")), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), + "active-servers", + ) assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) } } diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts index 7b3b084f..b66958ca 100644 --- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts @@ -22,7 +22,7 @@ description = "TensorFlow application model in OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-conventions` `testing-conventions` diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/Models.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/Models.kt index be166bd5..78a63df8 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/Models.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/Models.kt @@ -35,7 +35,7 @@ import org.opendc.experiments.tf20.keras.layer.regularization.Dropout /** * Construct an AlexNet model with the given batch size. */ -fun AlexNet(batchSize: Long): TrainableModel { +fun getAlexNet(batchSize: Long): TrainableModel { return Sequential( Input(batchSize, 227, 227, 3, name = "Input"), Conv2D(longArrayOf(11, 11, 3, 96), longArrayOf(1, 4, 4, 1), padding = ConvPadding.VALID, name = "conv1"), @@ -51,14 +51,14 @@ fun AlexNet(batchSize: Long): TrainableModel { Conv2D(longArrayOf(1, 1, 4096, 4096), longArrayOf(1, 1, 1, 1), padding = ConvPadding.SAME, name = "fc7"), Dropout(0.5f, name = "dropout7"), Conv2D(longArrayOf(1, 1, 4096, 1000), longArrayOf(1, 1, 1, 1), padding = ConvPadding.SAME, name = "f8"), - ActivationLayer(Activation.Softmax, name = "softmax") + ActivationLayer(Activation.Softmax, name = "softmax"), ) } /** * Construct an VGG16 model with the given batch size. 
*/ -fun VGG16(batchSize: Long = 128): TrainableModel { +fun getVGG16(batchSize: Long = 128): TrainableModel { return Sequential( Input(batchSize, 224, 224, 3, name = "Input"), Conv2D(longArrayOf(3, 3, 3, 64), longArrayOf(1, 1, 1, 1), padding = ConvPadding.SAME, name = "conv1-1"), @@ -84,6 +84,6 @@ fun VGG16(batchSize: Long = 128): TrainableModel { Conv2D(longArrayOf(1, 1, 4096, 4096), longArrayOf(1, 1, 1, 1), padding = ConvPadding.SAME, name = "fc7"), Dropout(0.5f, name = "dropout7"), Conv2D(longArrayOf(1, 1, 4096, 1000), longArrayOf(1, 1, 1, 1), padding = ConvPadding.SAME, name = "f8"), - ActivationLayer(Activation.Softmax, name = "softmax") + ActivationLayer(Activation.Softmax, name = "softmax"), ) } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index a1fc3fba..b14e499c 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -54,102 +54,107 @@ public class SimTFDevice( dispatcher: Dispatcher, pu: ProcessingUnit, private val memory: MemoryUnit, - powerModel: CpuPowerModel + powerModel: CpuPowerModel, ) : TFDevice { /** * The [SimMachine] representing the device. */ - private val machine = SimBareMetalMachine.create( - FlowEngine.create(dispatcher).newGraph(), - MachineModel(listOf(pu), listOf(memory)), - SimPsuFactories.simple(powerModel) - ) + private val machine = + SimBareMetalMachine.create( + FlowEngine.create(dispatcher).newGraph(), + MachineModel(listOf(pu), listOf(memory)), + SimPsuFactories.simple(powerModel), + ) /** * The workload that will be run by the device. */ - private val workload = object : SimWorkload, FlowStageLogic { - /** - * The [FlowStage] of the workload. - */ - var stage: FlowStage? = null - - /** - * The output of the workload. - */ - private var output: OutPort? = null - - /** - * The queue of work to run. - */ - val queue = ArrayDeque<Work>() - - /** - * A flag to indicate that the workload is idle. - */ - val isIdle - get() = activeWork == null - - /** - * The active work of the workload. - */ - private var activeWork: Work? = null - - /** - * The timestamp of the last pull. - */ - private var lastPull: Long = 0L - - override fun onStart(ctx: SimMachineContext) { - val stage = ctx.graph.newStage(this) - this.stage = stage - output = stage.getOutlet("out") - lastPull = ctx.graph.engine.clock.millis() - - ctx.graph.connect(output, ctx.cpus[0].input) - } - - override fun onStop(ctx: SimMachineContext) { - stage?.close() - stage = null - output = null - } - - override fun setOffset(now: Long) {} + private val workload = + object : SimWorkload, FlowStageLogic { + /** + * The [FlowStage] of the workload. + */ + var stage: FlowStage? = null + + /** + * The output of the workload. + */ + private var output: OutPort? = null + + /** + * The queue of work to run. + */ + val queue = ArrayDeque<Work>() + + /** + * A flag to indicate that the workload is idle. + */ + val isIdle + get() = activeWork == null + + /** + * The active work of the workload. + */ + private var activeWork: Work? = null + + /** + * The timestamp of the last pull. 
+ */ + private var lastPull: Long = 0L + + override fun onStart(ctx: SimMachineContext) { + val stage = ctx.graph.newStage(this) + this.stage = stage + output = stage.getOutlet("out") + lastPull = ctx.graph.engine.clock.millis() + + ctx.graph.connect(output, ctx.cpus[0].input) + } - override fun snapshot(): SimWorkload = throw UnsupportedOperationException() + override fun onStop(ctx: SimMachineContext) { + stage?.close() + stage = null + output = null + } - override fun onUpdate(ctx: FlowStage, now: Long): Long { - val output = output ?: return Long.MAX_VALUE - val lastPull = lastPull - this.lastPull = now - val delta = (now - lastPull).coerceAtLeast(0) - val consumedWork = output.rate * delta / 1000.0 + override fun setOffset(now: Long) {} + + override fun snapshot(): SimWorkload = throw UnsupportedOperationException() + + override fun onUpdate( + ctx: FlowStage, + now: Long, + ): Long { + val output = output ?: return Long.MAX_VALUE + val lastPull = lastPull + this.lastPull = now + val delta = (now - lastPull).coerceAtLeast(0) + val consumedWork = output.rate * delta / 1000.0 + + val activeWork = activeWork + if (activeWork != null) { + if (activeWork.consume(consumedWork)) { + this.activeWork = null + } else { + val duration = ceil(activeWork.flops / output.capacity * 1000).toLong() + output.push(output.capacity) + return now + duration + } + } - val activeWork = activeWork - if (activeWork != null) { - if (activeWork.consume(consumedWork)) { - this.activeWork = null - } else { - val duration = ceil(activeWork.flops / output.capacity * 1000).toLong() + val queue = queue + val head = queue.poll() + return if (head != null) { + this.activeWork = head + val duration = (head.flops / output.capacity * 1000).roundToLong() output.push(output.capacity) - return now + duration + now + duration + } else { + output.push(0.0f) + Long.MAX_VALUE } } - - val queue = queue - val head = queue.poll() - return if (head != null) { - this.activeWork = head - val duration = (head.flops / output.capacity * 1000).roundToLong() - output.push(output.capacity) - now + duration - } else { - output.push(0.0f) - Long.MAX_VALUE - } } - } init { machine.startWorkload(workload, emptyMap()) {} @@ -160,12 +165,13 @@ public class SimTFDevice( delay(duration.toLong()) } - override suspend fun compute(flops: Double) = suspendCancellableCoroutine<Unit> { cont -> - workload.queue.add(Work(flops, cont)) - if (workload.isIdle) { - workload.stage?.invalidate() + override suspend fun compute(flops: Double) = + suspendCancellableCoroutine<Unit> { cont -> + workload.queue.add(Work(flops, cont)) + if (workload.isIdle) { + workload.stage?.invalidate() + } } - } override fun getDeviceStats(): TFDeviceStats { return TFDeviceStats(machine.cpuUsage, machine.psu.powerDraw, machine.psu.energyUsage) diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt index 3fea44da..c40982f8 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt @@ -32,5 +32,5 @@ package org.opendc.experiments.tf20.core data class TFDeviceStats( val resourceUsage: Double, val powerDraw: Double, - val energyUsage: Double + val energyUsage: Double, ) diff --git 
a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/MirroredStrategy.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/MirroredStrategy.kt index 8caa7ec9..69d180a9 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/MirroredStrategy.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/MirroredStrategy.kt @@ -32,7 +32,11 @@ import org.opendc.experiments.tf20.core.TFDevice * It creates one replica per GPU device. Each variable in the model is mirrored across all the replicas. */ public class MirroredStrategy(val devices: List<TFDevice>) : Strategy { - override suspend fun run(forward: Double, backward: Double, batchSize: Int) = coroutineScope { + override suspend fun run( + forward: Double, + backward: Double, + batchSize: Int, + ) = coroutineScope { for (device in devices) { launch { device.compute(forward * batchSize / devices.size + backward) } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/OneDeviceStrategy.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/OneDeviceStrategy.kt index 271fab98..05235b12 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/OneDeviceStrategy.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/OneDeviceStrategy.kt @@ -28,7 +28,11 @@ import org.opendc.experiments.tf20.core.TFDevice * A distribution [Strategy] that places all variables and computation on a single specified device. */ public class OneDeviceStrategy(val device: TFDevice) : Strategy { - override suspend fun run(forward: Double, backward: Double, batchSize: Int) { + override suspend fun run( + forward: Double, + backward: Double, + batchSize: Int, + ) { device.compute(forward * batchSize + backward) } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt index 3e755b56..d5da628a 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt @@ -29,5 +29,9 @@ public interface Strategy { /** * Converge the specified batch using the given strategy. */ - public suspend fun run(forward: Double, backward: Double, batchSize: Int) + public suspend fun run( + forward: Double, + backward: Double, + batchSize: Int, + ) } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/TrainableModel.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/TrainableModel.kt index 2cac6cbc..2d621d16 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/TrainableModel.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/TrainableModel.kt @@ -110,7 +110,10 @@ public abstract class TrainableModel(vararg layers: Layer) : AutoCloseable { * @param [epochs] Number of epochs to train the model. 
An epoch is an iteration over the entire x and y data provided. * @param [batchSize] Number of samples per gradient update. */ - public suspend fun fit(epochs: Int = 5, batchSize: Int = 32) { + public suspend fun fit( + epochs: Int = 5, + batchSize: Int = 32, + ) { check(isCompiled) { "Model not yet compiled." } val forwardFlops = forward() diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/activations/Activation.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/activations/Activation.kt index 403acfc0..cb3b778e 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/activations/Activation.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/activations/Activation.kt @@ -194,5 +194,5 @@ public enum class Activation { * * @see <a href="https://arxiv.org/abs/1710.05941">Ramachandran et al., 2017</a> */ - Swish; + Swish, } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/conv/Conv2D.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/conv/Conv2D.kt index 74124bbd..f89c47c6 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/conv/Conv2D.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/conv/Conv2D.kt @@ -35,13 +35,12 @@ import kotlin.math.ceil * Finally, if `activation` is applied to the outputs as well. */ public class Conv2D( - public val filter: LongArray = LongArray(4), // [H, W, channel_in, channel_out] - public val strides: LongArray = LongArray(4), // [1, stride_h, stride_w, 1] + public val filter: LongArray = LongArray(4), + public val strides: LongArray = LongArray(4), public val activation: Activation = Activation.Relu, public val padding: ConvPadding = ConvPadding.VALID, - name: String = "" + name: String = "", ) : Layer(name) { - private var padHeight: Double = 0.0 private var padWidth: Double = 0.0 diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/conv/ConvPadding.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/conv/ConvPadding.kt index 03ae6282..a47c435a 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/conv/ConvPadding.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/conv/ConvPadding.kt @@ -35,5 +35,5 @@ public enum class ConvPadding { /** * No padding. 
*/ - VALID + VALID, } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/core/ActivationLayer.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/core/ActivationLayer.kt index 60b0f754..000401b9 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/core/ActivationLayer.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/core/ActivationLayer.kt @@ -31,9 +31,8 @@ import org.opendc.experiments.tf20.keras.shape.TensorShape */ public class ActivationLayer( public val activation: Activation = Activation.Relu, - name: String = "" + name: String = "", ) : Layer(name) { - override fun build(inputShape: TensorShape) { // Intentionally left empty } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/pool/Pool2D.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/pool/Pool2D.kt index 3c6b15bb..a9a54938 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/pool/Pool2D.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/pool/Pool2D.kt @@ -40,9 +40,8 @@ public class Pool2D( public val poolSize: IntArray = intArrayOf(1, 2, 2, 1), public val strides: IntArray = intArrayOf(1, 2, 2, 1), public val padding: ConvPadding = ConvPadding.VALID, - name: String + name: String, ) : Layer(name) { - private var padHeight = 0L private var padWidth = 0L diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/regularization/Dropout.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/regularization/Dropout.kt index ff5f7711..8198f98c 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/regularization/Dropout.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/layer/regularization/Dropout.kt @@ -38,7 +38,7 @@ import org.opendc.experiments.tf20.keras.shape.TensorShape */ public class Dropout( public val keepProbability: Float = 0.1f, - name: String + name: String, ) : Layer(name) { override fun build(inputShape: TensorShape) {} diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/shape/TensorShape.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/shape/TensorShape.kt index 7affcb63..67e00e24 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/shape/TensorShape.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/shape/TensorShape.kt @@ -33,7 +33,7 @@ public class TensorShape(vararg dims: Long) { /** * The dimensions of the tensor represented as [LongArray]. */ - private val _dims: LongArray = dims + private val localDims: LongArray = dims /** * Return amount of elements in Tensor with the given shape. 
@@ -42,7 +42,7 @@ public class TensorShape(vararg dims: Long) { get() { var prod = 1L for (i in 0 until rank) { - prod *= abs(_dims[i]) + prod *= abs(localDims[i]) } return prod } @@ -51,7 +51,7 @@ public class TensorShape(vararg dims: Long) { * Returns the rank of this shape. */ public val rank: Int - get() = _dims.size + get() = localDims.size /** * Returns the value of a dimension @@ -60,7 +60,7 @@ public class TensorShape(vararg dims: Long) { * @return The size of dimension i */ public operator fun get(i: Int): Long { - return _dims[i] + return localDims[i] } /** @@ -70,7 +70,7 @@ public class TensorShape(vararg dims: Long) { * @return Whether dimension i is unknown (equal to -1) */ private fun isKnown(i: Int): Boolean { - return _dims[i] != -1L + return localDims[i] != -1L } /** @@ -80,21 +80,21 @@ public class TensorShape(vararg dims: Long) { * @return The size of dimension i */ public fun size(i: Int): Long { - return _dims[i] + return localDims[i] } /** * Clone the [TensorShape] and return a new instance. */ public fun clone(): TensorShape { - return TensorShape(*_dims) + return TensorShape(*localDims) } /** * Create a string representation of this [TensorShape]. */ override fun toString(): String { - return _dims.contentToString().replace("-1", "None") + return localDims.contentToString().replace("-1", "None") } override fun equals(other: Any?): Boolean { @@ -103,12 +103,12 @@ public class TensorShape(vararg dims: Long) { other as TensorShape - if (!_dims.contentEquals(other._dims)) return false + if (!localDims.contentEquals(other.localDims)) return false return true } override fun hashCode(): Int { - return _dims.contentHashCode() + return localDims.contentHashCode() } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/Message.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/Message.kt index d6360873..fddcc779 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/Message.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/Message.kt @@ -35,5 +35,5 @@ public data class Message( val to: NetworkNode, val type: MessageType, val dataSize: Long, - val iterations: Int + val iterations: Int, ) diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/MessageType.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/MessageType.kt index 8be16261..d7130137 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/MessageType.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/MessageType.kt @@ -27,5 +27,5 @@ package org.opendc.experiments.tf20.network */ public enum class MessageType { REQUEST, - WEIGHTS + WEIGHTS, } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt index 5b408fb3..a4e79b4e 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt @@ -63,7 +63,11 @@ public class 
NetworkController(dispatcher: Dispatcher) : AutoCloseable { /** * Add a connection between two links. */ - public fun addConnection(node1: NetworkNode, node2: NetworkNode, bandwidth: Long) { + public fun addConnection( + node1: NetworkNode, + node2: NetworkNode, + bandwidth: Long, + ) { bandwidthMatrix[Pair(node1, node2)] = bandwidth } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt index 2a7578b3..077bcc04 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt @@ -53,56 +53,58 @@ public class MLEnvironmentReader { var isGpuFlag = true var maxPower = 350.0 var minPower = 200.0 - val cores = machine.cpus.flatMap { id -> - when (id) { - 1 -> { - // ref: https://www.guru3d.com/articles-pages/nvidia-geforce-gtx-titan-x-review,8.html#:~:text=GeForce%20GTX%20Titan%20X%20%2D%20On,power%20supply%20unit%20as%20minimum. - maxPower = 334.0 - minPower = 90.0 - val node = ProcessingNode("NVidia", "TITAN X", "Pascal", 4992) - List(node.coreCount) { ProcessingUnit(node, it, 824.0) } + val cores = + machine.cpus.flatMap { id -> + when (id) { + 1 -> { + // ref: https://www.guru3d.com/articles-pages/nvidia-geforce-gtx-titan-x-review,8.html#:~:text=GeForce%20GTX%20Titan%20X%20%2D%20On,power%20supply%20unit%20as%20minimum. + maxPower = 334.0 + minPower = 90.0 + val node = ProcessingNode("NVidia", "TITAN X", "Pascal", 4992) + List(node.coreCount) { ProcessingUnit(node, it, 824.0) } + } + 2 -> { + // ref: https://www.microway.com/hpc-tech-tips/nvidia-tesla-p100-pci-e-16gb-gpu-accelerator-pascal-gp100-close/ + maxPower = 250.0 + minPower = 125.0 + val node = ProcessingNode("NVIDIA", "Tesla P100", "Pascal", 3584) + List(node.coreCount) { ProcessingUnit(node, it, 1190.0) } + } + 3 -> { + // ref: https://www.anandtech.com/show/10923/openpower-saga-tyans-1u-power8-gt75/7 + minPower = 84.0 + maxPower = 135.0 + val node = ProcessingNode("Intel", "E5-2690v3 Haswell24", "amd64", 24) + isGpuFlag = false + List(node.coreCount) { ProcessingUnit(node, it, 3498.0) } + } + 4 -> { + minPower = 130.0 + maxPower = 190.0 + val node = ProcessingNode("IBM", "POWER8", "RISC", 10) + isGpuFlag = false + List(node.coreCount) { ProcessingUnit(node, it, 143000.0) } // 28600.0 3690 + } + else -> throw IllegalArgumentException("The cpu id $id is not recognized") } - 2 -> { - // ref: https://www.microway.com/hpc-tech-tips/nvidia-tesla-p100-pci-e-16gb-gpu-accelerator-pascal-gp100-close/ - maxPower = 250.0 - minPower = 125.0 - val node = ProcessingNode("NVIDIA", "Tesla P100", "Pascal", 3584) - List(node.coreCount) { ProcessingUnit(node, it, 1190.0) } - } - 3 -> { - // ref: https://www.anandtech.com/show/10923/openpower-saga-tyans-1u-power8-gt75/7 - minPower = 84.0 - maxPower = 135.0 - val node = ProcessingNode("Intel", "E5-2690v3 Haswell24", "amd64", 24) - isGpuFlag = false - List(node.coreCount) { ProcessingUnit(node, it, 3498.0) } - } - 4 -> { - minPower = 130.0 - maxPower = 190.0 - val node = ProcessingNode("IBM", "POWER8", "RISC", 10) - isGpuFlag = false - List(node.coreCount) { ProcessingUnit(node, it, 143000.0) } // 28600.0 3690 - } - else -> throw IllegalArgumentException("The cpu id $id is not recognized") } - } - val memories = machine.memories.map { 
id -> - when (id) { - 1 -> MemoryUnit("NVidia", "GDDR5X", 480.0, 24L) - 2 -> MemoryUnit("NVidia", "GDDR5X", 720.0, 16L) - 3 -> MemoryUnit("IBM", "GDDR5X", 115.0, 160L) - 4 -> MemoryUnit("Inter", "GDDR5X", 68.0, 512L) - else -> throw IllegalArgumentException("The cpu id $id is not recognized") + val memories = + machine.memories.map { id -> + when (id) { + 1 -> MemoryUnit("NVidia", "GDDR5X", 480.0, 24L) + 2 -> MemoryUnit("NVidia", "GDDR5X", 720.0, 16L) + 3 -> MemoryUnit("IBM", "GDDR5X", 115.0, 160L) + 4 -> MemoryUnit("Inter", "GDDR5X", 68.0, 512L) + else -> throw IllegalArgumentException("The cpu id $id is not recognized") + } } - } MachineDef( UUID(0, counter.toLong()), "node-${counter++}", mapOf("gpu" to isGpuFlag), MachineModel(cores, memories), - CpuPowerModels.linear(maxPower, minPower) + CpuPowerModels.linear(maxPower, minPower), ) } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt index 6b72e155..7ff91797 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt @@ -34,5 +34,5 @@ public data class MachineDef( val name: String, val meta: Map<String, Any>, val model: MachineModel, - val powerModel: CpuPowerModel + val powerModel: CpuPowerModel, ) diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt index 899aafc0..e3814175 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt @@ -41,114 +41,121 @@ class TensorFlowTest { * Smoke test that tests the capabilities of the TensorFlow application model in OpenDC. 
*/ @Test - fun testSmokeAlexNet() = runSimulation { - val envInput = checkNotNull(TensorFlowTest::class.java.getResourceAsStream("/kth.json")) - val def = MLEnvironmentReader().readEnvironment(envInput).first() - - val device = SimTFDevice( - def.uid, - def.meta["gpu"] as Boolean, - dispatcher, - def.model.cpus[0], - def.model.memory[0], - CpuPowerModels.linear(250.0, 60.0) - ) - val strategy = OneDeviceStrategy(device) - val batchSize = 32 - val model = AlexNet(batchSize.toLong()) - model.use { - it.compile(strategy) - - it.fit(epochs = 9088 / batchSize, batchSize = batchSize) + fun testSmokeAlexNet() = + runSimulation { + val envInput = checkNotNull(TensorFlowTest::class.java.getResourceAsStream("/kth.json")) + val def = MLEnvironmentReader().readEnvironment(envInput).first() + + val device = + SimTFDevice( + def.uid, + def.meta["gpu"] as Boolean, + dispatcher, + def.model.cpus[0], + def.model.memory[0], + CpuPowerModels.linear(250.0, 60.0), + ) + val strategy = OneDeviceStrategy(device) + val batchSize = 32 + val model = getAlexNet(batchSize.toLong()) + model.use { + it.compile(strategy) + + it.fit(epochs = 9088 / batchSize, batchSize = batchSize) + } + + device.close() + + val stats = device.getDeviceStats() + assertAll( + { assertEquals(3309694252, timeSource.millis()) }, + { assertEquals(8.27423563E8, stats.energyUsage) }, + ) } - device.close() - - val stats = device.getDeviceStats() - assertAll( - { assertEquals(3309694252, timeSource.millis()) }, - { assertEquals(8.27423563E8, stats.energyUsage) } - ) - } - /** * Smoke test that tests the capabilities of the TensorFlow application model in OpenDC. */ @Test - fun testSmokeVGG() = runSimulation { - val envInput = checkNotNull(TensorFlowTest::class.java.getResourceAsStream("/kth.json")) - val def = MLEnvironmentReader().readEnvironment(envInput).first() - - val device = SimTFDevice( - def.uid, - def.meta["gpu"] as Boolean, - dispatcher, - def.model.cpus[0], - def.model.memory[0], - CpuPowerModels.linear(250.0, 60.0) - ) - val strategy = OneDeviceStrategy(device) - val batchSize = 128 - val model = VGG16(batchSize.toLong()) - model.use { - it.compile(strategy) - - it.fit(epochs = 9088 / batchSize, batchSize = batchSize) + fun testSmokeVGG() = + runSimulation { + val envInput = checkNotNull(TensorFlowTest::class.java.getResourceAsStream("/kth.json")) + val def = MLEnvironmentReader().readEnvironment(envInput).first() + + val device = + SimTFDevice( + def.uid, + def.meta["gpu"] as Boolean, + dispatcher, + def.model.cpus[0], + def.model.memory[0], + CpuPowerModels.linear(250.0, 60.0), + ) + val strategy = OneDeviceStrategy(device) + val batchSize = 128 + val model = getVGG16(batchSize.toLong()) + model.use { + it.compile(strategy) + + it.fit(epochs = 9088 / batchSize, batchSize = batchSize) + } + + device.close() + + val stats = device.getDeviceStats() + assertAll( + { assertEquals(176230328513, timeSource.millis()) }, + { assertEquals(4.405758212825E10, stats.energyUsage) }, + ) } - device.close() - - val stats = device.getDeviceStats() - assertAll( - { assertEquals(176230328513, timeSource.millis()) }, - { assertEquals(4.405758212825E10, stats.energyUsage) } - ) - } - /** * Smoke test that tests the capabilities of the TensorFlow application model in OpenDC. 
*/ @Test - fun testSmokeDistribute() = runSimulation { - val envInput = checkNotNull(TensorFlowTest::class.java.getResourceAsStream("/kth.json")) - val def = MLEnvironmentReader().readEnvironment(envInput).first() - - val deviceA = SimTFDevice( - def.uid, - def.meta["gpu"] as Boolean, - dispatcher, - def.model.cpus[0], - def.model.memory[0], - CpuPowerModels.linear(250.0, 60.0) - ) - - val deviceB = SimTFDevice( - UUID.randomUUID(), - def.meta["gpu"] as Boolean, - dispatcher, - def.model.cpus[0], - def.model.memory[0], - CpuPowerModels.linear(250.0, 60.0) - ) - - val strategy = MirroredStrategy(listOf(deviceA, deviceB)) - val batchSize = 32 - val model = AlexNet(batchSize.toLong()) - model.use { - it.compile(strategy) - - it.fit(epochs = 9088 / batchSize, batchSize = batchSize) + fun testSmokeDistribute() = + runSimulation { + val envInput = checkNotNull(TensorFlowTest::class.java.getResourceAsStream("/kth.json")) + val def = MLEnvironmentReader().readEnvironment(envInput).first() + + val deviceA = + SimTFDevice( + def.uid, + def.meta["gpu"] as Boolean, + dispatcher, + def.model.cpus[0], + def.model.memory[0], + CpuPowerModels.linear(250.0, 60.0), + ) + + val deviceB = + SimTFDevice( + UUID.randomUUID(), + def.meta["gpu"] as Boolean, + dispatcher, + def.model.cpus[0], + def.model.memory[0], + CpuPowerModels.linear(250.0, 60.0), + ) + + val strategy = MirroredStrategy(listOf(deviceA, deviceB)) + val batchSize = 32 + val model = getAlexNet(batchSize.toLong()) + model.use { + it.compile(strategy) + + it.fit(epochs = 9088 / batchSize, batchSize = batchSize) + } + + deviceA.close() + deviceB.close() + + val statsA = deviceA.getDeviceStats() + val statsB = deviceB.getDeviceStats() + assertAll( + { assertEquals(1704994000, timeSource.millis()) }, + { assertEquals(4.262485E8, statsA.energyUsage) }, + { assertEquals(4.262485E8, statsB.energyUsage) }, + ) } - - deviceA.close() - deviceB.close() - - val statsA = deviceA.getDeviceStats() - val statsB = deviceB.getDeviceStats() - assertAll( - { assertEquals(1704994000, timeSource.millis()) }, - { assertEquals(4.262485E8, statsA.energyUsage) }, - { assertEquals(4.262485E8, statsB.energyUsage) } - ) - } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt index 549c6f3e..76473868 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt @@ -39,36 +39,38 @@ import java.util.UUID */ internal class SimTFDeviceTest { @Test - fun testSmoke() = runSimulation { - val puNode = ProcessingNode("NVIDIA", "Tesla V100", "unknown", 1) - val pu = ProcessingUnit(puNode, 0, 960 * 1230.0) - val memory = MemoryUnit("NVIDIA", "Tesla V100", 877.0, 32_000) + fun testSmoke() = + runSimulation { + val puNode = ProcessingNode("NVIDIA", "Tesla V100", "unknown", 1) + val pu = ProcessingUnit(puNode, 0, 960 * 1230.0) + val memory = MemoryUnit("NVIDIA", "Tesla V100", 877.0, 32_000) - val device = SimTFDevice( - UUID.randomUUID(), - isGpu = true, - dispatcher, - pu, - memory, - CpuPowerModels.linear(250.0, 100.0) - ) + val device = + SimTFDevice( + UUID.randomUUID(), + isGpu = true, + dispatcher, + pu, + memory, + CpuPowerModels.linear(250.0, 100.0), + ) - // Load 1 GiB into GPU memory - device.load(1000) - 
assertEquals(1140, timeSource.millis()) + // Load 1 GiB into GPU memory + device.load(1000) + assertEquals(1140, timeSource.millis()) - coroutineScope { - launch { device.compute(1e6) } - launch { device.compute(2e6) } - } + coroutineScope { + launch { device.compute(1e6) } + launch { device.compute(2e6) } + } - device.close() + device.close() - val stats = device.getDeviceStats() + val stats = device.getDeviceStats() - assertAll( - { assertEquals(3681, timeSource.millis()) }, - { assertEquals(749.25, stats.energyUsage) } - ) - } + assertAll( + { assertEquals(3681, timeSource.millis()) }, + { assertEquals(749.25, stats.energyUsage) }, + ) + } } diff --git a/opendc-experiments/opendc-experiments-workflow/build.gradle.kts b/opendc-experiments/opendc-experiments-workflow/build.gradle.kts index a5a2ea54..ff5144c5 100644 --- a/opendc-experiments/opendc-experiments-workflow/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-workflow/build.gradle.kts @@ -22,7 +22,7 @@ description = "Support library for simulating workflows with OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` `testing-conventions` diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt index 2037dad4..e396901c 100644 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt @@ -66,30 +66,35 @@ public fun Trace.toJobs(): List<Job> { val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } val id = reader.getString(TASK_ID)!!.toLong() - val grantedCpus = if (reader.resolve(TASK_ALLOC_NCPUS) != 0) { - reader.getInt(TASK_ALLOC_NCPUS) - } else { - reader.getInt(TASK_REQ_NCPUS) - } + val grantedCpus = + if (reader.resolve(TASK_ALLOC_NCPUS) != 0) { + reader.getInt(TASK_ALLOC_NCPUS) + } else { + reader.getInt(TASK_REQ_NCPUS) + } val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!! val runtime = reader.getDuration(TASK_RUNTIME)!! val flops: Long = 4000 * runtime.seconds * grantedCpus val workload = SimWorkloads.flops(flops, 1.0) - val task = Task( - UUID(0L, id), - "<unnamed>", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to grantedCpus, - WORKFLOW_TASK_DEADLINE to runtime.toMillis() + val task = + Task( + UUID(0L, id), + "<unnamed>", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to grantedCpus, + WORKFLOW_TASK_DEADLINE to runtime.toMillis(), + ), ) - ) tasks[id] = task taskDependencies[task] = reader.getSet(TASK_PARENTS, String::class.java)!!.map { it.toLong() }.toSet() - (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } + (workflow.metadata as MutableMap<String, Any>).merge( + "WORKFLOW_SUBMIT_TIME", + submitTime.toEpochMilli(), + ) { a, b -> min(a as Long, b as Long) } (workflow.tasks as MutableSet<Task>).add(task) } @@ -110,7 +115,10 @@ public fun Trace.toJobs(): List<Job> { /** * Helper method to replay the specified list of [jobs] and suspend execution util all jobs have finished. 
*/ -public suspend fun WorkflowService.replay(clock: InstantSource, jobs: List<Job>) { +public suspend fun WorkflowService.replay( + clock: InstantSource, + jobs: List<Job>, +) { // Sort jobs by their arrival time val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long } if (orderedJobs.isEmpty()) { diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt index 8bd087e7..cb8056a7 100644 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt @@ -36,5 +36,5 @@ public data class WorkflowSchedulerSpec( val jobAdmissionPolicy: JobAdmissionPolicy, val jobOrderPolicy: JobOrderPolicy, val taskEligibilityPolicy: TaskEligibilityPolicy, - val taskOrderPolicy: TaskOrderPolicy + val taskOrderPolicy: TaskOrderPolicy, ) diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt index 862ebf3d..af2a4871 100644 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt @@ -40,21 +40,25 @@ public class WorkflowServiceProvisioningStep internal constructor( private val serviceDomain: String, private val computeService: String, private val scheduler: WorkflowSchedulerSpec, - private val schedulingQuantum: Duration + private val schedulingQuantum: Duration, ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { - val computeService = requireNotNull(ctx.registry.resolve(computeService, ComputeService::class.java)) { "Compute service $computeService does not exist" } + val computeService = + requireNotNull( + ctx.registry.resolve(computeService, ComputeService::class.java), + ) { "Compute service $computeService does not exist" } val client = computeService.newClient() - val service = WorkflowService( - ctx.dispatcher, - client, - scheduler.schedulingQuantum, - jobAdmissionPolicy = scheduler.jobAdmissionPolicy, - jobOrderPolicy = scheduler.jobOrderPolicy, - taskEligibilityPolicy = scheduler.taskEligibilityPolicy, - taskOrderPolicy = scheduler.taskOrderPolicy - ) + val service = + WorkflowService( + ctx.dispatcher, + client, + scheduler.schedulingQuantum, + jobAdmissionPolicy = scheduler.jobAdmissionPolicy, + jobOrderPolicy = scheduler.jobOrderPolicy, + taskEligibilityPolicy = scheduler.taskEligibilityPolicy, + taskOrderPolicy = scheduler.taskOrderPolicy, + ) ctx.registry.register(serviceDomain, WorkflowService::class.java, service) return AutoCloseable { diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt index efcbf889..bfcf3734 100644 --- 
a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt @@ -35,7 +35,7 @@ public fun setupWorkflowService( serviceDomain: String, computeService: String, scheduler: WorkflowSchedulerSpec, - schedulingQuantum: Duration = Duration.ofMinutes(5) + schedulingQuantum: Duration = Duration.ofMinutes(5), ): ProvisioningStep { return WorkflowServiceProvisioningStep(serviceDomain, computeService, scheduler, schedulingQuantum) } |
