diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-22 21:00:41 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-23 10:45:30 +0100 |
| commit | b5ac4b4f0c9a9e0c4b2ee744f8184adbe8e8d76a (patch) | |
| tree | 53e7d40138e5e805c88e800183b3200f257a53f2 | |
| parent | 3718c385f84b463ac799080bb5603e0011adcd7d (diff) | |
simulator: Add support for signaling dynamic capacity changes
This change adds support for dynamically changing the capacity of
resources and propagating this change to consumers.
13 files changed, 217 insertions, 80 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 18ac0cd8..e5991264 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -100,10 +100,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : get() = _inputs private val _inputs = mutableSetOf<SimResourceProvider>() - private val context = object : SimAbstractResourceContext(clock, _output) { - override val capacity: Double - get() = inputContexts.sumByDouble { it.capacity } - + private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { override val remainingWork: Double get() = inputContexts.sumByDouble { it.remainingWork } @@ -113,13 +110,9 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : interruptAll() } - override fun onConsume(work: Double, limit: Double, deadline: Long) { - doConsume(work, limit, deadline) - } + override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) - override fun onIdle(deadline: Long) { - doIdle(deadline) - } + override fun onIdle(deadline: Long) = doIdle(deadline) override fun onFinish(cause: Throwable?) { doFinish(cause) @@ -176,6 +169,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : private inner class Consumer : SimResourceConsumer { override fun onStart(ctx: SimResourceContext) { onContextStarted(ctx) + onCapacityChanged(ctx, false) // Make sure we initialize the output if we have not done so yet if (context.state == SimResourceState.Pending) { @@ -189,6 +183,11 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : return commands[ctx] ?: SimResourceCommand.Idle() } + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + // Adjust capacity of output resource + context.capacity = inputContexts.sumByDouble { it.capacity } + } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { onContextFinished(ctx) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index f65cbaf4..9705bd17 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.resources import java.time.Clock -import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -31,10 +30,25 @@ import kotlin.math.min * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. */ public abstract class SimAbstractResourceContext( + initialCapacity: Double, override val clock: Clock, private val consumer: SimResourceConsumer ) : SimResourceContext { /** + * The capacity of the resource. + */ + public final override var capacity: Double = initialCapacity + set(value) { + val oldValue = field + + // Only changes will be propagated + if (value != oldValue) { + field = value + onCapacityChange() + } + } + + /** * The amount of work still remaining at this instant. */ override val remainingWork: Double @@ -50,6 +64,12 @@ public abstract class SimAbstractResourceContext( private set /** + * The current processing speed of the resource. + */ + public var speed: Double = 0.0 + private set + + /** * This method is invoked when the resource will idle until the specified [deadline]. */ public abstract fun onIdle(deadline: Long) @@ -68,20 +88,6 @@ public abstract class SimAbstractResourceContext( } /** - * Compute the duration that a resource consumption will take with the specified [speed]. - */ - protected open fun getDuration(work: Double, speed: Double): Long { - return ceil(work / speed * 1000).toLong() - } - - /** - * Compute the speed at which the resource may be consumed. - */ - protected open fun getSpeed(limit: Double): Double { - return min(limit, capacity) - } - - /** * Get the remaining work to process after a resource consumption. * * @param work The size of the resource consumption. @@ -183,8 +189,8 @@ public abstract class SimAbstractResourceContext( is SimResourceCommand.Consume -> { // We should only continue processing the next command if: // 1. The resource consumption was finished. - // 2. The resource consumer reached its deadline. - // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) + // 2. The resource capacity cannot satisfy the demand. + // 4. The resource consumer should be interrupted (e.g., someone called .interrupt()) if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { next(now) } else { @@ -253,6 +259,8 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } + speed = 0.0 + onIdle(deadline) } is SimResourceCommand.Consume -> { @@ -262,10 +270,15 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } + speed = min(capacity, limit) + onConsume(work, limit, deadline) } is SimResourceCommand.Exit -> { + speed = 0.0 + doStop(null) + // No need to set the next active command return null } @@ -286,15 +299,31 @@ public abstract class SimAbstractResourceContext( val (timestamp, command) = wrapper val duration = now - timestamp return when (command) { - is SimResourceCommand.Consume -> { - val speed = getSpeed(command.limit) - getRemainingWork(command.work, speed, duration) - } + is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration) is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0 } } /** + * Indicate that the capacity of the resource has changed. + */ + private fun onCapacityChange() { + // Do not inform the consumer if it has not been started yet + if (state != SimResourceState.Active) { + return + } + + val isThrottled = speed > capacity + consumer.onCapacityChanged(this, isThrottled) + + // Optimization: only flush changes if the new capacity cannot satisfy the active resource command. + // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush(). + if (isThrottled) { + flush(isIntermediate = true) + } + } + + /** * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. */ private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt index 21f56f9b..f7f3fa4d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.resources /** - * A SimResourceCommand communicates to a [SimResource] how it is consumed by a [SimResourceConsumer]. + * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer]. */ public sealed class SimResourceCommand { /** diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 04c7fcaf..672a3e9d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -46,6 +46,19 @@ public interface SimResourceConsumer { public fun onNext(ctx: SimResourceContext): SimResourceCommand /** + * This is method is invoked when the capacity of the resource changes. + * + * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the + * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly + * causing the active resource command to finish at a later moment than initially planned. + * + * @param ctx The execution context in which the consumer runs. + * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the + * capacity change. + */ + public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {} + + /** * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], * the resource finished itself, or a failure occurred at the resource. * diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index b0f27b9d..9df333e3 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -335,10 +335,7 @@ public class SimResourceDistributorMaxMin( private inner class OutputContext( private val provider: OutputProvider, consumer: SimResourceConsumer - ) : SimAbstractResourceContext(clock, consumer), Comparable<OutputContext> { - override val capacity: Double - get() = provider.capacity - + ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> { /** * The current command that is processed by the vCPU. */ @@ -369,7 +366,7 @@ public class SimResourceDistributorMaxMin( override fun onConsume(work: Double, limit: Double, deadline: Long) { reportOvercommit() - allowedSpeed = getSpeed(limit) + allowedSpeed = speed activeCommand = SimResourceCommand.Consume(work, limit, deadline) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt index 227f4d62..1a05accd 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -118,6 +118,10 @@ public class SimResourceForwarder : SimResourceProvider, SimResourceConsumer { } } + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + delegate?.onCapacityChanged(ctx, isThrottled) + } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { this.ctx = null diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 3b4e7e7a..9b10edaf 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import org.opendc.utils.TimerScheduler import java.time.Clock +import kotlin.math.ceil import kotlin.math.min /** @@ -36,7 +37,7 @@ import kotlin.math.min * @param scheduler The scheduler to schedule the interrupts. */ public class SimResourceSource( - private val initialCapacity: Double, + initialCapacity: Double, private val clock: Clock, private val scheduler: TimerScheduler<Any> ) : SimResourceProvider { @@ -48,6 +49,15 @@ public class SimResourceSource( private val _speed = MutableStateFlow(0.0) /** + * The capacity of the resource. + */ + public var capacity: Double = initialCapacity + set(value) { + field = value + ctx?.capacity = value + } + + /** * The [Context] that is currently running. */ private var ctx: Context? = null @@ -89,20 +99,9 @@ public class SimResourceSource( /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = initialCapacity - - /** - * The processing speed of the resource. - */ - private var speed: Double = 0.0 - set(value) { - field = value - _speed.value = field - } - + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { override fun onIdle(deadline: Long) { - speed = 0.0 + _speed.value = speed // Do not resume if deadline is "infinite" if (deadline != Long.MAX_VALUE) { @@ -111,14 +110,15 @@ public class SimResourceSource( } override fun onConsume(work: Double, limit: Double, deadline: Long) { - speed = getSpeed(limit) + _speed.value = speed + val until = min(deadline, clock.millis() + getDuration(work, speed)) scheduler.startSingleTimerTo(this, until, ::flush) } override fun onFinish(cause: Throwable?) { - speed = 0.0 + _speed.value = speed scheduler.cancel(this) cancel() @@ -127,4 +127,11 @@ public class SimResourceSource( override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" } + + /** + * Compute the duration that a resource consumption will take with the specified [speed]. + */ + private fun getDuration(work: Double, speed: Double): Long { + return ceil(work / speed * 1000).toLong() + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 6e431ea1..a10f84b6 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -86,6 +86,8 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { private val forwarder: SimResourceForwarder ) : SimResourceProvider by forwarder { override fun close() { + // We explicitly do not close the forwarder here in order to re-use it across output resources. + _outputs -= this availableResources += forwarder } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt index 8f24a020..faa693c4 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -39,17 +39,16 @@ public class SimWorkConsumer( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - private var limit = 0.0 - private var remainingWork: Double = 0.0 - - override fun onStart(ctx: SimResourceContext) { - limit = ctx.capacity * utilization - remainingWork = work - } + private var isFirst = true override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val work = this.remainingWork + ctx.remainingWork - this.remainingWork -= work + val limit = ctx.capacity * utilization + val work = if (isFirst) { + isFirst = false + work + } else { + ctx.remainingWork + } return if (work > 0.0) { SimResourceCommand.Consume(work, limit) } else { diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index 3dffc7bf..de864c1c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -25,11 +25,9 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.launch import kotlinx.coroutines.test.runBlockingTest -import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll @@ -155,4 +153,56 @@ internal class SimResourceAggregatorMaxMinTest { aggregator.output.close() } } + + @Test + fun testAdjustCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(4.0, 1.0) + try { + coroutineScope { + launch { aggregator.output.consume(consumer) } + delay(1000) + sources[0].capacity = 0.5 + } + yield() + assertEquals(2334, currentTime) + } finally { + aggregator.output.close() + } + } + + @Test + fun testFailOverCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(1.0, 0.5) + try { + coroutineScope { + launch { aggregator.output.consume(consumer) } + delay(500) + sources[0].capacity = 0.5 + } + yield() + assertEquals(1000, currentTime) + } finally { + aggregator.output.close() + } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index c6988ed9..030a0f6b 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -40,9 +40,7 @@ class SimResourceContextTest { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = 4200.0 - + val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} override fun onFinish(cause: Throwable?) {} @@ -58,9 +56,7 @@ class SimResourceContextTest { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = 4200.0 - + val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -80,9 +76,7 @@ class SimResourceContextTest { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = 4200.0 - + val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -107,9 +101,7 @@ class SimResourceContextTest { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = 4200.0 - + val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt index f68450ff..143dbca9 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt @@ -24,14 +24,14 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk +import io.mockk.spyk import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.launch +import kotlinx.coroutines.* import kotlinx.coroutines.test.runBlockingTest -import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -163,4 +163,24 @@ internal class SimResourceForwarderTest { verify(exactly = 1) { consumer.onStart(any()) } verify(exactly = 1) { consumer.onFinish(any(), null) } } + + @Test + fun testAdjustCapacity() = runBlockingTest { + val forwarder = SimResourceForwarder() + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(1.0, clock, scheduler) + + val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + source.startConsumer(forwarder) + + coroutineScope { + launch { forwarder.consume(consumer) } + delay(1000) + source.capacity = 0.5 + } + + assertEquals(3000, currentTime) + verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 1279c679..58e19421 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -24,11 +24,14 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -64,6 +67,28 @@ class SimResourceSourceTest { } @Test + fun testAdjustCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(1.0, clock, scheduler) + + val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + + try { + coroutineScope { + launch { provider.consume(consumer) } + delay(1000) + provider.capacity = 0.5 + } + assertEquals(3000, currentTime) + verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + } finally { + scheduler.close() + provider.close() + } + } + + @Test fun testSpeedLimit() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) |
