diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-22 21:00:41 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-23 10:45:30 +0100 |
| commit | b5ac4b4f0c9a9e0c4b2ee744f8184adbe8e8d76a (patch) | |
| tree | 53e7d40138e5e805c88e800183b3200f257a53f2 /simulator/opendc-simulator | |
| parent | 3718c385f84b463ac799080bb5603e0011adcd7d (diff) | |
simulator: Add support for signaling dynamic capacity changes
This change adds support for dynamically changing the capacity of
resources and propagating this change to consumers.
Diffstat (limited to 'simulator/opendc-simulator')
13 files changed, 217 insertions, 80 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 18ac0cd8..e5991264 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -100,10 +100,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : get() = _inputs private val _inputs = mutableSetOf<SimResourceProvider>() - private val context = object : SimAbstractResourceContext(clock, _output) { - override val capacity: Double - get() = inputContexts.sumByDouble { it.capacity } - + private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { override val remainingWork: Double get() = inputContexts.sumByDouble { it.remainingWork } @@ -113,13 +110,9 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : interruptAll() } - override fun onConsume(work: Double, limit: Double, deadline: Long) { - doConsume(work, limit, deadline) - } + override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) - override fun onIdle(deadline: Long) { - doIdle(deadline) - } + override fun onIdle(deadline: Long) = doIdle(deadline) override fun onFinish(cause: Throwable?) { doFinish(cause) @@ -176,6 +169,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : private inner class Consumer : SimResourceConsumer { override fun onStart(ctx: SimResourceContext) { onContextStarted(ctx) + onCapacityChanged(ctx, false) // Make sure we initialize the output if we have not done so yet if (context.state == SimResourceState.Pending) { @@ -189,6 +183,11 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : return commands[ctx] ?: SimResourceCommand.Idle() } + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + // Adjust capacity of output resource + context.capacity = inputContexts.sumByDouble { it.capacity } + } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { onContextFinished(ctx) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index f65cbaf4..9705bd17 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.resources import java.time.Clock -import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -31,10 +30,25 @@ import kotlin.math.min * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. */ public abstract class SimAbstractResourceContext( + initialCapacity: Double, override val clock: Clock, private val consumer: SimResourceConsumer ) : SimResourceContext { /** + * The capacity of the resource. + */ + public final override var capacity: Double = initialCapacity + set(value) { + val oldValue = field + + // Only changes will be propagated + if (value != oldValue) { + field = value + onCapacityChange() + } + } + + /** * The amount of work still remaining at this instant. */ override val remainingWork: Double @@ -50,6 +64,12 @@ public abstract class SimAbstractResourceContext( private set /** + * The current processing speed of the resource. + */ + public var speed: Double = 0.0 + private set + + /** * This method is invoked when the resource will idle until the specified [deadline]. */ public abstract fun onIdle(deadline: Long) @@ -68,20 +88,6 @@ public abstract class SimAbstractResourceContext( } /** - * Compute the duration that a resource consumption will take with the specified [speed]. - */ - protected open fun getDuration(work: Double, speed: Double): Long { - return ceil(work / speed * 1000).toLong() - } - - /** - * Compute the speed at which the resource may be consumed. - */ - protected open fun getSpeed(limit: Double): Double { - return min(limit, capacity) - } - - /** * Get the remaining work to process after a resource consumption. * * @param work The size of the resource consumption. @@ -183,8 +189,8 @@ public abstract class SimAbstractResourceContext( is SimResourceCommand.Consume -> { // We should only continue processing the next command if: // 1. The resource consumption was finished. - // 2. The resource consumer reached its deadline. - // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) + // 2. The resource capacity cannot satisfy the demand. + // 4. The resource consumer should be interrupted (e.g., someone called .interrupt()) if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { next(now) } else { @@ -253,6 +259,8 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } + speed = 0.0 + onIdle(deadline) } is SimResourceCommand.Consume -> { @@ -262,10 +270,15 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } + speed = min(capacity, limit) + onConsume(work, limit, deadline) } is SimResourceCommand.Exit -> { + speed = 0.0 + doStop(null) + // No need to set the next active command return null } @@ -286,15 +299,31 @@ public abstract class SimAbstractResourceContext( val (timestamp, command) = wrapper val duration = now - timestamp return when (command) { - is SimResourceCommand.Consume -> { - val speed = getSpeed(command.limit) - getRemainingWork(command.work, speed, duration) - } + is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration) is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0 } } /** + * Indicate that the capacity of the resource has changed. + */ + private fun onCapacityChange() { + // Do not inform the consumer if it has not been started yet + if (state != SimResourceState.Active) { + return + } + + val isThrottled = speed > capacity + consumer.onCapacityChanged(this, isThrottled) + + // Optimization: only flush changes if the new capacity cannot satisfy the active resource command. + // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush(). + if (isThrottled) { + flush(isIntermediate = true) + } + } + + /** * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. */ private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt index 21f56f9b..f7f3fa4d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.resources /** - * A SimResourceCommand communicates to a [SimResource] how it is consumed by a [SimResourceConsumer]. + * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer]. */ public sealed class SimResourceCommand { /** diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 04c7fcaf..672a3e9d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -46,6 +46,19 @@ public interface SimResourceConsumer { public fun onNext(ctx: SimResourceContext): SimResourceCommand /** + * This is method is invoked when the capacity of the resource changes. + * + * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the + * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly + * causing the active resource command to finish at a later moment than initially planned. + * + * @param ctx The execution context in which the consumer runs. + * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the + * capacity change. + */ + public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {} + + /** * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], * the resource finished itself, or a failure occurred at the resource. * diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index b0f27b9d..9df333e3 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -335,10 +335,7 @@ public class SimResourceDistributorMaxMin( private inner class OutputContext( private val provider: OutputProvider, consumer: SimResourceConsumer - ) : SimAbstractResourceContext(clock, consumer), Comparable<OutputContext> { - override val capacity: Double - get() = provider.capacity - + ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> { /** * The current command that is processed by the vCPU. */ @@ -369,7 +366,7 @@ public class SimResourceDistributorMaxMin( override fun onConsume(work: Double, limit: Double, deadline: Long) { reportOvercommit() - allowedSpeed = getSpeed(limit) + allowedSpeed = speed activeCommand = SimResourceCommand.Consume(work, limit, deadline) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt index 227f4d62..1a05accd 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -118,6 +118,10 @@ public class SimResourceForwarder : SimResourceProvider, SimResourceConsumer { } } + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + delegate?.onCapacityChanged(ctx, isThrottled) + } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { this.ctx = null diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 3b4e7e7a..9b10edaf 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import org.opendc.utils.TimerScheduler import java.time.Clock +import kotlin.math.ceil import kotlin.math.min /** @@ -36,7 +37,7 @@ import kotlin.math.min * @param scheduler The scheduler to schedule the interrupts. */ public class SimResourceSource( - private val initialCapacity: Double, + initialCapacity: Double, private val clock: Clock, private val scheduler: TimerScheduler<Any> ) : SimResourceProvider { @@ -48,6 +49,15 @@ public class SimResourceSource( private val _speed = MutableStateFlow(0.0) /** + * The capacity of the resource. + */ + public var capacity: Double = initialCapacity + set(value) { + field = value + ctx?.capacity = value + } + + /** * The [Context] that is currently running. */ private var ctx: Context? = null @@ -89,20 +99,9 @@ public class SimResourceSource( /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = initialCapacity - - /** - * The processing speed of the resource. - */ - private var speed: Double = 0.0 - set(value) { - field = value - _speed.value = field - } - + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { override fun onIdle(deadline: Long) { - speed = 0.0 + _speed.value = speed // Do not resume if deadline is "infinite" if (deadline != Long.MAX_VALUE) { @@ -111,14 +110,15 @@ public class SimResourceSource( } override fun onConsume(work: Double, limit: Double, deadline: Long) { - speed = getSpeed(limit) + _speed.value = speed + val until = min(deadline, clock.millis() + getDuration(work, speed)) scheduler.startSingleTimerTo(this, until, ::flush) } override fun onFinish(cause: Throwable?) { - speed = 0.0 + _speed.value = speed scheduler.cancel(this) cancel() @@ -127,4 +127,11 @@ public class SimResourceSource( override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" } + + /** + * Compute the duration that a resource consumption will take with the specified [speed]. + */ + private fun getDuration(work: Double, speed: Double): Long { + return ceil(work / speed * 1000).toLong() + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 6e431ea1..a10f84b6 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -86,6 +86,8 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { private val forwarder: SimResourceForwarder ) : SimResourceProvider by forwarder { override fun close() { + // We explicitly do not close the forwarder here in order to re-use it across output resources. + _outputs -= this availableResources += forwarder } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt index 8f24a020..faa693c4 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -39,17 +39,16 @@ public class SimWorkConsumer( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - private var limit = 0.0 - private var remainingWork: Double = 0.0 - - override fun onStart(ctx: SimResourceContext) { - limit = ctx.capacity * utilization - remainingWork = work - } + private var isFirst = true override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val work = this.remainingWork + ctx.remainingWork - this.remainingWork -= work + val limit = ctx.capacity * utilization + val work = if (isFirst) { + isFirst = false + work + } else { + ctx.remainingWork + } return if (work > 0.0) { SimResourceCommand.Consume(work, limit) } else { diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index 3dffc7bf..de864c1c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -25,11 +25,9 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.launch import kotlinx.coroutines.test.runBlockingTest -import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll @@ -155,4 +153,56 @@ internal class SimResourceAggregatorMaxMinTest { aggregator.output.close() } } + + @Test + fun testAdjustCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(4.0, 1.0) + try { + coroutineScope { + launch { aggregator.output.consume(consumer) } + delay(1000) + sources[0].capacity = 0.5 + } + yield() + assertEquals(2334, currentTime) + } finally { + aggregator.output.close() + } + } + + @Test + fun testFailOverCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(1.0, 0.5) + try { + coroutineScope { + launch { aggregator.output.consume(consumer) } + delay(500) + sources[0].capacity = 0.5 + } + yield() + assertEquals(1000, currentTime) + } finally { + aggregator.output.close() + } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index c6988ed9..030a0f6b 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -40,9 +40,7 @@ class SimResourceContextTest { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = 4200.0 - + val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} override fun onFinish(cause: Throwable?) {} @@ -58,9 +56,7 @@ class SimResourceContextTest { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = 4200.0 - + val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -80,9 +76,7 @@ class SimResourceContextTest { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = 4200.0 - + val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -107,9 +101,7 @@ class SimResourceContextTest { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = 4200.0 - + val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt index f68450ff..143dbca9 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt @@ -24,14 +24,14 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk +import io.mockk.spyk import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.launch +import kotlinx.coroutines.* import kotlinx.coroutines.test.runBlockingTest -import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -163,4 +163,24 @@ internal class SimResourceForwarderTest { verify(exactly = 1) { consumer.onStart(any()) } verify(exactly = 1) { consumer.onFinish(any(), null) } } + + @Test + fun testAdjustCapacity() = runBlockingTest { + val forwarder = SimResourceForwarder() + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(1.0, clock, scheduler) + + val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + source.startConsumer(forwarder) + + coroutineScope { + launch { forwarder.consume(consumer) } + delay(1000) + source.capacity = 0.5 + } + + assertEquals(3000, currentTime) + verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 1279c679..58e19421 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -24,11 +24,14 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -64,6 +67,28 @@ class SimResourceSourceTest { } @Test + fun testAdjustCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(1.0, clock, scheduler) + + val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + + try { + coroutineScope { + launch { provider.consume(consumer) } + delay(1000) + provider.capacity = 0.5 + } + assertEquals(3000, currentTime) + verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + } finally { + scheduler.close() + provider.close() + } + } + + @Test fun testSpeedLimit() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) |
