summaryrefslogtreecommitdiff
path: root/simulator/opendc-simulator/opendc-simulator-resources/src
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src')
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt13
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt10
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt14
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt47
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt17
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt12
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt11
10 files changed, 81 insertions, 53 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index e5991264..c7fa6a17 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -102,7 +102,18 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) {
override val remainingWork: Double
- get() = inputContexts.sumByDouble { it.remainingWork }
+ get() {
+ val now = clock.millis()
+
+ return if (_remainingWorkFlush < now) {
+ _remainingWorkFlush = now
+ _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it }
+ } else {
+ _remainingWork
+ }
+ }
+ private var _remainingWork: Double = 0.0
+ private var _remainingWorkFlush: Long = Long.MIN_VALUE
override fun interrupt() {
super.interrupt()
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index 5c5ee038..05ed0714 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -272,6 +272,7 @@ public abstract class SimAbstractResourceContext(
require(deadline >= now) { "Deadline already passed" }
speed = 0.0
+ consumer.onConfirm(this, 0.0)
onIdle(deadline)
}
@@ -283,6 +284,7 @@ public abstract class SimAbstractResourceContext(
require(deadline >= now) { "Deadline already passed" }
speed = min(capacity, limit)
+ consumer.onConfirm(this, speed)
onConsume(work, limit, deadline)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index 672a3e9d..38672b13 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -23,7 +23,7 @@
package org.opendc.simulator.resources
/**
- * A [SimResourceConsumer] characterizes how a [SimResource] is consumed.
+ * A [SimResourceConsumer] characterizes how a resource is consumed.
*
* Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently)
* for multiple resource providers, unless explicitly said otherwise.
@@ -46,6 +46,14 @@ public interface SimResourceConsumer {
public fun onNext(ctx: SimResourceContext): SimResourceCommand
/**
+ * This method is invoked when the resource provider confirms that the consumer is running at the given speed.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @param speed The speed at which the consumer runs.
+ */
+ public fun onConfirm(ctx: SimResourceContext, speed: Double) {}
+
+ /**
* This is method is invoked when the capacity of the resource changes.
*
* After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index 9df333e3..dfdd2c2e 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -106,7 +106,7 @@ public class SimResourceDistributorMaxMin(
val output = iterator.next()
// Remove the output from the outputs to prevent ConcurrentModificationException when removing it
- // during the call tou output.close()
+ // during the call to output.close()
iterator.remove()
output.close()
@@ -251,8 +251,8 @@ public class SimResourceDistributorMaxMin(
totalAllocatedWork - totalRemainingWork,
totalOvercommittedWork.toLong(),
totalInterferedWork.toLong(),
- totalRequestedSpeed,
totalAllocatedSpeed,
+ totalRequestedSpeed
)
totalInterferedWork = 0.0
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 9b10edaf..025b0406 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
import org.opendc.utils.TimerScheduler
import java.time.Clock
import kotlin.math.ceil
@@ -42,11 +40,10 @@ public class SimResourceSource(
private val scheduler: TimerScheduler<Any>
) : SimResourceProvider {
/**
- * The resource processing speed over time.
+ * The current processing speed of the resource.
*/
- public val speed: StateFlow<Double>
- get() = _speed
- private val _speed = MutableStateFlow(0.0)
+ public val speed: Double
+ get() = ctx?.speed ?: 0.0
/**
* The capacity of the resource.
@@ -101,8 +98,6 @@ public class SimResourceSource(
*/
private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) {
override fun onIdle(deadline: Long) {
- _speed.value = speed
-
// Do not resume if deadline is "infinite"
if (deadline != Long.MAX_VALUE) {
scheduler.startSingleTimerTo(this, deadline) { flush() }
@@ -110,15 +105,12 @@ public class SimResourceSource(
}
override fun onConsume(work: Double, limit: Double, deadline: Long) {
- _speed.value = speed
-
val until = min(deadline, clock.millis() + getDuration(work, speed))
scheduler.startSingleTimerTo(this, until, ::flush)
}
override fun onFinish(cause: Throwable?) {
- _speed.value = speed
scheduler.cancel(this)
cancel()
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index 73f18c7c..de455021 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -124,6 +124,10 @@ public class SimResourceTransformer(
}
}
+ override fun onConfirm(ctx: SimResourceContext, speed: Double) {
+ delegate?.onConfirm(ctx, speed)
+ }
+
override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
delegate?.onCapacityChanged(ctx, isThrottled)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
index fd4a9ed5..114c7312 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources.consumer
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
@@ -32,37 +30,52 @@ import kotlin.math.min
/**
* Helper class to expose an observable [speed] field describing the speed of the consumer.
*/
-public class SimSpeedConsumerAdapter(private val delegate: SimResourceConsumer) : SimResourceConsumer by delegate {
+public class SimSpeedConsumerAdapter(
+ private val delegate: SimResourceConsumer,
+ private val callback: (Double) -> Unit = {}
+) : SimResourceConsumer by delegate {
/**
- * The resource processing speed over time.
+ * The resource processing speed at this instant.
*/
- public val speed: StateFlow<Double>
- get() = _speed
- private val _speed = MutableStateFlow(0.0)
+ public var speed: Double = 0.0
+ private set(value) {
+ if (field != value) {
+ callback(value)
+ field = value
+ }
+ }
+
+ init {
+ callback(0.0)
+ }
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- val command = delegate.onNext(ctx)
+ return delegate.onNext(ctx)
+ }
- when (command) {
- is SimResourceCommand.Idle -> _speed.value = 0.0
- is SimResourceCommand.Consume -> _speed.value = min(ctx.capacity, command.limit)
- is SimResourceCommand.Exit -> _speed.value = 0.0
- }
+ override fun onConfirm(ctx: SimResourceContext, speed: Double) {
+ delegate.onConfirm(ctx, speed)
- return command
+ this.speed = speed
}
override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
- val oldSpeed = _speed.value
+ val oldSpeed = speed
delegate.onCapacityChanged(ctx, isThrottled)
// Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
// need to update the current speed.
- if (oldSpeed == _speed.value) {
- _speed.value = min(ctx.capacity, _speed.value)
+ if (oldSpeed == speed) {
+ speed = min(ctx.capacity, speed)
}
}
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ super.onFinish(ctx, cause)
+
+ speed = 0.0
+ }
+
override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]"
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index de864c1c..bf8c6d1f 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -26,12 +26,12 @@ import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
@@ -47,15 +47,18 @@ internal class SimResourceAggregatorMaxMinTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(clock)
+ val forwarder = SimResourceForwarder()
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
+ forwarder,
SimResourceSource(1.0, clock, scheduler)
)
sources.forEach(aggregator::addInput)
val consumer = SimWorkConsumer(1.0, 0.5)
val usage = mutableListOf<Double>()
- val job = launch { sources[0].speed.toList(usage) }
+ val source = SimResourceSource(1.0, clock, scheduler)
+ val adapter = SimSpeedConsumerAdapter(forwarder, usage::add)
+ source.startConsumer(adapter)
try {
aggregator.output.consume(consumer)
@@ -67,7 +70,6 @@ internal class SimResourceAggregatorMaxMinTest {
)
} finally {
aggregator.output.close()
- job.cancel()
}
}
@@ -85,18 +87,17 @@ internal class SimResourceAggregatorMaxMinTest {
val consumer = SimWorkConsumer(2.0, 1.0)
val usage = mutableListOf<Double>()
- val job = launch { sources[0].speed.toList(usage) }
+ val adapter = SimSpeedConsumerAdapter(consumer, usage::add)
try {
- aggregator.output.consume(consumer)
+ aggregator.output.consume(adapter)
yield()
assertAll(
{ assertEquals(1000, currentTime) },
- { assertEquals(listOf(0.0, 1.0, 0.0), usage) }
+ { assertEquals(listOf(0.0, 2.0, 0.0), usage) }
)
} finally {
aggregator.output.close()
- job.cancel()
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 58e19421..dbba6160 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -27,10 +27,10 @@ import io.mockk.mockk
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
@@ -54,11 +54,10 @@ class SimResourceSourceTest {
try {
val res = mutableListOf<Double>()
- val job = launch { provider.speed.toList(res) }
+ val adapter = SimSpeedConsumerAdapter(consumer, res::add)
- provider.consume(consumer)
+ provider.consume(adapter)
- job.cancel()
assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
} finally {
scheduler.close()
@@ -102,11 +101,10 @@ class SimResourceSourceTest {
try {
val res = mutableListOf<Double>()
- val job = launch { provider.speed.toList(res) }
+ val adapter = SimSpeedConsumerAdapter(consumer, res::add)
- provider.consume(consumer)
+ provider.consume(adapter)
- job.cancel()
assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
} finally {
scheduler.close()
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index edd60502..9a40edc4 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -25,14 +25,13 @@ package org.opendc.simulator.resources
import io.mockk.every
import io.mockk.mockk
import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.flow.toList
-import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runBlockingTest
import kotlinx.coroutines.yield
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimTraceConsumer
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
@@ -65,17 +64,17 @@ internal class SimResourceSwitchExclusiveTest {
val switch = SimResourceSwitchExclusive()
val source = SimResourceSource(3200.0, clock, scheduler)
-
- switch.addInput(source)
+ val forwarder = SimResourceForwarder()
+ val adapter = SimSpeedConsumerAdapter(forwarder, speed::add)
+ source.startConsumer(adapter)
+ switch.addInput(forwarder)
val provider = switch.addOutput(3200.0)
- val job = launch { source.speed.toList(speed) }
try {
provider.consume(workload)
yield()
} finally {
- job.cancel()
provider.close()
}