diff options
Diffstat (limited to 'opendc-compute')
13 files changed, 64 insertions, 48 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 85222c10..9d7dcba6 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -22,15 +22,14 @@ package org.opendc.compute.service +import org.opendc.common.Dispatcher import org.opendc.compute.api.ComputeClient import org.opendc.compute.api.Server import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.service.telemetry.SchedulerStats -import java.time.Clock import java.time.Duration -import kotlin.coroutines.CoroutineContext /** * The [ComputeService] hosts the API implementation of the OpenDC Compute service. @@ -80,18 +79,16 @@ public interface ComputeService : AutoCloseable { /** * Construct a new [ComputeService] implementation. * - * @param context The [CoroutineContext] to use in the service. - * @param clock The clock instance to use. + * @param dispatcher The [Dispatcher] for scheduling future events. * @param scheduler The scheduler implementation to use. * @param schedulingQuantum The interval between scheduling cycles. */ public operator fun invoke( - context: CoroutineContext, - clock: Clock, + dispatcher: Dispatcher, scheduler: ComputeScheduler, schedulingQuantum: Duration = Duration.ofMinutes(5) ): ComputeService { - return ComputeServiceImpl(context, clock, scheduler, schedulingQuantum) + return ComputeServiceImpl(dispatcher, scheduler, schedulingQuantum) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index b377c3e3..77932545 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -23,6 +23,7 @@ package org.opendc.compute.service.internal import mu.KotlinLogging +import org.opendc.common.Dispatcher import org.opendc.common.util.Pacer import org.opendc.compute.api.ComputeClient import org.opendc.compute.api.Flavor @@ -35,27 +36,23 @@ import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.service.telemetry.SchedulerStats -import java.time.Clock import java.time.Duration import java.time.Instant import java.util.ArrayDeque import java.util.Deque import java.util.Random import java.util.UUID -import kotlin.coroutines.CoroutineContext import kotlin.math.max /** * Internal implementation of the OpenDC Compute service. * - * @param coroutineContext The [CoroutineContext] to use in the service. - * @param clock The clock instance to use. + * @param dispatcher The [Dispatcher] for scheduling future events. * @param scheduler The scheduler implementation to use. * @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( - coroutineContext: CoroutineContext, - private val clock: Clock, + private val dispatcher: Dispatcher, private val scheduler: ComputeScheduler, schedulingQuantum: Duration ) : ComputeService, HostListener { @@ -108,6 +105,7 @@ internal class ComputeServiceImpl( override val hosts: Set<Host> get() = hostToView.keys + private val clock = dispatcher.timeSource private var maxCores = 0 private var maxMemory = 0L private var _attemptsSuccess = 0L @@ -120,7 +118,7 @@ internal class ComputeServiceImpl( /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } + private val pacer = Pacer(dispatcher, schedulingQuantum.toMillis()) { doSchedule() } override fun newClient(): ComputeClient { check(!isClosed) { "Service is already closed" } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt index 233f5ef6..0840ba7e 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt @@ -26,7 +26,8 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.internal.HostView import org.opendc.compute.service.scheduler.filters.HostFilter import org.opendc.compute.service.scheduler.weights.HostWeigher -import java.util.Random +import java.util.SplittableRandom +import java.util.random.RandomGenerator import kotlin.math.min /** @@ -39,13 +40,13 @@ import kotlin.math.min * @param filters The list of filters to apply when searching for an appropriate host. * @param weighers The list of weighers to apply when searching for an appropriate host. * @param subsetSize The size of the subset of best hosts from which a target is randomly chosen. - * @param random A [Random] instance for selecting + * @param random A [RandomGenerator] instance for selecting */ public class FilterScheduler( private val filters: List<HostFilter>, private val weighers: List<HostWeigher>, private val subsetSize: Int = 1, - private val random: Random = Random(0) + private val random: RandomGenerator = SplittableRandom(0) ) : ComputeScheduler { /** * The pool of hosts available to the scheduler. diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index c18709f3..b5685aba 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -62,12 +62,11 @@ internal class ComputeServiceTest { @BeforeEach fun setUp() { scope = SimulationCoroutineScope() - val clock = scope.clock val computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), weighers = listOf(RamWeigher()) ) - service = ComputeService(scope.coroutineContext, clock, computeScheduler) + service = ComputeService(scope.dispatcher, computeScheduler) } @Test diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 5eccc6ec..a44ccc27 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -45,9 +45,9 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.compute.workload.SimWorkloads -import java.time.Clock import java.time.Duration import java.time.Instant +import java.time.InstantSource import java.util.UUID import java.util.function.Supplier @@ -68,7 +68,7 @@ public class SimHost( override val uid: UUID, override val name: String, override val meta: Map<String, Any>, - private val clock: Clock, + private val clock: InstantSource, private val machine: SimBareMetalMachine, private val hypervisor: SimHypervisor, private val mapper: SimWorkloadMapper = DefaultWorkloadMapper, diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt index 258ccc89..d34f70d7 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt @@ -23,7 +23,7 @@ package org.opendc.compute.simulator.failure import org.opendc.compute.simulator.SimHost -import java.time.Clock +import java.time.InstantSource /** * Interface responsible for applying the fault to a host. @@ -32,5 +32,5 @@ public interface HostFault { /** * Apply the fault to the specified [victims]. */ - public suspend fun apply(clock: Clock, victims: List<SimHost>) + public suspend fun apply(clock: InstantSource, victims: List<SimHost>) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt index 5eff439f..afbb99d2 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt @@ -26,6 +26,7 @@ import org.apache.commons.math3.distribution.RealDistribution import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.internal.HostFaultInjectorImpl import java.time.Clock +import java.time.InstantSource import kotlin.coroutines.CoroutineContext /** @@ -55,7 +56,7 @@ public interface HostFaultInjector : AutoCloseable { */ public operator fun invoke( context: CoroutineContext, - clock: Clock, + clock: InstantSource, hosts: Set<SimHost>, iat: RealDistribution, selector: VictimSelector, diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt index fc7cebfc..8bd25391 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt @@ -25,14 +25,14 @@ package org.opendc.compute.simulator.failure import kotlinx.coroutines.delay import org.apache.commons.math3.distribution.RealDistribution import org.opendc.compute.simulator.SimHost -import java.time.Clock +import java.time.InstantSource import kotlin.math.roundToLong /** * A type of [HostFault] where the hosts are stopped and recover after some random amount of time. */ public class StartStopHostFault(private val duration: RealDistribution) : HostFault { - override suspend fun apply(clock: Clock, victims: List<SimHost>) { + override suspend fun apply(clock: InstantSource, victims: List<SimHost>) { for (host in victims) { host.fail() } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt index b6d466bd..4aba0e91 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt @@ -24,7 +24,9 @@ package org.opendc.compute.simulator.failure import org.apache.commons.math3.distribution.RealDistribution import org.opendc.compute.simulator.SimHost -import java.util.Random +import java.util.ArrayList +import java.util.SplittableRandom +import java.util.random.RandomGenerator import kotlin.math.roundToInt /** @@ -32,12 +34,30 @@ import kotlin.math.roundToInt */ public class StochasticVictimSelector( private val size: RealDistribution, - private val random: Random = Random(0) + private val random: RandomGenerator = SplittableRandom(0) ) : VictimSelector { override fun select(hosts: Set<SimHost>): List<SimHost> { val n = size.sample().roundToInt() - return hosts.shuffled(random).take(n) + val result = ArrayList<SimHost>(n) + + val random = random + var samplesNeeded = n + var remainingHosts = hosts.size + val iterator = hosts.iterator() + + while (iterator.hasNext() && samplesNeeded > 0) { + val host = iterator.next() + + if (random.nextInt(remainingHosts) < samplesNeeded) { + result.add(host) + samplesNeeded-- + } + + remainingHosts-- + } + + return result } override fun toString(): String = "StochasticVictimSelector[$size]" diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index ca947625..02766cb1 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -32,15 +32,15 @@ import org.opendc.compute.simulator.SimWorkloadMapper import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.kernel.SimVirtualMachine -import java.time.Clock import java.time.Duration import java.time.Instant +import java.time.InstantSource /** * A virtual machine instance that is managed by a [SimHost]. */ internal class Guest( - private val clock: Clock, + private val clock: InstantSource, val host: SimHost, private val hypervisor: SimHypervisor, private val mapper: SimWorkloadMapper, diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt index f03bffe9..afc0b0d4 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt @@ -32,7 +32,7 @@ import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.failure.HostFault import org.opendc.compute.simulator.failure.HostFaultInjector import org.opendc.compute.simulator.failure.VictimSelector -import java.time.Clock +import java.time.InstantSource import kotlin.coroutines.CoroutineContext import kotlin.math.roundToLong @@ -40,7 +40,7 @@ import kotlin.math.roundToLong * Internal implementation of the [HostFaultInjector] interface. * * @param context The scope to run the fault injector in. - * @param clock The [Clock] to keep track of simulation time. + * @param clock The [InstantSource] to keep track of simulation time. * @param hosts The set of hosts to inject faults into. * @param iat The inter-arrival time distribution of the failures (in hours). * @param selector The [VictimSelector] to select the host victims. @@ -48,7 +48,7 @@ import kotlin.math.roundToLong */ internal class HostFaultInjectorImpl( private val context: CoroutineContext, - private val clock: Clock, + private val clock: InstantSource, private val hosts: Set<SimHost>, private val iat: RealDistribution, private val selector: VictimSelector, diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index fc581d3e..a496cc99 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -75,7 +75,7 @@ internal class SimHostTest { fun testSingle() = runSimulation { val duration = 5 * 60L - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) @@ -85,7 +85,7 @@ internal class SimHostTest { uid = UUID.randomUUID(), name = "test", meta = emptyMap(), - clock, + timeSource, machine, hypervisor ) @@ -131,7 +131,7 @@ internal class SimHostTest { { assertEquals(639, cpuStats.activeTime, "Active time does not match") }, { assertEquals(2360, cpuStats.idleTime, "Idle time does not match") }, { assertEquals(56, cpuStats.stealTime, "Steal time does not match") }, - { assertEquals(1500001, clock.millis()) } + { assertEquals(1500001, timeSource.millis()) } ) } @@ -142,7 +142,7 @@ internal class SimHostTest { fun testOvercommitted() = runSimulation { val duration = 5 * 60L - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) @@ -152,7 +152,7 @@ internal class SimHostTest { uid = UUID.randomUUID(), name = "test", meta = emptyMap(), - clock, + timeSource, machine, hypervisor ) @@ -218,7 +218,7 @@ internal class SimHostTest { { assertEquals(658, cpuStats.activeTime, "Active time does not match") }, { assertEquals(2341, cpuStats.idleTime, "Idle time does not match") }, { assertEquals(637, cpuStats.stealTime, "Steal time does not match") }, - { assertEquals(1500001, clock.millis()) } + { assertEquals(1500001, timeSource.millis()) } ) } @@ -229,7 +229,7 @@ internal class SimHostTest { fun testFailure() = runSimulation { val duration = 5 * 60L - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) @@ -238,7 +238,7 @@ internal class SimHostTest { uid = UUID.randomUUID(), name = "test", meta = emptyMap(), - clock, + timeSource, machine, hypervisor ) diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt index 90f534e6..29d0b5e7 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt @@ -30,15 +30,15 @@ import org.apache.commons.math3.random.Well19937c import org.junit.jupiter.api.Test import org.opendc.compute.simulator.SimHost import org.opendc.simulator.kotlin.runSimulation -import java.time.Clock import java.time.Duration +import java.time.InstantSource import kotlin.coroutines.CoroutineContext import kotlin.math.ln /** * Test suite for [HostFaultInjector] class. */ -internal class HostFaultInjectorTest { +class HostFaultInjectorTest { /** * Simple test case to test that nothing happens when the injector is not started. */ @@ -46,7 +46,7 @@ internal class HostFaultInjectorTest { fun testInjectorNotStarted() = runSimulation { val host = mockk<SimHost>(relaxUnitFun = true) - val injector = createSimpleInjector(coroutineContext, clock, setOf(host)) + val injector = createSimpleInjector(coroutineContext, timeSource, setOf(host)) coVerify(exactly = 0) { host.fail() } coVerify(exactly = 0) { host.recover() } @@ -61,7 +61,7 @@ internal class HostFaultInjectorTest { fun testInjectorStopsMachine() = runSimulation { val host = mockk<SimHost>(relaxUnitFun = true) - val injector = createSimpleInjector(coroutineContext, clock, setOf(host)) + val injector = createSimpleInjector(coroutineContext, timeSource, setOf(host)) injector.start() @@ -83,7 +83,7 @@ internal class HostFaultInjectorTest { mockk(relaxUnitFun = true) ) - val injector = createSimpleInjector(coroutineContext, clock, hosts.toSet()) + val injector = createSimpleInjector(coroutineContext, timeSource, hosts.toSet()) injector.start() @@ -100,7 +100,7 @@ internal class HostFaultInjectorTest { /** * Create a simple start stop fault injector. */ - private fun createSimpleInjector(context: CoroutineContext, clock: Clock, hosts: Set<SimHost>): HostFaultInjector { + private fun createSimpleInjector(context: CoroutineContext, clock: InstantSource, hosts: Set<SimHost>): HostFaultInjector { val rng = Well19937c(0) val iat = LogNormalDistribution(rng, ln(24 * 7.0), 1.03) val selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25)) |
