diff options
Diffstat (limited to 'opendc-experiments')
28 files changed, 91 insertions, 129 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt index 7fe3a2eb..eae5806e 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt @@ -22,13 +22,12 @@ package org.opendc.experiments.provisioner +import org.opendc.common.Dispatcher import org.opendc.experiments.MutableServiceRegistry import org.opendc.experiments.ServiceRegistry import org.opendc.experiments.internal.ServiceRegistryImpl -import java.time.Clock import java.util.ArrayDeque import java.util.SplittableRandom -import kotlin.coroutines.CoroutineContext /** * A helper class to set up the experimental environment in a reproducible manner. @@ -37,17 +36,15 @@ import kotlin.coroutines.CoroutineContext * [ProvisioningStep]s are executed sequentially and ensure that the necessary infrastructure is configured and teared * down after the simulation completes. * - * @param coroutineContext The [CoroutineContext] in which the environment is set up. - * @param clock The simulation [Clock]. + * @param dispatcher The [Dispatcher] implementation for scheduling future tasks. * @param seed A seed for initializing the randomness of the environment. */ -public class Provisioner(coroutineContext: CoroutineContext, clock: Clock, seed: Long) : AutoCloseable { +public class Provisioner(dispatcher: Dispatcher, seed: Long) : AutoCloseable { /** * Implementation of [ProvisioningContext]. */ private val context = object : ProvisioningContext { - override val clock: Clock = clock - override val coroutineContext: CoroutineContext = coroutineContext + override val dispatcher: Dispatcher = dispatcher override val seeder: SplittableRandom = SplittableRandom(seed) override val registry: MutableServiceRegistry = ServiceRegistryImpl() diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt index 73897315..e53044ce 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt @@ -22,31 +22,26 @@ package org.opendc.experiments.provisioner +import org.opendc.common.Dispatcher import org.opendc.experiments.MutableServiceRegistry -import java.time.Clock import java.util.SplittableRandom -import kotlin.coroutines.CoroutineContext +import java.util.random.RandomGenerator /** * The [ProvisioningContext] class provides access to shared state between subsequent [ProvisioningStep]s, as well as - * access to the simulation dispatcher (via [CoroutineContext]), the virtual clock, and a randomness seeder to allow + * access to the simulation dispatcher, the virtual clock, and a randomness seeder to allow * the provisioning steps to initialize the (simulated) resources. */ public interface ProvisioningContext { /** - * The [CoroutineContext] in which the provisioner runs. + * The [Dispatcher] provided by the provisioner to schedule future events during the simulation. */ - public val coroutineContext: CoroutineContext - - /** - * The [Clock] tracking the virtual simulation time. - */ - public val clock: Clock + public val dispatcher: Dispatcher /** * A [SplittableRandom] instance used to seed the provisioners. */ - public val seeder: SplittableRandom + public val seeder: RandomGenerator /** * A [MutableServiceRegistry] where the provisioned services are registered. diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index 3e3d758d..1221f084 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -75,7 +75,7 @@ class CapelinBenchmarks { fun benchmarkCapelin() = runSimulation { val serviceDomain = "compute.opendc.org" - Provisioner(coroutineContext, clock, seed = 0).use { provisioner -> + Provisioner(dispatcher, seed = 0).use { provisioner -> val computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) @@ -87,7 +87,7 @@ class CapelinBenchmarks { ) val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! - service.replay(clock, vms, 0L, interference = true) + service.replay(timeSource, vms, 0L, interference = true) } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt index 2c3573dc..2567a4d5 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt @@ -64,7 +64,7 @@ public class CapelinRunner( val serviceDomain = "compute.opendc.org" val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt")) - Provisioner(coroutineContext, clock, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain, { createComputeScheduler(scenario.allocationPolicy, Random(it.seeder.nextLong())) }), setupHosts(serviceDomain, topology, optimize = true) @@ -96,7 +96,7 @@ public class CapelinRunner( null } - service.replay(clock, vms, seed, failureModel = failureModel, interference = operationalPhenomena.hasInterference) + service.replay(timeSource, vms, seed, failureModel = failureModel, interference = operationalPhenomena.hasInterference) } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt index 0b4cafa6..3a2acbd7 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt @@ -34,8 +34,9 @@ import org.opendc.simulator.compute.power.CpuPowerModel import org.opendc.simulator.compute.power.CpuPowerModels import java.io.File import java.io.InputStream -import java.util.Random +import java.util.SplittableRandom import java.util.UUID +import java.util.random.RandomGenerator import kotlin.math.roundToLong /** @@ -49,7 +50,7 @@ private val reader = ClusterSpecReader() fun clusterTopology( file: File, powerModel: CpuPowerModel = CpuPowerModels.linear(350.0, 200.0), - random: Random = Random(0) + random: RandomGenerator = SplittableRandom(0) ): List<HostSpec> { return clusterTopology(reader.read(file), powerModel, random) } @@ -60,7 +61,7 @@ fun clusterTopology( fun clusterTopology( input: InputStream, powerModel: CpuPowerModel = CpuPowerModels.linear(350.0, 200.0), - random: Random = Random(0) + random: RandomGenerator = SplittableRandom(0) ): List<HostSpec> { return clusterTopology(reader.read(input), powerModel, random) } @@ -68,14 +69,14 @@ fun clusterTopology( /** * Construct a topology from the given list of [clusters]. */ -fun clusterTopology(clusters: List<ClusterSpec>, powerModel: CpuPowerModel, random: Random = Random(0)): List<HostSpec> { +fun clusterTopology(clusters: List<ClusterSpec>, powerModel: CpuPowerModel, random: RandomGenerator = SplittableRandom(0)): List<HostSpec> { return clusters.flatMap { it.toHostSpecs(random, powerModel) } } /** * Helper method to convert a [ClusterSpec] into a list of [HostSpec]s. */ -private fun ClusterSpec.toHostSpecs(random: Random, powerModel: CpuPowerModel): List<HostSpec> { +private fun ClusterSpec.toHostSpecs(random: RandomGenerator, powerModel: CpuPowerModel): List<HostSpec> { val cpuSpeed = cpuSpeed val memoryPerHost = memCapacityPerHost.roundToLong() diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 77b0d09f..7e01bb64 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -94,7 +94,7 @@ class CapelinIntegrationTest { val topology = createTopology() val monitor = monitor - Provisioner(coroutineContext, clock, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), @@ -102,7 +102,7 @@ class CapelinIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(clock, workload, seed) + service.replay(timeSource, workload, seed) } println( @@ -138,7 +138,7 @@ class CapelinIntegrationTest { val topology = createTopology("single") val monitor = monitor - Provisioner(coroutineContext, clock, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), @@ -146,7 +146,7 @@ class CapelinIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(clock, workload, seed) + service.replay(timeSource, workload, seed) } println( @@ -177,7 +177,7 @@ class CapelinIntegrationTest { val workload = createTestWorkload(1.0, seed) val topology = createTopology("single") - Provisioner(coroutineContext, clock, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), @@ -185,7 +185,7 @@ class CapelinIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(clock, workload, seed, interference = true) + service.replay(timeSource, workload, seed, interference = true) } println( @@ -216,7 +216,7 @@ class CapelinIntegrationTest { val workload = createTestWorkload(0.25, seed) val monitor = monitor - Provisioner(coroutineContext, clock, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), @@ -224,7 +224,7 @@ class CapelinIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(clock, workload, seed, failureModel = grid5000(Duration.ofDays(7))) + service.replay(timeSource, workload, seed, failureModel = grid5000(Duration.ofDays(7))) } // Note that these values have been verified beforehand diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt index bbc70489..125ba6ef 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt @@ -34,12 +34,13 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher import org.opendc.compute.service.scheduler.weights.RamWeigher import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import java.util.Random +import java.util.SplittableRandom +import java.util.random.RandomGenerator /** * Create a [ComputeScheduler] for the experiment. */ -public fun createComputeScheduler(name: String, seeder: Random, placements: Map<String, String> = emptyMap()): ComputeScheduler { +public fun createComputeScheduler(name: String, seeder: RandomGenerator, placements: Map<String, String> = emptyMap()): ComputeScheduler { val cpuAllocationRatio = 16.0 val ramAllocationRatio = 1.5 return when (name) { @@ -79,7 +80,7 @@ public fun createComputeScheduler(name: String, seeder: Random, placements: Map< filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), weighers = emptyList(), subsetSize = Int.MAX_VALUE, - random = Random(seeder.nextLong()) + random = SplittableRandom(seeder.nextLong()) ) "replay" -> ReplayScheduler(placements) else -> throw IllegalArgumentException("Unknown policy $name") diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt index 38cbf2dc..d7347327 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt @@ -41,7 +41,7 @@ public class ComputeServiceProvisioningStep internal constructor( private val schedulingQuantum: Duration ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { - val service = ComputeService(ctx.coroutineContext, ctx.clock, scheduler(ctx), schedulingQuantum) + val service = ComputeService(ctx.dispatcher, scheduler(ctx), schedulingQuantum) ctx.registry.register(serviceDomain, ComputeService::class.java, service) return AutoCloseable { service.close() } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt index 2200880d..b7884293 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.compute -import java.util.Random +import java.util.random.RandomGenerator /** * An interface that describes how a workload is resolved. @@ -31,5 +31,5 @@ public interface ComputeWorkload { /** * Resolve the workload into a list of [VirtualMachine]s to simulate. */ - public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> + public fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt index 81a5cf33..eb85dbb8 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt @@ -24,8 +24,8 @@ package org.opendc.experiments.compute import org.opendc.compute.service.ComputeService import org.opendc.compute.simulator.failure.HostFaultInjector -import java.time.Clock -import java.util.Random +import java.time.InstantSource +import java.util.random.RandomGenerator import kotlin.coroutines.CoroutineContext /** @@ -37,8 +37,8 @@ public interface FailureModel { */ public fun createInjector( context: CoroutineContext, - clock: Clock, + clock: InstantSource, service: ComputeService, - random: Random + random: RandomGenerator ): HostFaultInjector } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt index ff747066..679e370a 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt @@ -31,9 +31,9 @@ import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.failure.HostFaultInjector import org.opendc.compute.simulator.failure.StartStopHostFault import org.opendc.compute.simulator.failure.StochasticVictimSelector -import java.time.Clock import java.time.Duration -import java.util.Random +import java.time.InstantSource +import java.util.random.RandomGenerator import kotlin.coroutines.CoroutineContext import kotlin.math.ln @@ -47,9 +47,9 @@ public fun grid5000(failureInterval: Duration): FailureModel { return object : FailureModel { override fun createInjector( context: CoroutineContext, - clock: Clock, + clock: InstantSource, service: ComputeService, - random: Random + random: RandomGenerator ): HostFaultInjector { val rng = Well19937c(random.nextLong()) val hosts = service.hosts.map { it as SimHost }.toSet() diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt index e224fb84..310aa54c 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt @@ -46,7 +46,7 @@ public class HostsProvisioningStep internal constructor( ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } - val engine = FlowEngine.create(ctx.coroutineContext, ctx.clock) + val engine = FlowEngine.create(ctx.dispatcher) val graph = engine.newGraph() val hosts = mutableSetOf<SimHost>() @@ -58,7 +58,7 @@ public class HostsProvisioningStep internal constructor( spec.uid, spec.name, spec.meta, - ctx.clock, + ctx.dispatcher.timeSource, machine, hypervisor, optimize = optimize diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt index f0e31932..16d28edb 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt @@ -29,7 +29,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.yield import org.opendc.compute.service.ComputeService -import java.time.Clock +import java.time.InstantSource import java.util.Random import kotlin.coroutines.coroutineContext import kotlin.math.max @@ -45,7 +45,7 @@ import kotlin.math.max * @param interference A flag to indicate that VM interference needs to be enabled. */ public suspend fun ComputeService.replay( - clock: Clock, + clock: InstantSource, trace: List<VirtualMachine>, seed: Long, submitImmediately: Boolean = false, diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt index 3a7a51f2..ca23a7c5 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt @@ -26,7 +26,7 @@ import mu.KotlinLogging import org.opendc.experiments.compute.ComputeWorkload import org.opendc.experiments.compute.ComputeWorkloadLoader import org.opendc.experiments.compute.VirtualMachine -import java.util.Random +import java.util.random.RandomGenerator /** * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads. @@ -37,7 +37,7 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt index a6055762..583405da 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt @@ -26,8 +26,8 @@ import mu.KotlinLogging import org.opendc.experiments.compute.ComputeWorkload import org.opendc.experiments.compute.ComputeWorkloadLoader import org.opendc.experiments.compute.VirtualMachine -import java.util.Random import java.util.UUID +import java.util.random.RandomGenerator /** * A [ComputeWorkload] that samples HPC VMs in the workload. @@ -46,7 +46,7 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti */ private val pattern = Regex("^(ComputeNode|cn).*") - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { val vms = source.resolve(loader, random) val (hpc, nonHpc) = vms.partition { entry -> @@ -58,7 +58,6 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti .map { index -> val res = mutableListOf<VirtualMachine>() hpc.mapTo(res) { sample(it, index) } - res.shuffle(random) res } .flatten() @@ -67,7 +66,6 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti .map { index -> val res = mutableListOf<VirtualMachine>() nonHpc.mapTo(res) { sample(it, index) } - res.shuffle(random) res } .flatten() diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt index 793f1de9..ffb7e0c6 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt @@ -26,7 +26,7 @@ import mu.KotlinLogging import org.opendc.experiments.compute.ComputeWorkload import org.opendc.experiments.compute.ComputeWorkloadLoader import org.opendc.experiments.compute.VirtualMachine -import java.util.Random +import java.util.random.RandomGenerator /** * A [ComputeWorkload] that is sampled based on total load. @@ -37,7 +37,7 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { val vms = source.resolve(loader, random) val res = mutableListOf<VirtualMachine>() diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt index b4e9005f..d9e311cd 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt @@ -25,13 +25,13 @@ package org.opendc.experiments.compute.internal import org.opendc.experiments.compute.ComputeWorkload import org.opendc.experiments.compute.ComputeWorkloadLoader import org.opendc.experiments.compute.VirtualMachine -import java.util.Random +import java.util.random.RandomGenerator /** * A [ComputeWorkload] from a trace. */ internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { - override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { return loader.get(name, format) } } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt index ac058171..efd38a3c 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt @@ -27,6 +27,8 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import mu.KotlinLogging +import org.opendc.common.Dispatcher +import org.opendc.common.asCoroutineDispatcher import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host @@ -35,7 +37,6 @@ import org.opendc.experiments.compute.telemetry.table.HostTableReader import org.opendc.experiments.compute.telemetry.table.ServerInfo import org.opendc.experiments.compute.telemetry.table.ServerTableReader import org.opendc.experiments.compute.telemetry.table.ServiceTableReader -import java.time.Clock import java.time.Duration import java.time.Instant @@ -43,20 +44,20 @@ import java.time.Instant * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every * export interval. * - * @param scope The [CoroutineScope] to run the reader in. - * @param clock The virtual clock. + * @param dispatcher A [Dispatcher] for scheduling the future events. * @param service The [ComputeService] to monitor. * @param monitor The monitor to export the metrics to. * @param exportInterval The export interval. */ public class ComputeMetricReader( - scope: CoroutineScope, - clock: Clock, + dispatcher: Dispatcher, private val service: ComputeService, private val monitor: ComputeMonitor, private val exportInterval: Duration = Duration.ofMinutes(5) ) : AutoCloseable { private val logger = KotlinLogging.logger {} + private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher()) + private val clock = dispatcher.timeSource /** * Aggregator for service metrics. diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt index 68ca5ae8..665611dd 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt @@ -22,9 +22,6 @@ package org.opendc.experiments.compute.telemetry -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel import org.opendc.compute.service.ComputeService import org.opendc.experiments.provisioner.ProvisioningContext import org.opendc.experiments.provisioner.ProvisioningStep @@ -40,13 +37,8 @@ public class ComputeMonitorProvisioningStep internal constructor( private val exportInterval: Duration ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { - val scope = CoroutineScope(ctx.coroutineContext + Job()) val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } - val metricReader = ComputeMetricReader(scope, ctx.clock, service, monitor, exportInterval) - - return AutoCloseable { - metricReader.close() - scope.cancel() - } + val metricReader = ComputeMetricReader(ctx.dispatcher, service, monitor, exportInterval) + return AutoCloseable { metricReader.close() } } } diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt index 3b4200c8..e5c2f86a 100644 --- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt @@ -56,10 +56,9 @@ public class FaaSServiceProvisioningStep internal constructor( } else { ZeroDelayInjector } - val deployer = SimFunctionDeployer(ctx.coroutineContext, ctx.clock, machineModel, delayInjector) + val deployer = SimFunctionDeployer(ctx.dispatcher, machineModel, delayInjector) val service = FaaSService( - ctx.coroutineContext, - ctx.clock, + ctx.dispatcher, deployer, routingPolicy(ctx), terminationPolicy(ctx) diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt index c4001e2e..7a354d69 100644 --- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt @@ -28,16 +28,16 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import org.opendc.faas.service.FaaSService -import java.time.Clock +import java.time.InstantSource import kotlin.math.max /** * Run a simulation of the [FaaSService] by replaying the workload trace given by [trace]. * - * @param clock A [Clock] instance tracking simulation time. + * @param clock An [InstantSource] instance tracking simulation time. * @param trace The trace to simulate. */ -public suspend fun FaaSService.replay(clock: Clock, trace: List<FunctionTrace>) { +public suspend fun FaaSService.replay(clock: InstantSource, trace: List<FunctionTrace>) { val client = newClient() try { coroutineScope { diff --git a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt index 1ad9c57f..4a4d9ae0 100644 --- a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt +++ b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt @@ -49,12 +49,12 @@ class FaaSExperiment { fun testSmoke() = runSimulation { val faasService = "faas.opendc.org" - Provisioner(coroutineContext, clock, seed = 0L).use { provisioner -> + Provisioner(dispatcher, seed = 0L).use { provisioner -> provisioner.runStep( setupFaaSService( faasService, { RandomRoutingPolicy() }, - { FunctionTerminationPolicyFixed(it.coroutineContext, it.clock, timeout = Duration.ofMinutes(10)) }, + { FunctionTerminationPolicyFixed(it.dispatcher, timeout = Duration.ofMinutes(10)) }, createMachineModel(), coldStartModel = ColdStartModel.GOOGLE ) @@ -63,7 +63,7 @@ class FaaSExperiment { val service = provisioner.registry.resolve(faasService, FaaSService::class.java)!! val trace = ServerlessTraceReader().parse(File("src/test/resources/trace")) - service.replay(clock, trace) + service.replay(timeSource, trace) val stats = service.getSchedulerStats() diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index eb308970..53bf5aa6 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -22,12 +22,9 @@ package org.opendc.experiments.tf20.core -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.common.Dispatcher import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.SimMachineContext @@ -36,17 +33,14 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.CpuPowerModel -import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow2.FlowEngine import org.opendc.simulator.flow2.FlowStage import org.opendc.simulator.flow2.FlowStageLogic import org.opendc.simulator.flow2.OutPort -import java.time.Clock import java.util.ArrayDeque import java.util.UUID import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume import kotlin.math.ceil import kotlin.math.roundToLong @@ -57,22 +51,16 @@ import kotlin.math.roundToLong public class SimTFDevice( override val uid: UUID, override val isGpu: Boolean, - context: CoroutineContext, - clock: Clock, + dispatcher: Dispatcher, pu: ProcessingUnit, private val memory: MemoryUnit, powerModel: CpuPowerModel ) : TFDevice { /** - * The scope in which the device runs. - */ - private val scope = CoroutineScope(context + Job()) - - /** * The [SimMachine] representing the device. */ private val machine = SimBareMetalMachine.create( - FlowEngine.create(context, clock).newGraph(), + FlowEngine.create(dispatcher).newGraph(), MachineModel(listOf(pu), listOf(memory)), SimPsuFactories.simple(powerModel) ) @@ -162,9 +150,7 @@ public class SimTFDevice( } init { - scope.launch { - machine.runWorkload(workload) - } + machine.startWorkload(workload, emptyMap()) {} } override suspend fun load(dataSize: Long) { @@ -185,7 +171,6 @@ public class SimTFDevice( override fun close() { machine.cancel() - scope.cancel() } private data class Work(var flops: Double, val cont: Continuation<Unit>) { diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt index 7d65a674..5b408fb3 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt @@ -23,19 +23,18 @@ package org.opendc.experiments.tf20.network import kotlinx.coroutines.channels.Channel +import org.opendc.common.Dispatcher import org.opendc.common.util.TimerScheduler -import java.time.Clock -import kotlin.coroutines.CoroutineContext /** * The network controller represents a simple network model between the worker and master nodes during * TensorFlow execution. */ -public class NetworkController(context: CoroutineContext, clock: Clock) : AutoCloseable { +public class NetworkController(dispatcher: Dispatcher) : AutoCloseable { /** * The scheduler for the message. */ - private val scheduler = TimerScheduler<Message>(context, clock) + private val scheduler = TimerScheduler<Message>(dispatcher) /** * The outbound communication channels. diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt index 32f72686..899aafc0 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt @@ -48,8 +48,7 @@ class TensorFlowTest { val device = SimTFDevice( def.uid, def.meta["gpu"] as Boolean, - coroutineContext, - clock, + dispatcher, def.model.cpus[0], def.model.memory[0], CpuPowerModels.linear(250.0, 60.0) @@ -67,7 +66,7 @@ class TensorFlowTest { val stats = device.getDeviceStats() assertAll( - { assertEquals(3309694252, clock.millis()) }, + { assertEquals(3309694252, timeSource.millis()) }, { assertEquals(8.27423563E8, stats.energyUsage) } ) } @@ -83,8 +82,7 @@ class TensorFlowTest { val device = SimTFDevice( def.uid, def.meta["gpu"] as Boolean, - coroutineContext, - clock, + dispatcher, def.model.cpus[0], def.model.memory[0], CpuPowerModels.linear(250.0, 60.0) @@ -102,7 +100,7 @@ class TensorFlowTest { val stats = device.getDeviceStats() assertAll( - { assertEquals(176230328513, clock.millis()) }, + { assertEquals(176230328513, timeSource.millis()) }, { assertEquals(4.405758212825E10, stats.energyUsage) } ) } @@ -118,8 +116,7 @@ class TensorFlowTest { val deviceA = SimTFDevice( def.uid, def.meta["gpu"] as Boolean, - coroutineContext, - clock, + dispatcher, def.model.cpus[0], def.model.memory[0], CpuPowerModels.linear(250.0, 60.0) @@ -128,8 +125,7 @@ class TensorFlowTest { val deviceB = SimTFDevice( UUID.randomUUID(), def.meta["gpu"] as Boolean, - coroutineContext, - clock, + dispatcher, def.model.cpus[0], def.model.memory[0], CpuPowerModels.linear(250.0, 60.0) @@ -150,7 +146,7 @@ class TensorFlowTest { val statsA = deviceA.getDeviceStats() val statsB = deviceB.getDeviceStats() assertAll( - { assertEquals(1704994000, clock.millis()) }, + { assertEquals(1704994000, timeSource.millis()) }, { assertEquals(4.262485E8, statsA.energyUsage) }, { assertEquals(4.262485E8, statsB.energyUsage) } ) diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt index 910cbcc9..549c6f3e 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt @@ -47,8 +47,7 @@ internal class SimTFDeviceTest { val device = SimTFDevice( UUID.randomUUID(), isGpu = true, - coroutineContext, - clock, + dispatcher, pu, memory, CpuPowerModels.linear(250.0, 100.0) @@ -56,7 +55,7 @@ internal class SimTFDeviceTest { // Load 1 GiB into GPU memory device.load(1000) - assertEquals(1140, clock.millis()) + assertEquals(1140, timeSource.millis()) coroutineScope { launch { device.compute(1e6) } @@ -68,7 +67,7 @@ internal class SimTFDeviceTest { val stats = device.getDeviceStats() assertAll( - { assertEquals(3681, clock.millis()) }, + { assertEquals(3681, timeSource.millis()) }, { assertEquals(749.25, stats.energyUsage) } ) } diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt index b622362a..2037dad4 100644 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt @@ -42,7 +42,7 @@ import org.opendc.workflow.api.Task import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE import org.opendc.workflow.service.WorkflowService -import java.time.Clock +import java.time.InstantSource import java.util.UUID import kotlin.collections.HashMap import kotlin.collections.HashSet @@ -110,7 +110,7 @@ public fun Trace.toJobs(): List<Job> { /** * Helper method to replay the specified list of [jobs] and suspend execution util all jobs have finished. */ -public suspend fun WorkflowService.replay(clock: Clock, jobs: List<Job>) { +public suspend fun WorkflowService.replay(clock: InstantSource, jobs: List<Job>) { // Sort jobs by their arrival time val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long } if (orderedJobs.isEmpty()) { diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt index 5cee9abf..fe4fde17 100644 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt @@ -47,8 +47,7 @@ public class WorkflowServiceProvisioningStep internal constructor( val client = computeService.newClient() val service = WorkflowService( - ctx.coroutineContext, - ctx.clock, + ctx.dispatcher, client, scheduler.schedulingQuantum, jobAdmissionPolicy = scheduler.jobAdmissionPolicy, |
