diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-11-13 18:16:19 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-11-13 18:16:19 +0000 |
| commit | 52eed48441693149993db79b63431b99e0973027 (patch) | |
| tree | ba267db531bc3d81409ddfe9caeb6d3b5a65e8c8 /opendc-experiments | |
| parent | 183cfa96910ebb74c668dea7ef98071966f8fcb9 (diff) | |
| parent | 33d91ef30ad7bcb73365934fe536461210d1082a (diff) | |
merge: Increase minimum Java version to 17 (#115)
This pull request increases the minimum version of Java required by OpenDC to 17.
This new version of Java introduces several new features compared to our old minimum
version (11), which we attempt to apply in this conversion.
## Implementation Notes :hammer_and_pick:
* Increase minimum Java version to Java 17
* Use RandomGenerator as randomness source
* Add common dispatcher interface
* Add compatibility with Kotlin coroutines
* Use InstantSource as time source
* Re-implement SimulationScheduler as Dispatcher
* Replace use of CoroutineContext by Dispatcher
## External Dependencies :four_leaf_clover:
* Java 17
## Breaking API Changes :warning:
* The use of `CoroutineContext` and `Clock` as parameters of classes has been replaced
by the `Dispatcher` interface.
* The use of `Clock` has been replaced by `InstantSource` which does not carry time
zone info.
* The use of `Random` and `SplittableRandom` as parameter type has been replaced
by `RandomGenerator`
Diffstat (limited to 'opendc-experiments')
28 files changed, 91 insertions, 129 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt index 7fe3a2eb..eae5806e 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt @@ -22,13 +22,12 @@ package org.opendc.experiments.provisioner +import org.opendc.common.Dispatcher import org.opendc.experiments.MutableServiceRegistry import org.opendc.experiments.ServiceRegistry import org.opendc.experiments.internal.ServiceRegistryImpl -import java.time.Clock import java.util.ArrayDeque import java.util.SplittableRandom -import kotlin.coroutines.CoroutineContext /** * A helper class to set up the experimental environment in a reproducible manner. @@ -37,17 +36,15 @@ import kotlin.coroutines.CoroutineContext * [ProvisioningStep]s are executed sequentially and ensure that the necessary infrastructure is configured and teared * down after the simulation completes. * - * @param coroutineContext The [CoroutineContext] in which the environment is set up. - * @param clock The simulation [Clock]. + * @param dispatcher The [Dispatcher] implementation for scheduling future tasks. * @param seed A seed for initializing the randomness of the environment. */ -public class Provisioner(coroutineContext: CoroutineContext, clock: Clock, seed: Long) : AutoCloseable { +public class Provisioner(dispatcher: Dispatcher, seed: Long) : AutoCloseable { /** * Implementation of [ProvisioningContext]. */ private val context = object : ProvisioningContext { - override val clock: Clock = clock - override val coroutineContext: CoroutineContext = coroutineContext + override val dispatcher: Dispatcher = dispatcher override val seeder: SplittableRandom = SplittableRandom(seed) override val registry: MutableServiceRegistry = ServiceRegistryImpl() diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt index 73897315..e53044ce 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt @@ -22,31 +22,26 @@ package org.opendc.experiments.provisioner +import org.opendc.common.Dispatcher import org.opendc.experiments.MutableServiceRegistry -import java.time.Clock import java.util.SplittableRandom -import kotlin.coroutines.CoroutineContext +import java.util.random.RandomGenerator /** * The [ProvisioningContext] class provides access to shared state between subsequent [ProvisioningStep]s, as well as - * access to the simulation dispatcher (via [CoroutineContext]), the virtual clock, and a randomness seeder to allow + * access to the simulation dispatcher, the virtual clock, and a randomness seeder to allow * the provisioning steps to initialize the (simulated) resources. */ public interface ProvisioningContext { /** - * The [CoroutineContext] in which the provisioner runs. + * The [Dispatcher] provided by the provisioner to schedule future events during the simulation. */ - public val coroutineContext: CoroutineContext - - /** - * The [Clock] tracking the virtual simulation time. - */ - public val clock: Clock + public val dispatcher: Dispatcher /** * A [SplittableRandom] instance used to seed the provisioners. */ - public val seeder: SplittableRandom + public val seeder: RandomGenerator /** * A [MutableServiceRegistry] where the provisioned services are registered. diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index 3e3d758d..1221f084 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -75,7 +75,7 @@ class CapelinBenchmarks { fun benchmarkCapelin() = runSimulation { val serviceDomain = "compute.opendc.org" - Provisioner(coroutineContext, clock, seed = 0).use { provisioner -> + Provisioner(dispatcher, seed = 0).use { provisioner -> val computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) @@ -87,7 +87,7 @@ class CapelinBenchmarks { ) val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! - service.replay(clock, vms, 0L, interference = true) + service.replay(timeSource, vms, 0L, interference = true) } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt index 2c3573dc..2567a4d5 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt @@ -64,7 +64,7 @@ public class CapelinRunner( val serviceDomain = "compute.opendc.org" val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt")) - Provisioner(coroutineContext, clock, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain, { createComputeScheduler(scenario.allocationPolicy, Random(it.seeder.nextLong())) }), setupHosts(serviceDomain, topology, optimize = true) @@ -96,7 +96,7 @@ public class CapelinRunner( null } - service.replay(clock, vms, seed, failureModel = failureModel, interference = operationalPhenomena.hasInterference) + service.replay(timeSource, vms, seed, failureModel = failureModel, interference = operationalPhenomena.hasInterference) } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt index 0b4cafa6..3a2acbd7 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt @@ -34,8 +34,9 @@ import org.opendc.simulator.compute.power.CpuPowerModel import org.opendc.simulator.compute.power.CpuPowerModels import java.io.File import java.io.InputStream -import java.util.Random +import java.util.SplittableRandom import java.util.UUID +import java.util.random.RandomGenerator import kotlin.math.roundToLong /** @@ -49,7 +50,7 @@ private val reader = ClusterSpecReader() fun clusterTopology( file: File, powerModel: CpuPowerModel = CpuPowerModels.linear(350.0, 200.0), - random: Random = Random(0) + random: RandomGenerator = SplittableRandom(0) ): List<HostSpec> { return clusterTopology(reader.read(file), powerModel, random) } @@ -60,7 +61,7 @@ fun clusterTopology( fun clusterTopology( input: InputStream, powerModel: CpuPowerModel = CpuPowerModels.linear(350.0, 200.0), - random: Random = Random(0) + random: RandomGenerator = SplittableRandom(0) ): List<HostSpec> { return clusterTopology(reader.read(input), powerModel, random) } @@ -68,14 +69,14 @@ fun clusterTopology( /** * Construct a topology from the given list of [clusters]. */ -fun clusterTopology(clusters: List<ClusterSpec>, powerModel: CpuPowerModel, random: Random = Random(0)): List<HostSpec> { +fun clusterTopology(clusters: List<ClusterSpec>, powerModel: CpuPowerModel, random: RandomGenerator = SplittableRandom(0)): List<HostSpec> { return clusters.flatMap { it.toHostSpecs(random, powerModel) } } /** * Helper method to convert a [ClusterSpec] into a list of [HostSpec]s. */ -private fun ClusterSpec.toHostSpecs(random: Random, powerModel: CpuPowerModel): List<HostSpec> { +private fun ClusterSpec.toHostSpecs(random: RandomGenerator, powerModel: CpuPowerModel): List<HostSpec> { val cpuSpeed = cpuSpeed val memoryPerHost = memCapacityPerHost.roundToLong() diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 77b0d09f..7e01bb64 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -94,7 +94,7 @@ class CapelinIntegrationTest { val topology = createTopology() val monitor = monitor - Provisioner(coroutineContext, clock, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), @@ -102,7 +102,7 @@ class CapelinIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(clock, workload, seed) + service.replay(timeSource, workload, seed) } println( @@ -138,7 +138,7 @@ class CapelinIntegrationTest { val topology = createTopology("single") val monitor = monitor - Provisioner(coroutineContext, clock, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), @@ -146,7 +146,7 @@ class CapelinIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(clock, workload, seed) + service.replay(timeSource, workload, seed) } println( @@ -177,7 +177,7 @@ class CapelinIntegrationTest { val workload = createTestWorkload(1.0, seed) val topology = createTopology("single") - Provisioner(coroutineContext, clock, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), @@ -185,7 +185,7 @@ class CapelinIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(clock, workload, seed, interference = true) + service.replay(timeSource, workload, seed, interference = true) } println( @@ -216,7 +216,7 @@ class CapelinIntegrationTest { val workload = createTestWorkload(0.25, seed) val monitor = monitor - Provisioner(coroutineContext, clock, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), @@ -224,7 +224,7 @@ class CapelinIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(clock, workload, seed, failureModel = grid5000(Duration.ofDays(7))) + service.replay(timeSource, workload, seed, failureModel = grid5000(Duration.ofDays(7))) } // Note that these values have been verified beforehand diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt index bbc70489..125ba6ef 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt @@ -34,12 +34,13 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher import org.opendc.compute.service.scheduler.weights.RamWeigher import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import java.util.Random +import java.util.SplittableRandom +import java.util.random.RandomGenerator /** * Create a [ComputeScheduler] for the experiment. */ -public fun createComputeScheduler(name: String, seeder: Random, placements: Map<String, String> = emptyMap()): ComputeScheduler { +public fun createComputeScheduler(name: String, seeder: RandomGenerator, placements: Map<String, String> = emptyMap()): ComputeScheduler { val cpuAllocationRatio = 16.0 val ramAllocationRatio = 1.5 return when (name) { @@ -79,7 +80,7 @@ public fun createComputeScheduler(name: String, seeder: Random, placements: Map< filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), weighers = emptyList(), subsetSize = Int.MAX_VALUE, - random = Random(seeder.nextLong()) + random = SplittableRandom(seeder.nextLong()) ) "replay" -> ReplayScheduler(placements) else -> throw IllegalArgumentException("Unknown policy $name") diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt index 38cbf2dc..d7347327 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt @@ -41,7 +41,7 @@ public class ComputeServiceProvisioningStep internal constructor( private val schedulingQuantum: Duration ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { - val service = ComputeService(ctx.coroutineContext, ctx.clock, scheduler(ctx), schedulingQuantum) + val service = ComputeService(ctx.dispatcher, scheduler(ctx), schedulingQuantum) ctx.registry.register(serviceDomain, ComputeService::class.java, service) return AutoCloseable { service.close() } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt index 2200880d..b7884293 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.compute -import java.util.Random +import java.util.random.RandomGenerator /** * An interface that describes how a workload is resolved. @@ -31,5 +31,5 @@ public interface ComputeWorkload { /** * Resolve the workload into a list of [VirtualMachine]s to simulate. */ - public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> + public fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt index 81a5cf33..eb85dbb8 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt @@ -24,8 +24,8 @@ package org.opendc.experiments.compute import org.opendc.compute.service.ComputeService import org.opendc.compute.simulator.failure.HostFaultInjector -import java.time.Clock -import java.util.Random +import java.time.InstantSource +import java.util.random.RandomGenerator import kotlin.coroutines.CoroutineContext /** @@ -37,8 +37,8 @@ public interface FailureModel { */ public fun createInjector( context: CoroutineContext, - clock: Clock, + clock: InstantSource, service: ComputeService, - random: Random + random: RandomGenerator ): HostFaultInjector } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt index ff747066..679e370a 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt @@ -31,9 +31,9 @@ import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.failure.HostFaultInjector import org.opendc.compute.simulator.failure.StartStopHostFault import org.opendc.compute.simulator.failure.StochasticVictimSelector -import java.time.Clock import java.time.Duration -import java.util.Random +import java.time.InstantSource +import java.util.random.RandomGenerator import kotlin.coroutines.CoroutineContext import kotlin.math.ln @@ -47,9 +47,9 @@ public fun grid5000(failureInterval: Duration): FailureModel { return object : FailureModel { override fun createInjector( context: CoroutineContext, - clock: Clock, + clock: InstantSource, service: ComputeService, - random: Random + random: RandomGenerator ): HostFaultInjector { val rng = Well19937c(random.nextLong()) val hosts = service.hosts.map { it as SimHost }.toSet() diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt index e224fb84..310aa54c 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt @@ -46,7 +46,7 @@ public class HostsProvisioningStep internal constructor( ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } - val engine = FlowEngine.create(ctx.coroutineContext, ctx.clock) + val engine = FlowEngine.create(ctx.dispatcher) val graph = engine.newGraph() val hosts = mutableSetOf<SimHost>() @@ -58,7 +58,7 @@ public class HostsProvisioningStep internal constructor( spec.uid, spec.name, spec.meta, - ctx.clock, + ctx.dispatcher.timeSource, machine, hypervisor, optimize = optimize diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt index f0e31932..16d28edb 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt @@ -29,7 +29,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.yield import org.opendc.compute.service.ComputeService -import java.time.Clock +import java.time.InstantSource import java.util.Random import kotlin.coroutines.coroutineContext import kotlin.math.max @@ -45,7 +45,7 @@ import kotlin.math.max * @param interference A flag to indicate that VM interference needs to be enabled. */ public suspend fun ComputeService.replay( - clock: Clock, + clock: InstantSource, trace: List<VirtualMachine>, seed: Long, submitImmediately: Boolean = false, diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt index 3a7a51f2..ca23a7c5 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt @@ -26,7 +26,7 @@ import mu.KotlinLogging import org.opendc.experiments.compute.ComputeWorkload import org.opendc.experiments.compute.ComputeWorkloadLoader import org.opendc.experiments.compute.VirtualMachine -import java.util.Random +import java.util.random.RandomGenerator /** * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads. @@ -37,7 +37,7 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt index a6055762..583405da 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt @@ -26,8 +26,8 @@ import mu.KotlinLogging import org.opendc.experiments.compute.ComputeWorkload import org.opendc.experiments.compute.ComputeWorkloadLoader import org.opendc.experiments.compute.VirtualMachine -import java.util.Random import java.util.UUID +import java.util.random.RandomGenerator /** * A [ComputeWorkload] that samples HPC VMs in the workload. @@ -46,7 +46,7 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti */ private val pattern = Regex("^(ComputeNode|cn).*") - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { val vms = source.resolve(loader, random) val (hpc, nonHpc) = vms.partition { entry -> @@ -58,7 +58,6 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti .map { index -> val res = mutableListOf<VirtualMachine>() hpc.mapTo(res) { sample(it, index) } - res.shuffle(random) res } .flatten() @@ -67,7 +66,6 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti .map { index -> val res = mutableListOf<VirtualMachine>() nonHpc.mapTo(res) { sample(it, index) } - res.shuffle(random) res } .flatten() diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt index 793f1de9..ffb7e0c6 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt @@ -26,7 +26,7 @@ import mu.KotlinLogging import org.opendc.experiments.compute.ComputeWorkload import org.opendc.experiments.compute.ComputeWorkloadLoader import org.opendc.experiments.compute.VirtualMachine -import java.util.Random +import java.util.random.RandomGenerator /** * A [ComputeWorkload] that is sampled based on total load. @@ -37,7 +37,7 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { val vms = source.resolve(loader, random) val res = mutableListOf<VirtualMachine>() diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt index b4e9005f..d9e311cd 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt @@ -25,13 +25,13 @@ package org.opendc.experiments.compute.internal import org.opendc.experiments.compute.ComputeWorkload import org.opendc.experiments.compute.ComputeWorkloadLoader import org.opendc.experiments.compute.VirtualMachine -import java.util.Random +import java.util.random.RandomGenerator /** * A [ComputeWorkload] from a trace. */ internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { return loader.get(name, format) } } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt index ac058171..efd38a3c 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt @@ -27,6 +27,8 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import mu.KotlinLogging +import org.opendc.common.Dispatcher +import org.opendc.common.asCoroutineDispatcher import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host @@ -35,7 +37,6 @@ import org.opendc.experiments.compute.telemetry.table.HostTableReader import org.opendc.experiments.compute.telemetry.table.ServerInfo import org.opendc.experiments.compute.telemetry.table.ServerTableReader import org.opendc.experiments.compute.telemetry.table.ServiceTableReader -import java.time.Clock import java.time.Duration import java.time.Instant @@ -43,20 +44,20 @@ import java.time.Instant * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every * export interval. * - * @param scope The [CoroutineScope] to run the reader in. - * @param clock The virtual clock. + * @param dispatcher A [Dispatcher] for scheduling the future events. * @param service The [ComputeService] to monitor. * @param monitor The monitor to export the metrics to. * @param exportInterval The export interval. */ public class ComputeMetricReader( - scope: CoroutineScope, - clock: Clock, + dispatcher: Dispatcher, private val service: ComputeService, private val monitor: ComputeMonitor, private val exportInterval: Duration = Duration.ofMinutes(5) ) : AutoCloseable { private val logger = KotlinLogging.logger {} + private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher()) + private val clock = dispatcher.timeSource /** * Aggregator for service metrics. diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt index 68ca5ae8..665611dd 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt @@ -22,9 +22,6 @@ package org.opendc.experiments.compute.telemetry -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel import org.opendc.compute.service.ComputeService import org.opendc.experiments.provisioner.ProvisioningContext import org.opendc.experiments.provisioner.ProvisioningStep @@ -40,13 +37,8 @@ public class ComputeMonitorProvisioningStep internal constructor( private val exportInterval: Duration ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { - val scope = CoroutineScope(ctx.coroutineContext + Job()) val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } - val metricReader = ComputeMetricReader(scope, ctx.clock, service, monitor, exportInterval) - - return AutoCloseable { - metricReader.close() - scope.cancel() - } + val metricReader = ComputeMetricReader(ctx.dispatcher, service, monitor, exportInterval) + return AutoCloseable { metricReader.close() } } } diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt index 3b4200c8..e5c2f86a 100644 --- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt @@ -56,10 +56,9 @@ public class FaaSServiceProvisioningStep internal constructor( } else { ZeroDelayInjector } - val deployer = SimFunctionDeployer(ctx.coroutineContext, ctx.clock, machineModel, delayInjector) + val deployer = SimFunctionDeployer(ctx.dispatcher, machineModel, delayInjector) val service = FaaSService( - ctx.coroutineContext, - ctx.clock, + ctx.dispatcher, deployer, routingPolicy(ctx), terminationPolicy(ctx) diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt index c4001e2e..7a354d69 100644 --- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt @@ -28,16 +28,16 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import org.opendc.faas.service.FaaSService -import java.time.Clock +import java.time.InstantSource import kotlin.math.max /** * Run a simulation of the [FaaSService] by replaying the workload trace given by [trace]. * - * @param clock A [Clock] instance tracking simulation time. + * @param clock An [InstantSource] instance tracking simulation time. * @param trace The trace to simulate. */ -public suspend fun FaaSService.replay(clock: Clock, trace: List<FunctionTrace>) { +public suspend fun FaaSService.replay(clock: InstantSource, trace: List<FunctionTrace>) { val client = newClient() try { coroutineScope { diff --git a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt index 1ad9c57f..4a4d9ae0 100644 --- a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt +++ b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt @@ -49,12 +49,12 @@ class FaaSExperiment { fun testSmoke() = runSimulation { val faasService = "faas.opendc.org" - Provisioner(coroutineContext, clock, seed = 0L).use { provisioner -> + Provisioner(dispatcher, seed = 0L).use { provisioner -> provisioner.runStep( setupFaaSService( faasService, { RandomRoutingPolicy() }, - { FunctionTerminationPolicyFixed(it.coroutineContext, it.clock, timeout = Duration.ofMinutes(10)) }, + { FunctionTerminationPolicyFixed(it.dispatcher, timeout = Duration.ofMinutes(10)) }, createMachineModel(), coldStartModel = ColdStartModel.GOOGLE ) @@ -63,7 +63,7 @@ class FaaSExperiment { val service = provisioner.registry.resolve(faasService, FaaSService::class.java)!! val trace = ServerlessTraceReader().parse(File("src/test/resources/trace")) - service.replay(clock, trace) + service.replay(timeSource, trace) val stats = service.getSchedulerStats() diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index eb308970..53bf5aa6 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -22,12 +22,9 @@ package org.opendc.experiments.tf20.core -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.common.Dispatcher import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.SimMachineContext @@ -36,17 +33,14 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.CpuPowerModel -import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow2.FlowEngine import org.opendc.simulator.flow2.FlowStage import org.opendc.simulator.flow2.FlowStageLogic import org.opendc.simulator.flow2.OutPort -import java.time.Clock import java.util.ArrayDeque import java.util.UUID import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume import kotlin.math.ceil import kotlin.math.roundToLong @@ -57,22 +51,16 @@ import kotlin.math.roundToLong public class SimTFDevice( override val uid: UUID, override val isGpu: Boolean, - context: CoroutineContext, - clock: Clock, + dispatcher: Dispatcher, pu: ProcessingUnit, private val memory: MemoryUnit, powerModel: CpuPowerModel ) : TFDevice { /** - * The scope in which the device runs. - */ - private val scope = CoroutineScope(context + Job()) - - /** * The [SimMachine] representing the device. */ private val machine = SimBareMetalMachine.create( - FlowEngine.create(context, clock).newGraph(), + FlowEngine.create(dispatcher).newGraph(), MachineModel(listOf(pu), listOf(memory)), SimPsuFactories.simple(powerModel) ) @@ -162,9 +150,7 @@ public class SimTFDevice( } init { - scope.launch { - machine.runWorkload(workload) - } + machine.startWorkload(workload, emptyMap()) {} } override suspend fun load(dataSize: Long) { @@ -185,7 +171,6 @@ public class SimTFDevice( override fun close() { machine.cancel() - scope.cancel() } private data class Work(var flops: Double, val cont: Continuation<Unit>) { diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt index 7d65a674..5b408fb3 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt @@ -23,19 +23,18 @@ package org.opendc.experiments.tf20.network import kotlinx.coroutines.channels.Channel +import org.opendc.common.Dispatcher import org.opendc.common.util.TimerScheduler -import java.time.Clock -import kotlin.coroutines.CoroutineContext /** * The network controller represents a simple network model between the worker and master nodes during * TensorFlow execution. */ -public class NetworkController(context: CoroutineContext, clock: Clock) : AutoCloseable { +public class NetworkController(dispatcher: Dispatcher) : AutoCloseable { /** * The scheduler for the message. */ - private val scheduler = TimerScheduler<Message>(context, clock) + private val scheduler = TimerScheduler<Message>(dispatcher) /** * The outbound communication channels. diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt index 32f72686..899aafc0 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt @@ -48,8 +48,7 @@ class TensorFlowTest { val device = SimTFDevice( def.uid, def.meta["gpu"] as Boolean, - coroutineContext, - clock, + dispatcher, def.model.cpus[0], def.model.memory[0], CpuPowerModels.linear(250.0, 60.0) @@ -67,7 +66,7 @@ class TensorFlowTest { val stats = device.getDeviceStats() assertAll( - { assertEquals(3309694252, clock.millis()) }, + { assertEquals(3309694252, timeSource.millis()) }, { assertEquals(8.27423563E8, stats.energyUsage) } ) } @@ -83,8 +82,7 @@ class TensorFlowTest { val device = SimTFDevice( def.uid, def.meta["gpu"] as Boolean, - coroutineContext, - clock, + dispatcher, def.model.cpus[0], def.model.memory[0], CpuPowerModels.linear(250.0, 60.0) @@ -102,7 +100,7 @@ class TensorFlowTest { val stats = device.getDeviceStats() assertAll( - { assertEquals(176230328513, clock.millis()) }, + { assertEquals(176230328513, timeSource.millis()) }, { assertEquals(4.405758212825E10, stats.energyUsage) } ) } @@ -118,8 +116,7 @@ class TensorFlowTest { val deviceA = SimTFDevice( def.uid, def.meta["gpu"] as Boolean, - coroutineContext, - clock, + dispatcher, def.model.cpus[0], def.model.memory[0], CpuPowerModels.linear(250.0, 60.0) @@ -128,8 +125,7 @@ class TensorFlowTest { val deviceB = SimTFDevice( UUID.randomUUID(), def.meta["gpu"] as Boolean, - coroutineContext, - clock, + dispatcher, def.model.cpus[0], def.model.memory[0], CpuPowerModels.linear(250.0, 60.0) @@ -150,7 +146,7 @@ class TensorFlowTest { val statsA = deviceA.getDeviceStats() val statsB = deviceB.getDeviceStats() assertAll( - { assertEquals(1704994000, clock.millis()) }, + { assertEquals(1704994000, timeSource.millis()) }, { assertEquals(4.262485E8, statsA.energyUsage) }, { assertEquals(4.262485E8, statsB.energyUsage) } ) diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt index 910cbcc9..549c6f3e 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt @@ -47,8 +47,7 @@ internal class SimTFDeviceTest { val device = SimTFDevice( UUID.randomUUID(), isGpu = true, - coroutineContext, - clock, + dispatcher, pu, memory, CpuPowerModels.linear(250.0, 100.0) @@ -56,7 +55,7 @@ internal class SimTFDeviceTest { // Load 1 GiB into GPU memory device.load(1000) - assertEquals(1140, clock.millis()) + assertEquals(1140, timeSource.millis()) coroutineScope { launch { device.compute(1e6) } @@ -68,7 +67,7 @@ internal class SimTFDeviceTest { val stats = device.getDeviceStats() assertAll( - { assertEquals(3681, clock.millis()) }, + { assertEquals(3681, timeSource.millis()) }, { assertEquals(749.25, stats.energyUsage) } ) } diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt index b622362a..2037dad4 100644 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt @@ -42,7 +42,7 @@ import org.opendc.workflow.api.Task import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE import org.opendc.workflow.service.WorkflowService -import java.time.Clock +import java.time.InstantSource import java.util.UUID import kotlin.collections.HashMap import kotlin.collections.HashSet @@ -110,7 +110,7 @@ public fun Trace.toJobs(): List<Job> { /** * Helper method to replay the specified list of [jobs] and suspend execution util all jobs have finished. */ -public suspend fun WorkflowService.replay(clock: Clock, jobs: List<Job>) { +public suspend fun WorkflowService.replay(clock: InstantSource, jobs: List<Job>) { // Sort jobs by their arrival time val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long } if (orderedJobs.isEmpty()) { diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt index 5cee9abf..fe4fde17 100644 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt @@ -47,8 +47,7 @@ public class WorkflowServiceProvisioningStep internal constructor( val client = computeService.newClient() val service = WorkflowService( - ctx.coroutineContext, - ctx.clock, + ctx.dispatcher, client, scheduler.schedulingQuantum, jobAdmissionPolicy = scheduler.jobAdmissionPolicy, |
