summaryrefslogtreecommitdiff
path: root/simulator/opendc-simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-22 21:00:41 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-23 10:45:30 +0100
commitb5ac4b4f0c9a9e0c4b2ee744f8184adbe8e8d76a (patch)
tree53e7d40138e5e805c88e800183b3200f257a53f2 /simulator/opendc-simulator
parent3718c385f84b463ac799080bb5603e0011adcd7d (diff)
simulator: Add support for signaling dynamic capacity changes
This change adds support for dynamically changing the capacity of resources and propagating this change to consumers.
Diffstat (limited to 'simulator/opendc-simulator')
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt19
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt71
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt13
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt7
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt39
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt17
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt56
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt16
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt26
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt25
13 files changed, 217 insertions, 80 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index 18ac0cd8..e5991264 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -100,10 +100,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
get() = _inputs
private val _inputs = mutableSetOf<SimResourceProvider>()
- private val context = object : SimAbstractResourceContext(clock, _output) {
- override val capacity: Double
- get() = inputContexts.sumByDouble { it.capacity }
-
+ private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) {
override val remainingWork: Double
get() = inputContexts.sumByDouble { it.remainingWork }
@@ -113,13 +110,9 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
interruptAll()
}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- doConsume(work, limit, deadline)
- }
+ override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline)
- override fun onIdle(deadline: Long) {
- doIdle(deadline)
- }
+ override fun onIdle(deadline: Long) = doIdle(deadline)
override fun onFinish(cause: Throwable?) {
doFinish(cause)
@@ -176,6 +169,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
private inner class Consumer : SimResourceConsumer {
override fun onStart(ctx: SimResourceContext) {
onContextStarted(ctx)
+ onCapacityChanged(ctx, false)
// Make sure we initialize the output if we have not done so yet
if (context.state == SimResourceState.Pending) {
@@ -189,6 +183,11 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
return commands[ctx] ?: SimResourceCommand.Idle()
}
+ override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
+ // Adjust capacity of output resource
+ context.capacity = inputContexts.sumByDouble { it.capacity }
+ }
+
override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
onContextFinished(ctx)
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index f65cbaf4..9705bd17 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -23,7 +23,6 @@
package org.opendc.simulator.resources
import java.time.Clock
-import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
@@ -31,10 +30,25 @@ import kotlin.math.min
* Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
*/
public abstract class SimAbstractResourceContext(
+ initialCapacity: Double,
override val clock: Clock,
private val consumer: SimResourceConsumer
) : SimResourceContext {
/**
+ * The capacity of the resource.
+ */
+ public final override var capacity: Double = initialCapacity
+ set(value) {
+ val oldValue = field
+
+ // Only changes will be propagated
+ if (value != oldValue) {
+ field = value
+ onCapacityChange()
+ }
+ }
+
+ /**
* The amount of work still remaining at this instant.
*/
override val remainingWork: Double
@@ -50,6 +64,12 @@ public abstract class SimAbstractResourceContext(
private set
/**
+ * The current processing speed of the resource.
+ */
+ public var speed: Double = 0.0
+ private set
+
+ /**
* This method is invoked when the resource will idle until the specified [deadline].
*/
public abstract fun onIdle(deadline: Long)
@@ -68,20 +88,6 @@ public abstract class SimAbstractResourceContext(
}
/**
- * Compute the duration that a resource consumption will take with the specified [speed].
- */
- protected open fun getDuration(work: Double, speed: Double): Long {
- return ceil(work / speed * 1000).toLong()
- }
-
- /**
- * Compute the speed at which the resource may be consumed.
- */
- protected open fun getSpeed(limit: Double): Double {
- return min(limit, capacity)
- }
-
- /**
* Get the remaining work to process after a resource consumption.
*
* @param work The size of the resource consumption.
@@ -183,8 +189,8 @@ public abstract class SimAbstractResourceContext(
is SimResourceCommand.Consume -> {
// We should only continue processing the next command if:
// 1. The resource consumption was finished.
- // 2. The resource consumer reached its deadline.
- // 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ // 2. The resource capacity cannot satisfy the demand.
+ // 4. The resource consumer should be interrupted (e.g., someone called .interrupt())
if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) {
next(now)
} else {
@@ -253,6 +259,8 @@ public abstract class SimAbstractResourceContext(
require(deadline >= now) { "Deadline already passed" }
+ speed = 0.0
+
onIdle(deadline)
}
is SimResourceCommand.Consume -> {
@@ -262,10 +270,15 @@ public abstract class SimAbstractResourceContext(
require(deadline >= now) { "Deadline already passed" }
+ speed = min(capacity, limit)
+
onConsume(work, limit, deadline)
}
is SimResourceCommand.Exit -> {
+ speed = 0.0
+
doStop(null)
+
// No need to set the next active command
return null
}
@@ -286,15 +299,31 @@ public abstract class SimAbstractResourceContext(
val (timestamp, command) = wrapper
val duration = now - timestamp
return when (command) {
- is SimResourceCommand.Consume -> {
- val speed = getSpeed(command.limit)
- getRemainingWork(command.work, speed, duration)
- }
+ is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration)
is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0
}
}
/**
+ * Indicate that the capacity of the resource has changed.
+ */
+ private fun onCapacityChange() {
+ // Do not inform the consumer if it has not been started yet
+ if (state != SimResourceState.Active) {
+ return
+ }
+
+ val isThrottled = speed > capacity
+ consumer.onCapacityChanged(this, isThrottled)
+
+ // Optimization: only flush changes if the new capacity cannot satisfy the active resource command.
+ // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush().
+ if (isThrottled) {
+ flush(isIntermediate = true)
+ }
+ }
+
+ /**
* This class wraps a [command] with the timestamp it was started and possibly the task associated with it.
*/
private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand)
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
index 21f56f9b..f7f3fa4d 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
@@ -23,7 +23,7 @@
package org.opendc.simulator.resources
/**
- * A SimResourceCommand communicates to a [SimResource] how it is consumed by a [SimResourceConsumer].
+ * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer].
*/
public sealed class SimResourceCommand {
/**
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index 04c7fcaf..672a3e9d 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -46,6 +46,19 @@ public interface SimResourceConsumer {
public fun onNext(ctx: SimResourceContext): SimResourceCommand
/**
+ * This is method is invoked when the capacity of the resource changes.
+ *
+ * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the
+ * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly
+ * causing the active resource command to finish at a later moment than initially planned.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the
+ * capacity change.
+ */
+ public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {}
+
+ /**
* This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit],
* the resource finished itself, or a failure occurred at the resource.
*
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index b0f27b9d..9df333e3 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -335,10 +335,7 @@ public class SimResourceDistributorMaxMin(
private inner class OutputContext(
private val provider: OutputProvider,
consumer: SimResourceConsumer
- ) : SimAbstractResourceContext(clock, consumer), Comparable<OutputContext> {
- override val capacity: Double
- get() = provider.capacity
-
+ ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> {
/**
* The current command that is processed by the vCPU.
*/
@@ -369,7 +366,7 @@ public class SimResourceDistributorMaxMin(
override fun onConsume(work: Double, limit: Double, deadline: Long) {
reportOvercommit()
- allowedSpeed = getSpeed(limit)
+ allowedSpeed = speed
activeCommand = SimResourceCommand.Consume(work, limit, deadline)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
index 227f4d62..1a05accd 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
@@ -118,6 +118,10 @@ public class SimResourceForwarder : SimResourceProvider, SimResourceConsumer {
}
}
+ override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
+ delegate?.onCapacityChanged(ctx, isThrottled)
+ }
+
override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
this.ctx = null
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 3b4e7e7a..9b10edaf 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import org.opendc.utils.TimerScheduler
import java.time.Clock
+import kotlin.math.ceil
import kotlin.math.min
/**
@@ -36,7 +37,7 @@ import kotlin.math.min
* @param scheduler The scheduler to schedule the interrupts.
*/
public class SimResourceSource(
- private val initialCapacity: Double,
+ initialCapacity: Double,
private val clock: Clock,
private val scheduler: TimerScheduler<Any>
) : SimResourceProvider {
@@ -48,6 +49,15 @@ public class SimResourceSource(
private val _speed = MutableStateFlow(0.0)
/**
+ * The capacity of the resource.
+ */
+ public var capacity: Double = initialCapacity
+ set(value) {
+ field = value
+ ctx?.capacity = value
+ }
+
+ /**
* The [Context] that is currently running.
*/
private var ctx: Context? = null
@@ -89,20 +99,9 @@ public class SimResourceSource(
/**
* Internal implementation of [SimResourceContext] for this class.
*/
- private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(clock, consumer) {
- override val capacity: Double = initialCapacity
-
- /**
- * The processing speed of the resource.
- */
- private var speed: Double = 0.0
- set(value) {
- field = value
- _speed.value = field
- }
-
+ private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) {
override fun onIdle(deadline: Long) {
- speed = 0.0
+ _speed.value = speed
// Do not resume if deadline is "infinite"
if (deadline != Long.MAX_VALUE) {
@@ -111,14 +110,15 @@ public class SimResourceSource(
}
override fun onConsume(work: Double, limit: Double, deadline: Long) {
- speed = getSpeed(limit)
+ _speed.value = speed
+
val until = min(deadline, clock.millis() + getDuration(work, speed))
scheduler.startSingleTimerTo(this, until, ::flush)
}
override fun onFinish(cause: Throwable?) {
- speed = 0.0
+ _speed.value = speed
scheduler.cancel(this)
cancel()
@@ -127,4 +127,11 @@ public class SimResourceSource(
override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]"
}
+
+ /**
+ * Compute the duration that a resource consumption will take with the specified [speed].
+ */
+ private fun getDuration(work: Double, speed: Double): Long {
+ return ceil(work / speed * 1000).toLong()
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index 6e431ea1..a10f84b6 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -86,6 +86,8 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
private val forwarder: SimResourceForwarder
) : SimResourceProvider by forwarder {
override fun close() {
+ // We explicitly do not close the forwarder here in order to re-use it across output resources.
+
_outputs -= this
availableResources += forwarder
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
index 8f24a020..faa693c4 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
@@ -39,17 +39,16 @@ public class SimWorkConsumer(
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
- private var limit = 0.0
- private var remainingWork: Double = 0.0
-
- override fun onStart(ctx: SimResourceContext) {
- limit = ctx.capacity * utilization
- remainingWork = work
- }
+ private var isFirst = true
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- val work = this.remainingWork + ctx.remainingWork
- this.remainingWork -= work
+ val limit = ctx.capacity * utilization
+ val work = if (isFirst) {
+ isFirst = false
+ work
+ } else {
+ ctx.remainingWork
+ }
return if (work > 0.0) {
SimResourceCommand.Consume(work, limit)
} else {
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index 3dffc7bf..de864c1c 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -25,11 +25,9 @@ package org.opendc.simulator.resources
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
-import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.toList
-import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runBlockingTest
-import kotlinx.coroutines.yield
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
@@ -155,4 +153,56 @@ internal class SimResourceAggregatorMaxMinTest {
aggregator.output.close()
}
}
+
+ @Test
+ fun testAdjustCapacity() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(4.0, 1.0)
+ try {
+ coroutineScope {
+ launch { aggregator.output.consume(consumer) }
+ delay(1000)
+ sources[0].capacity = 0.5
+ }
+ yield()
+ assertEquals(2334, currentTime)
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testFailOverCapacity() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(1.0, 0.5)
+ try {
+ coroutineScope {
+ launch { aggregator.output.consume(consumer) }
+ delay(500)
+ sources[0].capacity = 0.5
+ }
+ yield()
+ assertEquals(1000, currentTime)
+ } finally {
+ aggregator.output.close()
+ }
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index c6988ed9..030a0f6b 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -40,9 +40,7 @@ class SimResourceContextTest {
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(clock, consumer) {
- override val capacity: Double = 4200.0
-
+ val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
override fun onIdle(deadline: Long) {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
override fun onFinish(cause: Throwable?) {}
@@ -58,9 +56,7 @@ class SimResourceContextTest {
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = spyk(object : SimAbstractResourceContext(clock, consumer) {
- override val capacity: Double = 4200.0
-
+ val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
override fun onIdle(deadline: Long) {}
override fun onFinish(cause: Throwable?) {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
@@ -80,9 +76,7 @@ class SimResourceContextTest {
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- val context = spyk(object : SimAbstractResourceContext(clock, consumer) {
- override val capacity: Double = 4200.0
-
+ val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
override fun onIdle(deadline: Long) {}
override fun onFinish(cause: Throwable?) {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
@@ -107,9 +101,7 @@ class SimResourceContextTest {
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(clock, consumer) {
- override val capacity: Double = 4200.0
-
+ val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
override fun onIdle(deadline: Long) {}
override fun onFinish(cause: Throwable?) {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
index f68450ff..143dbca9 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
@@ -24,14 +24,14 @@ package org.opendc.simulator.resources
import io.mockk.every
import io.mockk.mockk
+import io.mockk.spyk
import io.mockk.verify
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.launch
+import kotlinx.coroutines.*
import kotlinx.coroutines.test.runBlockingTest
-import kotlinx.coroutines.yield
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
@@ -163,4 +163,24 @@ internal class SimResourceForwarderTest {
verify(exactly = 1) { consumer.onStart(any()) }
verify(exactly = 1) { consumer.onFinish(any(), null) }
}
+
+ @Test
+ fun testAdjustCapacity() = runBlockingTest {
+ val forwarder = SimResourceForwarder()
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+ source.startConsumer(forwarder)
+
+ coroutineScope {
+ launch { forwarder.consume(consumer) }
+ delay(1000)
+ source.capacity = 0.5
+ }
+
+ assertEquals(3000, currentTime)
+ verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 1279c679..58e19421 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -24,11 +24,14 @@ package org.opendc.simulator.resources
import io.mockk.every
import io.mockk.mockk
+import io.mockk.spyk
+import io.mockk.verify
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
@@ -64,6 +67,28 @@ class SimResourceSourceTest {
}
@Test
+ fun testAdjustCapacity() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+
+ try {
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ delay(1000)
+ provider.capacity = 0.5
+ }
+ assertEquals(3000, currentTime)
+ verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
fun testSpeedLimit() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)