diff options
19 files changed, 171 insertions, 124 deletions
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 8e1aab44..89d01c57 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -104,7 +104,7 @@ public class SimHost( _cpuWorkInterference.record(interferedWork.toDouble()) _cpuUsage.record(cpuUsage) _cpuDemand.record(cpuDemand) - _cpuPower.record(machine.powerDraw.value) + _cpuPower.record(machine.powerDraw) } } ) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 02cfdc06..0441cfed 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -119,7 +119,7 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(434262255818, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) @@ -157,7 +157,7 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( { assertEquals(702636229989, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(172807361391, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(172636987071, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, { assertEquals(528959213229, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 2127b066..1f26c9c9 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -25,14 +25,13 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.SimResourceProvider import org.opendc.simulator.resources.SimResourceSource import org.opendc.simulator.resources.consume +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -47,9 +46,9 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine /** * The speed of the CPU cores. */ - public val speed: List<Double> + public val speed: DoubleArray get() = _speed - private var _speed = mutableListOf<Double>() + private var _speed = doubleArrayOf() /** * A flag to indicate that the machine is terminated. @@ -94,29 +93,32 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine val ctx = Context(resources, meta) val totalCapacity = model.cpus.sumByDouble { it.frequency } - _speed = MutableList(model.cpus.size) { 0.0 } + _speed = DoubleArray(model.cpus.size) { 0.0 } + var totalSpeed = 0.0 workload.onStart(ctx) for ((cpu, source) in resources) { val consumer = workload.getConsumer(ctx, cpu) - val job = source.speed - .onEach { - _speed[cpu.id] = it - _usage.value = _speed.sum() / totalCapacity - } - .launchIn(this) - - launch { - try { - source.consume(consumer) - } finally { - job.cancel() - } + val adapter = SimSpeedConsumerAdapter(consumer) { newSpeed -> + val oldSpeed = _speed[cpu.id] + _speed[cpu.id] = newSpeed + totalSpeed = totalSpeed - oldSpeed + newSpeed + + updateUsage(totalSpeed / totalCapacity) } + + launch { source.consume(adapter) } } } + /** + * This method is invoked when the usage of the machine is updated. + */ + protected open fun updateUsage(usage: Double) { + _usage.value = usage + } + override fun close() { if (!isTerminated) { isTerminated = true diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 51b807d2..d5577279 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -84,12 +84,15 @@ public class SimBareMetalMachine( /** * The power draw of the machine. */ - public val powerDraw: StateFlow<Double> = usage - .map { - this.scalingGovernors.forEach { it.onLimit() } - this.scalingDriver.computePower() - } - .stateIn(scope, SharingStarted.Eagerly, 0.0) + public var powerDraw: Double = 0.0 + private set + + override fun updateUsage(usage: Double) { + super.updateUsage(usage) + + scalingGovernors.forEach { it.onLimit() } + powerDraw = scalingDriver.computePower() + } override fun close() { super.close() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt index b4bbf9fb..4d62c383 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt @@ -28,7 +28,7 @@ package org.opendc.simulator.compute.cpufreq public class DemandScalingGovernor : ScalingGovernor { override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic { override fun onLimit() { - ctx.setTarget(ctx.resource.speed.value) + ctx.setTarget(ctx.resource.speed) } override fun toString(): String = "DemandScalingGovernor.Logic" diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt index d109e4d8..1c82253c 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt @@ -59,7 +59,7 @@ public class PStateScalingDriver(states: Map<Double, PowerModel>) : ScalingDrive for (ctx in contexts) { targetFreq = max(ctx.target, targetFreq) - totalSpeed += ctx.resource.speed.value + totalSpeed += ctx.resource.speed } val maxFreq = states.lastKey() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt index 19c06126..c02b6285 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt @@ -35,7 +35,7 @@ internal class DemandScalingGovernorTest { fun testSetDemandLimit() { val ctx = mockk<ScalingContext>(relaxUnitFun = true) - every { ctx.resource.speed.value } returns 2100.0 + every { ctx.resource.speed } returns 2100.0 val logic = DemandScalingGovernor().createLogic(ctx) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt index 5c30bc1f..c6f233a6 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt @@ -59,7 +59,7 @@ internal class PStateScalingDriverTest { val resource = mockk<SimResourceSource>() every { cpu.frequency } returns 4100.0 - every { resource.speed.value } returns 1200.0 + every { resource.speed } returns 1200.0 val driver = PStateScalingDriver( sortedMapOf( @@ -84,7 +84,7 @@ internal class PStateScalingDriverTest { val resource = mockk<SimResourceSource>() every { cpu.frequency } returns 4100.0 - every { resource.speed.value } returns 1200.0 + every { resource.speed } returns 1200.0 val driver = PStateScalingDriver( sortedMapOf( @@ -125,11 +125,11 @@ internal class PStateScalingDriverTest { val scalingContext = logic.createContext(cpu, resource) - every { resource.speed.value } returns 1400.0 + every { resource.speed } returns 1400.0 scalingContext.setTarget(1400.0) assertEquals(150.0, logic.computePower()) - every { resource.speed.value } returns 1400.0 + every { resource.speed } returns 1400.0 scalingContext.setTarget(4000.0) assertEquals(235.0, logic.computePower()) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index e5991264..c7fa6a17 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -102,7 +102,18 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { override val remainingWork: Double - get() = inputContexts.sumByDouble { it.remainingWork } + get() { + val now = clock.millis() + + return if (_remainingWorkFlush < now) { + _remainingWorkFlush = now + _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it } + } else { + _remainingWork + } + } + private var _remainingWork: Double = 0.0 + private var _remainingWorkFlush: Long = Long.MIN_VALUE override fun interrupt() { super.interrupt() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index 5c5ee038..05ed0714 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -272,6 +272,7 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } speed = 0.0 + consumer.onConfirm(this, 0.0) onIdle(deadline) } @@ -283,6 +284,7 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } speed = min(capacity, limit) + consumer.onConfirm(this, speed) onConsume(work, limit, deadline) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 672a3e9d..38672b13 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.resources /** - * A [SimResourceConsumer] characterizes how a [SimResource] is consumed. + * A [SimResourceConsumer] characterizes how a resource is consumed. * * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently) * for multiple resource providers, unless explicitly said otherwise. @@ -46,6 +46,14 @@ public interface SimResourceConsumer { public fun onNext(ctx: SimResourceContext): SimResourceCommand /** + * This method is invoked when the resource provider confirms that the consumer is running at the given speed. + * + * @param ctx The execution context in which the consumer runs. + * @param speed The speed at which the consumer runs. + */ + public fun onConfirm(ctx: SimResourceContext, speed: Double) {} + + /** * This is method is invoked when the capacity of the resource changes. * * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index 9df333e3..dfdd2c2e 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -106,7 +106,7 @@ public class SimResourceDistributorMaxMin( val output = iterator.next() // Remove the output from the outputs to prevent ConcurrentModificationException when removing it - // during the call tou output.close() + // during the call to output.close() iterator.remove() output.close() @@ -251,8 +251,8 @@ public class SimResourceDistributorMaxMin( totalAllocatedWork - totalRemainingWork, totalOvercommittedWork.toLong(), totalInterferedWork.toLong(), - totalRequestedSpeed, totalAllocatedSpeed, + totalRequestedSpeed ) totalInterferedWork = 0.0 diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 9b10edaf..025b0406 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow import org.opendc.utils.TimerScheduler import java.time.Clock import kotlin.math.ceil @@ -42,11 +40,10 @@ public class SimResourceSource( private val scheduler: TimerScheduler<Any> ) : SimResourceProvider { /** - * The resource processing speed over time. + * The current processing speed of the resource. */ - public val speed: StateFlow<Double> - get() = _speed - private val _speed = MutableStateFlow(0.0) + public val speed: Double + get() = ctx?.speed ?: 0.0 /** * The capacity of the resource. @@ -101,8 +98,6 @@ public class SimResourceSource( */ private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { override fun onIdle(deadline: Long) { - _speed.value = speed - // Do not resume if deadline is "infinite" if (deadline != Long.MAX_VALUE) { scheduler.startSingleTimerTo(this, deadline) { flush() } @@ -110,15 +105,12 @@ public class SimResourceSource( } override fun onConsume(work: Double, limit: Double, deadline: Long) { - _speed.value = speed - val until = min(deadline, clock.millis() + getDuration(work, speed)) scheduler.startSingleTimerTo(this, until, ::flush) } override fun onFinish(cause: Throwable?) { - _speed.value = speed scheduler.cancel(this) cancel() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index 73f18c7c..de455021 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -124,6 +124,10 @@ public class SimResourceTransformer( } } + override fun onConfirm(ctx: SimResourceContext, speed: Double) { + delegate?.onConfirm(ctx, speed) + } + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { delegate?.onCapacityChanged(ctx, isThrottled) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt index fd4a9ed5..114c7312 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources.consumer -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -32,37 +30,52 @@ import kotlin.math.min /** * Helper class to expose an observable [speed] field describing the speed of the consumer. */ -public class SimSpeedConsumerAdapter(private val delegate: SimResourceConsumer) : SimResourceConsumer by delegate { +public class SimSpeedConsumerAdapter( + private val delegate: SimResourceConsumer, + private val callback: (Double) -> Unit = {} +) : SimResourceConsumer by delegate { /** - * The resource processing speed over time. + * The resource processing speed at this instant. */ - public val speed: StateFlow<Double> - get() = _speed - private val _speed = MutableStateFlow(0.0) + public var speed: Double = 0.0 + private set(value) { + if (field != value) { + callback(value) + field = value + } + } + + init { + callback(0.0) + } override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val command = delegate.onNext(ctx) + return delegate.onNext(ctx) + } - when (command) { - is SimResourceCommand.Idle -> _speed.value = 0.0 - is SimResourceCommand.Consume -> _speed.value = min(ctx.capacity, command.limit) - is SimResourceCommand.Exit -> _speed.value = 0.0 - } + override fun onConfirm(ctx: SimResourceContext, speed: Double) { + delegate.onConfirm(ctx, speed) - return command + this.speed = speed } override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { - val oldSpeed = _speed.value + val oldSpeed = speed delegate.onCapacityChanged(ctx, isThrottled) // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might // need to update the current speed. - if (oldSpeed == _speed.value) { - _speed.value = min(ctx.capacity, _speed.value) + if (oldSpeed == speed) { + speed = min(ctx.capacity, speed) } } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + super.onFinish(ctx, cause) + + speed = 0.0 + } + override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]" } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index de864c1c..bf8c6d1f 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -26,12 +26,12 @@ import io.mockk.every import io.mockk.mockk import io.mockk.verify import kotlinx.coroutines.* -import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -47,15 +47,18 @@ internal class SimResourceAggregatorMaxMinTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(clock) + val forwarder = SimResourceForwarder() val sources = listOf( - SimResourceSource(1.0, clock, scheduler), + forwarder, SimResourceSource(1.0, clock, scheduler) ) sources.forEach(aggregator::addInput) val consumer = SimWorkConsumer(1.0, 0.5) val usage = mutableListOf<Double>() - val job = launch { sources[0].speed.toList(usage) } + val source = SimResourceSource(1.0, clock, scheduler) + val adapter = SimSpeedConsumerAdapter(forwarder, usage::add) + source.startConsumer(adapter) try { aggregator.output.consume(consumer) @@ -67,7 +70,6 @@ internal class SimResourceAggregatorMaxMinTest { ) } finally { aggregator.output.close() - job.cancel() } } @@ -85,18 +87,17 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = SimWorkConsumer(2.0, 1.0) val usage = mutableListOf<Double>() - val job = launch { sources[0].speed.toList(usage) } + val adapter = SimSpeedConsumerAdapter(consumer, usage::add) try { - aggregator.output.consume(consumer) + aggregator.output.consume(adapter) yield() assertAll( { assertEquals(1000, currentTime) }, - { assertEquals(listOf(0.0, 1.0, 0.0), usage) } + { assertEquals(listOf(0.0, 2.0, 0.0), usage) } ) } finally { aggregator.output.close() - job.cancel() } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 58e19421..dbba6160 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -27,10 +27,10 @@ import io.mockk.mockk import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.* -import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -54,11 +54,10 @@ class SimResourceSourceTest { try { val res = mutableListOf<Double>() - val job = launch { provider.speed.toList(res) } + val adapter = SimSpeedConsumerAdapter(consumer, res::add) - provider.consume(consumer) + provider.consume(adapter) - job.cancel() assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() @@ -102,11 +101,10 @@ class SimResourceSourceTest { try { val res = mutableListOf<Double>() - val job = launch { provider.speed.toList(res) } + val adapter = SimSpeedConsumerAdapter(consumer, res::add) - provider.consume(consumer) + provider.consume(adapter) - job.cancel() assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index edd60502..9a40edc4 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -25,14 +25,13 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.launch import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimTraceConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -65,17 +64,17 @@ internal class SimResourceSwitchExclusiveTest { val switch = SimResourceSwitchExclusive() val source = SimResourceSource(3200.0, clock, scheduler) - - switch.addInput(source) + val forwarder = SimResourceForwarder() + val adapter = SimSpeedConsumerAdapter(forwarder, speed::add) + source.startConsumer(adapter) + switch.addInput(forwarder) val provider = switch.addOutput(3200.0) - val job = launch { source.speed.toList(speed) } try { provider.consume(workload) yield() } finally { - job.cancel() provider.close() } diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt index d4bc7b5c..aa2f3367 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt @@ -34,8 +34,8 @@ import kotlin.math.max /** * A TimerScheduler facilitates scheduled execution of future tasks. * - * @property context The [CoroutineContext] to run the tasks with. - * @property clock The clock to keep track of the time. + * @param context The [CoroutineContext] to run the tasks with. + * @param clock The clock to keep track of the time. */ @OptIn(ExperimentalCoroutinesApi::class) public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clock) : AutoCloseable { @@ -60,45 +60,51 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo private val channel = Channel<Long?>(Channel.CONFLATED) /** - * The scheduling job. + * A flag to indicate that the scheduler is active. */ - private val job = scope.launch { - val timers = timers - val queue = queue - val clock = clock - var next: Long? = channel.receive() + private var isActive = true - while (true) { - next = select { - channel.onReceive { it } + init { + scope.launch { + val timers = timers + val queue = queue + val clock = clock + val job = requireNotNull(coroutineContext[Job]) + val exceptionHandler = coroutineContext[CoroutineExceptionHandler] + var next: Long? = channel.receive() - val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select + while (true) { + next = select { + channel.onReceive { it } - onTimeout(delay) { - while (queue.isNotEmpty() && isActive) { - val timer = queue.peek() - val timestamp = clock.millis() + val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select - assert(timer.timestamp >= timestamp) { "Found task in the past" } + onTimeout(delay) { + while (queue.isNotEmpty() && job.isActive) { + val timer = queue.peek() + val timestamp = clock.millis() - if (timer.timestamp > timestamp && !timer.isCancelled) { - // Schedule a task for the next event to occur. - return@onTimeout timer.timestamp - } + assert(timer.timestamp >= timestamp) { "Found task in the past" } + + if (timer.timestamp > timestamp && !timer.isCancelled) { + // Schedule a task for the next event to occur. + return@onTimeout timer.timestamp + } - queue.poll() + queue.poll() - if (!timer.isCancelled) { - timers.remove(timer.key) - try { - timer() - } catch (e: Throwable) { - coroutineContext[CoroutineExceptionHandler]?.handleException(coroutineContext, e) + if (!timer.isCancelled) { + timers.remove(timer.key) + try { + timer() + } catch (e: Throwable) { + exceptionHandler?.handleException(coroutineContext, e) + } } } - } - null + null + } } } } @@ -108,6 +114,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo * Stop the scheduler. */ override fun close() { + isActive = false cancelAll() scope.cancel() } @@ -121,7 +128,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo * @param key The key of the timer to cancel. */ public fun cancel(key: T) { - if (!job.isActive) { + if (!isActive) { return } @@ -131,11 +138,14 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo timer?.isCancelled = true // Optimization: check whether we are the head of the queue - if (queue.peek() == timer) { + val queue = queue + val channel = channel + val peek = queue.peek() + if (peek == timer) { queue.poll() if (queue.isNotEmpty()) { - channel.sendBlocking(queue.peek().timestamp) + channel.sendBlocking(peek.timestamp) } else { channel.sendBlocking(null) } @@ -182,12 +192,14 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo */ public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) { val now = clock.millis() + val queue = queue + val channel = channel require(timestamp >= now) { "Timestamp must be in the future" } - check(job.isActive) { "Timer is stopped" } + check(isActive) { "Timer is stopped" } timers.compute(key) { _, old -> - if (old?.timestamp == timestamp) { + if (old != null && old.timestamp == timestamp) { // Fast-path: timer for the same timestamp already exists old } else { @@ -198,8 +210,9 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo queue.add(timer) // Check if we need to push the interruption forward - if (queue.peek() == timer) { - channel.sendBlocking(timer.timestamp) + // Note that we check by timer reference + if (queue.peek() === timer) { + channel.offer(timer.timestamp) } timer @@ -214,6 +227,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo /** * A flag to indicate that the task has been cancelled. */ + @JvmField var isCancelled: Boolean = false /** |
