summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt6
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt15
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt2
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt14
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt14
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt40
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt15
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt16
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt5
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt129
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt48
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt31
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt28
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt13
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt83
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt19
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt31
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt50
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt54
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt11
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt19
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt2
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt2
31 files changed, 243 insertions, 480 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 30cc1466..ffc50ad8 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -118,7 +118,7 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
{ assertEquals(223331032, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
{ assertEquals(67006568, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
- { assertEquals(3159379, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
+ { assertEquals(3088047, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
{ assertEquals(5.841120890240688E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
)
@@ -211,8 +211,8 @@ class CapelinIntegrationTest {
assertAll(
{ assertEquals(6013899, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
{ assertEquals(14724501, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(473394, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(12027839, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(477664, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index 0873aac9..bfc5fc6f 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -41,6 +41,7 @@ import java.util.*
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
+import kotlin.math.roundToLong
/**
* A [TFDevice] implementation using simulated components.
@@ -127,13 +128,16 @@ public class SimTFDevice(
}
}
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ val consumedWork = ctx.speed * delta / 1000.0
+
val activeWork = activeWork
if (activeWork != null) {
- if (activeWork.consume(activeWork.flops - ctx.remainingWork)) {
+ if (activeWork.consume(consumedWork)) {
this.activeWork = null
} else {
- return SimResourceCommand.Consume(activeWork.flops, ctx.capacity)
+ val duration = (activeWork.flops / ctx.capacity * 1000).roundToLong()
+ return SimResourceCommand.Consume(ctx.capacity, duration)
}
}
@@ -141,9 +145,10 @@ public class SimTFDevice(
val head = queue.poll()
return if (head != null) {
this.activeWork = head
- SimResourceCommand.Consume(head.flops, ctx.capacity)
+ val duration = (head.flops / ctx.capacity * 1000).roundToLong()
+ SimResourceCommand.Consume(ctx.capacity, duration)
} else {
- SimResourceCommand.Idle()
+ SimResourceCommand.Consume(0.0)
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
index 0a7dc40f..34ac4418 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
@@ -83,13 +83,13 @@ public class SimPsu(
}
override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0)
return if (powerDraw > 0.0)
- SimResourceCommand.Consume(Double.POSITIVE_INFINITY, powerDraw, Long.MAX_VALUE)
+ SimResourceCommand.Consume(powerDraw, Long.MAX_VALUE)
else
- SimResourceCommand.Idle()
+ SimResourceCommand.Consume(0.0)
}
override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 5a4c4f44..527619bd 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -80,14 +80,13 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val
}
private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- val now = ctx.clock.millis()
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
val fragment = pullFragment(now) ?: return SimResourceCommand.Exit
val timestamp = fragment.timestamp + offset
// Fragment is in the future
if (timestamp > now) {
- return SimResourceCommand.Idle(timestamp)
+ return SimResourceCommand.Consume(0.0, timestamp - now)
}
val cores = min(cpu.node.coreCount, fragment.cores)
@@ -97,12 +96,11 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val
0.0
val deadline = timestamp + fragment.duration
val duration = deadline - now
- val work = duration * usage / 1000
- return if (cpu.id < cores && work > 0.0)
- SimResourceCommand.Consume(work, usage, deadline)
+ return if (cpu.id < cores && usage > 0.0)
+ SimResourceCommand.Consume(usage, duration)
else
- SimResourceCommand.Idle(deadline)
+ SimResourceCommand.Consume(0.0, duration)
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index 3d3feb2a..55d6d7c4 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -155,6 +155,8 @@ internal class SimSpaceSharedHypervisorTest {
vm.run(SimRuntimeWorkload(duration))
vm.close()
+ yield()
+
val vm2 = hypervisor.createMachine(machineModel)
vm2.run(SimRuntimeWorkload(duration))
vm2.close()
diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
index 5efdbed9..f2dd8455 100644
--- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
+++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
@@ -32,7 +32,7 @@ public class SimNetworkSink(
public val capacity: Double
) : SimNetworkPort() {
override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Idle()
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand = SimResourceCommand.Consume(0.0)
override fun toString(): String = "SimNetworkSink.Consumer"
}
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
index 3ce85d02..e8839496 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
@@ -47,21 +47,13 @@ public class SimPdu(
public fun newOutlet(): Outlet = Outlet(distributor.newOutput())
override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer by distributor {
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- return when (val cmd = distributor.onNext(ctx)) {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ return when (val cmd = distributor.onNext(ctx, now, delta)) {
is SimResourceCommand.Consume -> {
- val duration = cmd.work / cmd.limit
val loss = computePowerLoss(cmd.limit)
val newLimit = cmd.limit + loss
- SimResourceCommand.Consume(duration * newLimit, newLimit, cmd.deadline)
- }
- is SimResourceCommand.Idle -> {
- val loss = computePowerLoss(0.0)
- if (loss > 0.0)
- SimResourceCommand.Consume(Double.POSITIVE_INFINITY, loss, cmd.deadline)
- else
- cmd
+ SimResourceCommand.Consume(newLimit, cmd.duration)
}
else -> cmd
}
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
index f9431d21..4c2beb68 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
@@ -55,21 +55,13 @@ public class SimUps(
override fun onConnect(inlet: SimPowerInlet) {
val consumer = inlet.createConsumer()
aggregator.startConsumer(object : SimResourceConsumer by consumer {
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- return when (val cmd = consumer.onNext(ctx)) {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ return when (val cmd = consumer.onNext(ctx, now, delta)) {
is SimResourceCommand.Consume -> {
- val duration = cmd.work / cmd.limit
val loss = computePowerLoss(cmd.limit)
val newLimit = cmd.limit + loss
- SimResourceCommand.Consume(duration * newLimit, newLimit, cmd.deadline)
- }
- is SimResourceCommand.Idle -> {
- val loss = computePowerLoss(0.0)
- if (loss > 0.0)
- SimResourceCommand.Consume(Double.POSITIVE_INFINITY, loss, cmd.deadline)
- else
- cmd
+ SimResourceCommand.Consume(newLimit, cmd.duration)
}
else -> cmd
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index 00648876..da5c3257 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -32,12 +32,7 @@ public abstract class SimAbstractResourceAggregator(
/**
* This method is invoked when the resource consumer consumes resources.
*/
- protected abstract fun doConsume(work: Double, limit: Double, deadline: Long)
-
- /**
- * This method is invoked when the resource consumer enters an idle state.
- */
- protected abstract fun doIdle(deadline: Long)
+ protected abstract fun doConsume(limit: Double, duration: Long)
/**
* This method is invoked when the resource consumer finishes processing.
@@ -98,26 +93,23 @@ public abstract class SimAbstractResourceAggregator(
private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) {
override fun createLogic(): SimResourceProviderLogic {
return object : SimResourceProviderLogic {
- override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
- doIdle(deadline)
- return Long.MAX_VALUE
- }
- override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
- doConsume(work, limit, deadline)
- return Long.MAX_VALUE
+ override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long {
+ doConsume(limit, duration)
+ return super.onConsume(ctx, now, limit, duration)
}
override fun onFinish(ctx: SimResourceControllableContext) {
doFinish()
}
- override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {
- updateCounters(ctx, work, willOvercommit)
- }
-
- override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
- return work - _inputConsumers.sumOf { it.remainingWork }
+ override fun onUpdate(
+ ctx: SimResourceControllableContext,
+ delta: Long,
+ limit: Double,
+ willOvercommit: Boolean
+ ) {
+ updateCounters(ctx, delta, limit, willOvercommit)
}
}
}
@@ -157,12 +149,6 @@ public abstract class SimAbstractResourceAggregator(
private var _ctx: SimResourceContext? = null
/**
- * The remaining work of the consumer.
- */
- val remainingWork: Double
- get() = _ctx?.remainingWork ?: 0.0
-
- /**
* The resource command to run next.
*/
private var command: SimResourceCommand? = null
@@ -179,7 +165,7 @@ public abstract class SimAbstractResourceAggregator(
}
/* SimResourceConsumer */
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
var next = command
return if (next != null) {
@@ -190,7 +176,7 @@ public abstract class SimAbstractResourceAggregator(
next = command
this.command = null
- next ?: SimResourceCommand.Idle()
+ next ?: SimResourceCommand.Consume(0.0)
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
index 4e8e803a..548bc228 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
@@ -87,21 +87,35 @@ public abstract class SimAbstractResourceProvider(
/**
* Update the counters of the resource provider.
*/
- protected fun updateCounters(ctx: SimResourceContext, work: Double, willOvercommit: Boolean) {
- if (work <= 0.0) {
+ protected fun updateCounters(ctx: SimResourceContext, delta: Long, limit: Double, willOvercommit: Boolean) {
+ if (delta <= 0.0) {
return
}
val counters = _counters
- val remainingWork = ctx.remainingWork
+ val deltaS = delta / 1000.0
+ val work = limit * deltaS
+ val actualWork = ctx.speed * deltaS
+ val remainingWork = work - actualWork
+
counters.demand += work
- counters.actual += work - remainingWork
+ counters.actual += actualWork
if (willOvercommit && remainingWork > 0.0) {
counters.overcommit += remainingWork
}
}
+ /**
+ * Update the counters of the resource provider.
+ */
+ protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) {
+ val counters = _counters
+ counters.demand += demand
+ counters.actual += actual
+ counters.overcommit += overcommit
+ }
+
final override fun startConsumer(consumer: SimResourceConsumer) {
check(ctx == null) { "Resource is in invalid state" }
val ctx = interpreter.newContext(consumer, createLogic(), parent)
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index 991cda7a..537be1b5 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -31,7 +31,7 @@ public class SimResourceAggregatorMaxMin(
) : SimAbstractResourceAggregator(interpreter, parent) {
private val consumers = mutableListOf<Input>()
- override fun doConsume(work: Double, limit: Double, deadline: Long) {
+ override fun doConsume(limit: Double, duration: Long) {
// Sort all consumers by their capacity
consumers.sortWith(compareBy { it.ctx.capacity })
@@ -40,22 +40,15 @@ public class SimResourceAggregatorMaxMin(
val inputCapacity = input.ctx.capacity
val fraction = inputCapacity / capacity
val grantedSpeed = limit * fraction
- val grantedWork = fraction * work
- val command = if (grantedWork > 0.0 && grantedSpeed > 0.0)
- SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
+ val command = if (grantedSpeed > 0.0)
+ SimResourceCommand.Consume(grantedSpeed, duration)
else
- SimResourceCommand.Idle()
+ SimResourceCommand.Consume(0.0, duration)
input.push(command)
}
}
- override fun doIdle(deadline: Long) {
- for (input in consumers) {
- input.push(SimResourceCommand.Idle(deadline))
- }
- }
-
override fun doFinish() {
val iterator = consumers.iterator()
for (input in iterator) {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
index f7f3fa4d..4a980071 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
@@ -27,25 +27,19 @@ package org.opendc.simulator.resources
*/
public sealed class SimResourceCommand {
/**
- * A request to the resource to perform the specified amount of work before the given [deadline].
+ * A request to the resource to perform work for the specified [duration].
*
- * @param work The amount of work to process.
* @param limit The maximum amount of work to be processed per second.
- * @param deadline The instant at which the work needs to be fulfilled.
+ * @param duration The duration of the resource consumption in milliseconds.
*/
- public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() {
+ public data class Consume(val limit: Double, val duration: Long = Long.MAX_VALUE) : SimResourceCommand() {
init {
- require(work > 0) { "Amount of work must be positive" }
- require(limit > 0) { "Limit must be positive" }
+ require(limit >= 0.0) { "Negative limit is not allowed" }
+ require(duration >= 0) { "Duration must be positive" }
}
}
/**
- * An indication to the resource that the consumer will idle until the specified [deadline] or if it is interrupted.
- */
- public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand()
-
- /**
* An indication to the resource that the consumer has finished.
*/
public object Exit : SimResourceCommand()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index 4d937514..4d1d2c32 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -34,9 +34,11 @@ public interface SimResourceConsumer {
* the resource finished processing, reached its deadline or was interrupted.
*
* @param ctx The execution context in which the consumer runs.
+ * @param now The virtual timestamp in milliseconds at which the update is occurring.
+ * @param delta The virtual duration between this call and the last call in milliseconds.
* @return The next command that the resource should execute.
*/
- public fun onNext(ctx: SimResourceContext): SimResourceCommand
+ public fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand
/**
* This method is invoked when an event has occurred.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
index 0d9a6106..f28b43d0 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -50,11 +50,6 @@ public interface SimResourceContext {
public val demand: Double
/**
- * The amount of work still remaining at this instant.
- */
- public val remainingWork: Double
-
- /**
* Ask the resource provider to interrupt its resource.
*/
public fun interrupt()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index 63cfbdac..d23c7dbb 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -97,8 +97,8 @@ public class SimResourceDistributorMaxMin(
}
/* SimResourceConsumer */
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- return doNext(ctx)
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ return doNext(ctx, now)
}
override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
@@ -135,38 +135,16 @@ public class SimResourceDistributorMaxMin(
}
/**
- * Update the counters of the distributor.
- */
- private fun updateCounters(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {
- if (work <= 0.0) {
- return
- }
-
- val counters = _counters
- val remainingWork = ctx.remainingWork
-
- counters.demand += work
- counters.actual += work - remainingWork
-
- if (willOvercommit && remainingWork > 0.0) {
- counters.overcommit += remainingWork
- }
- }
-
- /**
* Schedule the work of the outputs.
*/
- private fun doNext(ctx: SimResourceContext): SimResourceCommand {
+ private fun doNext(ctx: SimResourceContext, now: Long): SimResourceCommand {
// If there is no work yet, mark the input as idle.
if (activeOutputs.isEmpty()) {
- return SimResourceCommand.Idle()
+ return SimResourceCommand.Consume(0.0)
}
- val now = interpreter.clock.millis()
-
val capacity = ctx.capacity
- var duration: Double = Double.MAX_VALUE
- var deadline: Long = Long.MAX_VALUE
+ var duration: Long = Long.MAX_VALUE
var availableSpeed = capacity
var totalRequestedSpeed = 0.0
@@ -191,10 +169,10 @@ public class SimResourceDistributorMaxMin(
val availableShare = availableSpeed / remaining--
val grantedSpeed = min(output.allowedSpeed, availableShare)
- deadline = min(deadline, output.deadline)
+ duration = min(duration, output.duration)
// Ignore idle computation
- if (grantedSpeed <= 0.0 || output.work <= 0.0) {
+ if (grantedSpeed <= 0.0) {
output.actualSpeed = 0.0
continue
}
@@ -203,35 +181,29 @@ public class SimResourceDistributorMaxMin(
output.actualSpeed = grantedSpeed
availableSpeed -= grantedSpeed
-
- // The duration that we want to run is that of the shortest request of an output
- duration = min(duration, output.work / grantedSpeed)
}
- val targetDuration = min(duration, (deadline - now) / 1000.0)
+ val durationS = duration / 1000.0
var totalRequestedWork = 0.0
var totalAllocatedWork = 0.0
for (output in activeOutputs) {
- val work = output.work
+ val limit = output.limit
val speed = output.actualSpeed
if (speed > 0.0) {
- val outputDuration = work / speed
- totalRequestedWork += work * (duration / outputDuration)
- totalAllocatedWork += work * (targetDuration / outputDuration)
+ totalRequestedWork += limit * durationS
+ totalAllocatedWork += speed * durationS
}
}
- assert(deadline >= now) { "Deadline already passed" }
-
this.totalRequestedSpeed = totalRequestedSpeed
this.totalAllocatedWork = totalAllocatedWork
val totalAllocatedSpeed = capacity - availableSpeed
this.totalAllocatedSpeed = totalAllocatedSpeed
return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0)
- SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline)
+ SimResourceCommand.Consume(totalAllocatedSpeed, duration)
else
- SimResourceCommand.Idle(deadline)
+ SimResourceCommand.Consume(0.0, duration)
}
private fun updateCapacity(ctx: SimResourceContext) {
@@ -243,7 +215,7 @@ public class SimResourceDistributorMaxMin(
/**
* An internal [SimResourceProvider] implementation for switch outputs.
*/
- private inner class Output(capacity: Double, private val key: InterferenceKey?) :
+ private inner class Output(capacity: Double, val key: InterferenceKey?) :
SimAbstractResourceProvider(interpreter, parent, capacity),
SimResourceCloseableProvider,
SimResourceProviderLogic,
@@ -254,11 +226,6 @@ public class SimResourceDistributorMaxMin(
private var isClosed: Boolean = false
/**
- * The current requested work.
- */
- @JvmField var work: Double = 0.0
-
- /**
* The requested limit.
*/
@JvmField var limit: Double = 0.0
@@ -266,7 +233,7 @@ public class SimResourceDistributorMaxMin(
/**
* The current deadline.
*/
- @JvmField var deadline: Long = Long.MAX_VALUE
+ @JvmField var duration: Long = Long.MAX_VALUE
/**
* The processing speed that is allowed by the model constraints.
@@ -304,44 +271,19 @@ public class SimResourceDistributorMaxMin(
}
/* SimResourceProviderLogic */
- override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
- allowedSpeed = 0.0
- this.deadline = deadline
- work = 0.0
- limit = 0.0
- lastCommandTimestamp = ctx.clock.millis()
-
- return Long.MAX_VALUE
- }
-
- override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
+ override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long {
allowedSpeed = min(ctx.capacity, limit)
- this.work = work
this.limit = limit
- this.deadline = deadline
- lastCommandTimestamp = ctx.clock.millis()
-
- return Long.MAX_VALUE
- }
-
- override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {
- updateCounters(ctx, work, willOvercommit)
+ this.duration = duration
+ lastCommandTimestamp = now
- this@SimResourceDistributorMaxMin.updateCounters(ctx, work, willOvercommit)
- }
-
- override fun onFinish(ctx: SimResourceControllableContext) {
- work = 0.0
- limit = 0.0
- deadline = Long.MAX_VALUE
- lastCommandTimestamp = ctx.clock.millis()
+ return super.onConsume(ctx, now, limit, duration)
}
- override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
- val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0
-
- // Compute the fraction of compute time allocated to the output
- val fraction = actualSpeed / totalAllocatedSpeed
+ override fun onUpdate(ctx: SimResourceControllableContext, delta: Long, limit: Double, willOvercommit: Boolean) {
+ if (delta <= 0.0) {
+ return
+ }
// Compute the performance penalty due to resource interference
val perfScore = if (interferenceDomain != null) {
@@ -351,12 +293,29 @@ public class SimResourceDistributorMaxMin(
1.0
}
- // Compute the work that was actually granted to the output.
- val potentialConsumedWork = (totalAllocatedWork - totalRemainingWork) * fraction
+ val deltaS = delta / 1000.0
+ val work = limit * deltaS
+ val actualWork = actualSpeed * deltaS
+ val remainingWork = work - actualWork
+ val overcommit = if (willOvercommit && remainingWork > 0.0) {
+ remainingWork
+ } else {
+ 0.0
+ }
- _counters.interference += potentialConsumedWork * max(0.0, 1 - perfScore)
+ updateCounters(work, actualWork, overcommit)
- return potentialConsumedWork
+ val distCounters = _counters
+ distCounters.demand += work
+ distCounters.actual += actualWork
+ distCounters.overcommit += overcommit
+ distCounters.interference += actualWork * max(0.0, 1 - perfScore)
+ }
+
+ override fun onFinish(ctx: SimResourceControllableContext) {
+ limit = 0.0
+ duration = Long.MAX_VALUE
+ lastCommandTimestamp = ctx.clock.millis()
}
/* Comparable */
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
index 2fe1b00f..d8ff87f9 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
@@ -27,53 +27,33 @@ package org.opendc.simulator.resources
*/
public interface SimResourceProviderLogic {
/**
- * This method is invoked when the resource is reported to idle until the specified [deadline].
+ * This method is invoked when the consumer ask to consume the resource for the specified [duration].
*
* @param ctx The context in which the provider runs.
- * @param deadline The deadline that was requested by the resource consumer.
- * @return The instant at which to resume the consumer.
- */
- public fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long
-
- /**
- * This method is invoked when the resource will be consumed until the specified amount of [work] was processed
- * or [deadline] is reached.
- *
- * @param ctx The context in which the provider runs.
- * @param work The amount of work that was requested by the resource consumer.
+ * @param now The virtual timestamp in milliseconds at which the update is occurring.
* @param limit The limit on the work rate of the resource consumer.
- * @param deadline The deadline that was requested by the resource consumer.
- * @return The instant at which to resume the consumer.
+ * @param duration The duration of the consumption in milliseconds.
+ * @return The deadline of the resource consumption.
*/
- public fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long
+ public fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long {
+ return if (duration == Long.MAX_VALUE) {
+ return Long.MAX_VALUE
+ } else {
+ now + duration
+ }
+ }
/**
* This method is invoked when the progress of the resource consumer is materialized.
*
* @param ctx The context in which the provider runs.
- * @param work The amount of work that was requested by the resource consumer.
+ * @param limit The limit on the work rate of the resource consumer.
* @param willOvercommit A flag to indicate that the remaining work is overcommitted.
*/
- public fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {}
+ public fun onUpdate(ctx: SimResourceControllableContext, delta: Long, limit: Double, willOvercommit: Boolean) {}
/**
* This method is invoked when the resource consumer has finished.
*/
- public fun onFinish(ctx: SimResourceControllableContext)
-
- /**
- * Compute the amount of work that was consumed over the specified [duration].
- *
- * @param work The total size of the resource consumption.
- * @param speed The speed of the resource provider.
- * @param duration The duration from the start of the consumption until now.
- * @return The amount of work that was consumed by the resource provider.
- */
- public fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
- return if (duration > 0L) {
- return (duration / 1000.0) * speed
- } else {
- work
- }
- }
+ public fun onFinish(ctx: SimResourceControllableContext) {}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 2d53198a..10213f26 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -22,9 +22,6 @@
package org.opendc.simulator.resources
-import kotlin.math.ceil
-import kotlin.math.min
-
/**
* A [SimResourceSource] represents a source for some resource that provides bounded processing capacity.
*
@@ -39,20 +36,13 @@ public class SimResourceSource(
) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) {
override fun createLogic(): SimResourceProviderLogic {
return object : SimResourceProviderLogic {
- override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
- return deadline
- }
-
- override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
- return if (work.isInfinite()) {
- Long.MAX_VALUE
- } else {
- min(deadline, ctx.clock.millis() + getDuration(work, speed))
- }
- }
-
- override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {
- updateCounters(ctx, work, willOvercommit)
+ override fun onUpdate(
+ ctx: SimResourceControllableContext,
+ delta: Long,
+ limit: Double,
+ willOvercommit: Boolean
+ ) {
+ updateCounters(ctx, delta, limit, willOvercommit)
}
override fun onFinish(ctx: SimResourceControllableContext) {
@@ -62,11 +52,4 @@ public class SimResourceSource(
}
override fun toString(): String = "SimResourceSource[capacity=$capacity]"
-
- /**
- * Compute the duration that a resource consumption will take with the specified [speed].
- */
- private fun getDuration(work: Double, speed: Double): Long {
- return ceil(work / speed * 1000).toLong()
- }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index cec27e1c..68bedbd9 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -100,19 +100,19 @@ public class SimResourceTransformer(
}
}
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
val delegate = delegate
if (!hasDelegateStarted) {
start()
}
- updateCounters(ctx)
+ updateCounters(ctx, delta)
return if (delegate != null) {
- val command = transform(ctx, delegate.onNext(ctx))
+ val command = transform(ctx, delegate.onNext(ctx, now, delta))
- _work = if (command is SimResourceCommand.Consume) command.work else 0.0
+ _limit = if (command is SimResourceCommand.Consume) command.limit else 0.0
if (command == SimResourceCommand.Exit) {
// Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
@@ -124,12 +124,12 @@ public class SimResourceTransformer(
if (isCoupled)
SimResourceCommand.Exit
else
- onNext(ctx)
+ onNext(ctx, now, delta)
} else {
command
}
} else {
- SimResourceCommand.Idle()
+ SimResourceCommand.Consume(0.0)
}
}
@@ -180,19 +180,21 @@ public class SimResourceTransformer(
}
/**
- * Counter to track the current submitted work.
+ * The requested speed.
*/
- private var _work = 0.0
+ private var _limit: Double = 0.0
/**
* Update the resource counters for the transformer.
*/
- private fun updateCounters(ctx: SimResourceContext) {
+ private fun updateCounters(ctx: SimResourceContext, delta: Long) {
val counters = _counters
- val remainingWork = ctx.remainingWork
- counters.demand += _work
- counters.actual += _work - remainingWork
- counters.overcommit += remainingWork
+ val deltaS = delta / 1000.0
+ val work = _limit * deltaS
+ val actualWork = ctx.speed * deltaS
+ counters.demand += work
+ counters.actual += actualWork
+ counters.overcommit += (work - actualWork)
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
index 4f4ebb14..1f8434b7 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
@@ -50,8 +50,8 @@ public class SimSpeedConsumerAdapter(
callback(0.0)
}
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- return delegate.onNext(ctx)
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ return delegate.onNext(ctx, now, delta)
}
override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
index 2e94e1c1..e5173e5f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -34,20 +34,11 @@ import org.opendc.simulator.resources.SimResourceEvent
public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer {
private var iterator: Iterator<Fragment>? = null
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
val iterator = checkNotNull(iterator)
return if (iterator.hasNext()) {
- val now = ctx.clock.millis()
val fragment = iterator.next()
- val work = (fragment.duration / 1000) * fragment.usage
- val deadline = now + fragment.duration
-
- assert(deadline >= now) { "Deadline already passed" }
-
- if (work > 0.0)
- SimResourceCommand.Consume(work, fragment.usage, deadline)
- else
- SimResourceCommand.Idle(deadline)
+ SimResourceCommand.Consume(fragment.usage, fragment.duration)
} else {
SimResourceCommand.Exit
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
index faa693c4..ae837043 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
@@ -25,6 +25,7 @@ package org.opendc.simulator.resources.consumer
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
+import kotlin.math.roundToLong
/**
* A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization.
@@ -39,18 +40,19 @@ public class SimWorkConsumer(
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
- private var isFirst = true
+ private var remainingWork = work
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ val actualWork = ctx.speed * delta / 1000.0
val limit = ctx.capacity * utilization
- val work = if (isFirst) {
- isFirst = false
- work
- } else {
- ctx.remainingWork
- }
- return if (work > 0.0) {
- SimResourceCommand.Consume(work, limit)
+
+ remainingWork -= actualWork
+
+ val remainingWork = remainingWork
+ val duration = (remainingWork / limit * 1000).roundToLong()
+
+ return if (duration > 0) {
+ SimResourceCommand.Consume(limit, duration)
} else {
SimResourceCommand.Exit
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
index b79998a3..a9507e52 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -58,12 +58,6 @@ internal class SimResourceContextImpl(
}
/**
- * The amount of work still remaining at this instant.
- */
- override val remainingWork: Double
- get() = getRemainingWork(_clock.millis())
-
- /**
* A flag to indicate the state of the context.
*/
override val state: SimResourceState
@@ -87,8 +81,8 @@ internal class SimResourceContextImpl(
* The current state of the resource context.
*/
private var _timestamp: Long = Long.MIN_VALUE
- private var _work: Double = 0.0
private var _limit: Double = 0.0
+ private var _duration: Long = Long.MAX_VALUE
private var _deadline: Long = Long.MAX_VALUE
/**
@@ -178,7 +172,6 @@ internal class SimResourceContextImpl(
} catch (cause: Throwable) {
doFail(cause)
} finally {
- _remainingWorkFlush = Long.MIN_VALUE
_timestamp = timestamp
}
}
@@ -200,29 +193,25 @@ internal class SimResourceContextImpl(
SimResourceState.Pending, SimResourceState.Stopped -> state
SimResourceState.Active -> {
val isInterrupted = _flag and FLAG_INTERRUPT != 0
- val remainingWork = getRemainingWork(timestamp)
- val isConsume = _limit > 0.0
val reachedDeadline = _deadline <= timestamp
+ val delta = max(0, timestamp - _timestamp)
// Update the resource counters only if there is some progress
if (timestamp > _timestamp) {
- logic.onUpdate(this, _work, reachedDeadline)
+ logic.onUpdate(this, delta, _limit, reachedDeadline)
}
// We should only continue processing the next command if:
// 1. The resource consumption was finished.
// 2. The resource capacity cannot satisfy the demand.
// 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
- if ((isConsume && remainingWork == 0.0) || reachedDeadline || isInterrupted) {
- when (val command = consumer.onNext(this)) {
- is SimResourceCommand.Idle -> interpretIdle(timestamp, command.deadline)
- is SimResourceCommand.Consume -> interpretConsume(timestamp, command.work, command.limit, command.deadline)
+ if (reachedDeadline || isInterrupted) {
+ when (val command = consumer.onNext(this, timestamp, delta)) {
+ is SimResourceCommand.Consume -> interpretConsume(timestamp, command.limit, command.duration)
is SimResourceCommand.Exit -> interpretExit()
}
- } else if (isConsume) {
- interpretConsume(timestamp, remainingWork, _limit, _deadline)
} else {
- interpretIdle(timestamp, _deadline)
+ interpretConsume(timestamp, _limit, _duration - delta)
}
}
}
@@ -257,32 +246,15 @@ internal class SimResourceContextImpl(
/**
* Interpret the [SimResourceCommand.Consume] command.
*/
- private fun interpretConsume(now: Long, work: Double, limit: Double, deadline: Long): SimResourceState {
- require(deadline >= now) { "Deadline already passed" }
-
+ private fun interpretConsume(now: Long, limit: Double, duration: Long): SimResourceState {
_speed = min(capacity, limit)
- _work = work
_limit = limit
- _deadline = deadline
+ _duration = duration
- val timestamp = logic.onConsume(this, work, limit, deadline)
- scheduleUpdate(timestamp)
+ val timestamp = logic.onConsume(this, now, limit, duration)
- return SimResourceState.Active
- }
-
- /**
- * Interpret the [SimResourceCommand.Idle] command.
- */
- private fun interpretIdle(now: Long, deadline: Long): SimResourceState {
- require(deadline >= now) { "Deadline already passed" }
-
- _speed = 0.0
- _work = 0.0
- _limit = 0.0
- _deadline = deadline
+ _deadline = timestamp
- val timestamp = logic.onIdle(this, deadline)
scheduleUpdate(timestamp)
return SimResourceState.Active
@@ -293,37 +265,13 @@ internal class SimResourceContextImpl(
*/
private fun interpretExit(): SimResourceState {
_speed = 0.0
- _work = 0.0
_limit = 0.0
+ _duration = Long.MAX_VALUE
_deadline = Long.MAX_VALUE
return SimResourceState.Stopped
}
- private var _remainingWork: Double = 0.0
- private var _remainingWorkFlush: Long = Long.MIN_VALUE
-
- /**
- * Obtain the remaining work at the given timestamp.
- */
- private fun getRemainingWork(now: Long): Double {
- return if (_remainingWorkFlush < now) {
- _remainingWorkFlush = now
- computeRemainingWork(now).also { _remainingWork = it }
- } else {
- _remainingWork
- }
- }
-
- /**
- * Compute the remaining work based on the current state.
- */
- private fun computeRemainingWork(now: Long): Double {
- return if (_work > 0.0)
- max(0.0, _work - logic.getConsumedWork(this, _work, speed, now - _timestamp))
- else 0.0
- }
-
/**
* Indicate that the capacity of the resource has changed.
*/
@@ -333,16 +281,11 @@ internal class SimResourceContextImpl(
return
}
- val isThrottled = speed > capacity
-
interpreter.batch {
// Inform the consumer of the capacity change. This might already trigger an interrupt.
consumer.onEvent(this, SimResourceEvent.Capacity)
- // Optimization: only invalidate context if the new capacity cannot satisfy the active resource command.
- if (isThrottled) {
- invalidate()
- }
+ interrupt()
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index 2f01a8c4..a9390553 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -38,7 +38,6 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* Test suite for the [SimResourceAggregatorMaxMin] class.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimResourceAggregatorMaxMinTest {
@Test
fun testSingleCapacity() = runBlockingSimulation {
@@ -102,15 +101,15 @@ internal class SimResourceAggregatorMaxMinTest {
sources.forEach(aggregator::addInput)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) }
- .returns(SimResourceCommand.Consume(4.0, 4.0, 1000))
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(4.0, 1000))
.andThen(SimResourceCommand.Exit)
aggregator.consume(consumer)
yield()
assertEquals(1000, clock.millis())
- verify(exactly = 2) { consumer.onNext(any()) }
+ verify(exactly = 2) { consumer.onNext(any(), any(), any()) }
}
@Test
@@ -125,8 +124,8 @@ internal class SimResourceAggregatorMaxMinTest {
sources.forEach(aggregator::addInput)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) }
- .returns(SimResourceCommand.Consume(1.0, 1.0))
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1.0, duration = 1000))
.andThenThrows(IllegalStateException("Test Exception"))
assertThrows<IllegalStateException> { aggregator.consume(consumer) }
@@ -152,7 +151,7 @@ internal class SimResourceAggregatorMaxMinTest {
sources[0].capacity = 0.5
}
yield()
- assertEquals(2334, clock.millis())
+ assertEquals(2333, clock.millis())
}
@Test
@@ -173,7 +172,7 @@ internal class SimResourceAggregatorMaxMinTest {
sources[0].capacity = 0.5
}
yield()
- assertEquals(1000, clock.millis())
+ assertEquals(1167, clock.millis())
}
@Test
@@ -188,8 +187,8 @@ internal class SimResourceAggregatorMaxMinTest {
sources.forEach(aggregator::addInput)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) }
- .returns(SimResourceCommand.Consume(4.0, 4.0, 1000))
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(4.0, 1000))
.andThen(SimResourceCommand.Exit)
aggregator.consume(consumer)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
index 02d456ff..9a52dc63 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
@@ -31,44 +31,23 @@ import org.junit.jupiter.api.assertThrows
*/
class SimResourceCommandTest {
@Test
- fun testZeroWork() {
- assertThrows<IllegalArgumentException> {
- SimResourceCommand.Consume(0.0, 1.0)
- }
- }
-
- @Test
- fun testNegativeWork() {
- assertThrows<IllegalArgumentException> {
- SimResourceCommand.Consume(-1.0, 1.0)
- }
- }
-
- @Test
- fun testZeroLimit() {
+ fun testNegativeLimit() {
assertThrows<IllegalArgumentException> {
- SimResourceCommand.Consume(1.0, 0.0)
+ SimResourceCommand.Consume(-1.0, 1)
}
}
@Test
- fun testNegativeLimit() {
+ fun testNegativeDuration() {
assertThrows<IllegalArgumentException> {
- SimResourceCommand.Consume(1.0, -1.0, 1)
+ SimResourceCommand.Consume(1.0, -1)
}
}
@Test
fun testConsumeCorrect() {
assertDoesNotThrow {
- SimResourceCommand.Consume(1.0, 1.0)
- }
- }
-
- @Test
- fun testIdleCorrect() {
- assertDoesNotThrow {
- SimResourceCommand.Idle(1)
+ SimResourceCommand.Consume(1.0)
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index 6cb507ce..0cb95abb 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -32,19 +32,14 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* A test suite for the [SimResourceContextImpl] class.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
class SimResourceContextTest {
@Test
fun testFlushWithoutCommand() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit
- val logic = object : SimResourceProviderLogic {
- override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
- override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
- override fun onFinish(ctx: SimResourceControllableContext) {}
- }
+ val logic = object : SimResourceProviderLogic {}
val context = SimResourceContextImpl(null, interpreter, consumer, logic)
context.doUpdate(interpreter.clock.millis())
@@ -54,12 +49,11 @@ class SimResourceContextTest {
fun testIntermediateFlush() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit
val logic = spyk(object : SimResourceProviderLogic {
- override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
override fun onFinish(ctx: SimResourceControllableContext) {}
- override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
+ override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long = duration
})
val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic))
@@ -74,13 +68,9 @@ class SimResourceContextTest {
fun testIntermediateFlushIdle() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) andThen SimResourceCommand.Exit
- val logic = spyk(object : SimResourceProviderLogic {
- override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
- override fun onFinish(ctx: SimResourceControllableContext) {}
- override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
- })
+ val logic = spyk(object : SimResourceProviderLogic {})
val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic))
context.start()
@@ -90,7 +80,7 @@ class SimResourceContextTest {
context.invalidate()
assertAll(
- { verify(exactly = 2) { logic.onIdle(any(), any()) } },
+ { verify(exactly = 2) { logic.onConsume(any(), any(), 0.0, any()) } },
{ verify(exactly = 1) { logic.onFinish(any()) } }
)
}
@@ -99,13 +89,9 @@ class SimResourceContextTest {
fun testDoubleStart() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) andThen SimResourceCommand.Exit
- val logic = object : SimResourceProviderLogic {
- override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
- override fun onFinish(ctx: SimResourceControllableContext) {}
- override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
- }
+ val logic = object : SimResourceProviderLogic {}
val context = SimResourceContextImpl(null, interpreter, consumer, logic)
context.start()
@@ -116,16 +102,12 @@ class SimResourceContextTest {
}
@Test
- fun testIdempodentCapacityChange() = runBlockingSimulation {
+ fun testIdempotentCapacityChange() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit
- val logic = object : SimResourceProviderLogic {
- override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
- override fun onFinish(ctx: SimResourceControllableContext) {}
- override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
- }
+ val logic = object : SimResourceProviderLogic {}
val context = SimResourceContextImpl(null, interpreter, consumer, logic)
context.capacity = 4200.0
@@ -139,15 +121,11 @@ class SimResourceContextTest {
fun testFailureNoInfiniteLoop() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) } returns SimResourceCommand.Exit
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit
every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent")
every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure")
- val logic = spyk(object : SimResourceProviderLogic {
- override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
- override fun onFinish(ctx: SimResourceControllableContext) {}
- override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
- })
+ val logic = spyk(object : SimResourceProviderLogic {})
val context = SimResourceContextImpl(null, interpreter, consumer, logic)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 4895544d..c310fad6 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -37,8 +37,7 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* A test suite for the [SimResourceSource] class.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
-class SimResourceSourceTest {
+internal class SimResourceSourceTest {
@Test
fun testSpeed() = runBlockingSimulation {
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
@@ -46,8 +45,8 @@ class SimResourceSourceTest {
val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) }
- .returns(SimResourceCommand.Consume(1000 * capacity, capacity))
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(capacity, duration = 1000))
.andThen(SimResourceCommand.Exit)
val res = mutableListOf<Double>()
@@ -81,8 +80,8 @@ class SimResourceSourceTest {
val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) }
- .returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity))
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(2 * capacity, duration = 1000))
.andThen(SimResourceCommand.Exit)
val res = mutableListOf<Double>()
@@ -104,7 +103,7 @@ class SimResourceSourceTest {
val provider = SimResourceSource(capacity, scheduler)
val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
return SimResourceCommand.Exit
}
@@ -133,11 +132,10 @@ class SimResourceSourceTest {
}
}
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- assertEquals(0.0, ctx.remainingWork)
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
return if (isFirst) {
isFirst = false
- SimResourceCommand.Consume(4.0, 1.0)
+ SimResourceCommand.Consume(1.0, duration = 4000)
} else {
SimResourceCommand.Exit
}
@@ -175,8 +173,8 @@ class SimResourceSourceTest {
val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) }
- .returns(SimResourceCommand.Consume(1.0, 1.0))
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1.0, duration = 1000))
.andThenThrows(IllegalStateException())
assertThrows<IllegalStateException> {
@@ -191,8 +189,8 @@ class SimResourceSourceTest {
val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) }
- .returns(SimResourceCommand.Consume(1.0, 1.0))
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1.0))
.andThenThrows(IllegalStateException())
assertThrows<IllegalStateException> {
@@ -210,8 +208,8 @@ class SimResourceSourceTest {
val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) }
- .returns(SimResourceCommand.Consume(1.0, 1.0))
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(1.0))
.andThenThrows(IllegalStateException())
launch { provider.consume(consumer) }
@@ -228,8 +226,8 @@ class SimResourceSourceTest {
val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) }
- .returns(SimResourceCommand.Idle(clock.millis() + 500))
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(0.0, 500))
.andThen(SimResourceCommand.Exit)
provider.consume(consumer)
@@ -246,28 +244,12 @@ class SimResourceSourceTest {
val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) }
- .returns(SimResourceCommand.Idle())
+ every { consumer.onNext(any(), any(), any()) }
+ .returns(SimResourceCommand.Consume(0.0))
.andThenThrows(IllegalStateException())
provider.consume(consumer)
}
}
}
-
- @Test
- fun testIncorrectDeadline() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
-
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) }
- .returns(SimResourceCommand.Idle(2))
- .andThen(SimResourceCommand.Exit)
-
- delay(10)
-
- assertThrows<IllegalArgumentException> { provider.consume(consumer) }
- }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index ad8d82e3..ad3b0f9f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -24,7 +24,6 @@ package org.opendc.simulator.resources
import io.mockk.every
import io.mockk.mockk
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.yield
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@@ -38,7 +37,6 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* Test suite for the [SimResourceSwitchExclusive] class.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimResourceSwitchExclusiveTest {
/**
* Test a trace workload.
@@ -91,7 +89,7 @@ internal class SimResourceSwitchExclusiveTest {
val duration = 5 * 60L * 1000
val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
+ every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, duration = duration) andThen SimResourceCommand.Exit
val switch = SimResourceSwitchExclusive()
val source = SimResourceSource(3200.0, scheduler)
@@ -127,10 +125,10 @@ internal class SimResourceSwitchExclusiveTest {
}
}
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
return if (isFirst) {
isFirst = false
- SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ SimResourceCommand.Consume(1.0, duration = duration)
} else {
SimResourceCommand.Exit
}
@@ -161,9 +159,8 @@ internal class SimResourceSwitchExclusiveTest {
fun testConcurrentWorkloadFails() = runBlockingSimulation {
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val duration = 5 * 60L * 1000
val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
+ every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit
val switch = SimResourceSwitchExclusive()
val source = SimResourceSource(3200.0, scheduler)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
index e4292ec0..d8f18e65 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -24,7 +24,6 @@ package org.opendc.simulator.resources
import io.mockk.every
import io.mockk.mockk
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
@@ -37,7 +36,6 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* Test suite for the [SimResourceSwitch] implementations
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimResourceSwitchMaxMinTest {
@Test
fun testSmoke() = runBlockingSimulation {
@@ -50,7 +48,7 @@ internal class SimResourceSwitchMaxMinTest {
val provider = switch.newOutput()
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, duration = 1000) andThen SimResourceCommand.Exit
try {
provider.consume(consumer)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
index cf69b7b5..3780fd60 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -37,7 +37,6 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* A test suite for the [SimResourceTransformer] class.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimResourceTransformerTest {
@Test
fun testCancelImmediately() = runBlockingSimulation {
@@ -48,7 +47,7 @@ internal class SimResourceTransformerTest {
launch { source.consume(forwarder) }
forwarder.consume(object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
return SimResourceCommand.Exit
}
})
@@ -68,10 +67,10 @@ internal class SimResourceTransformerTest {
forwarder.consume(object : SimResourceConsumer {
var isFirst = true
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
return if (isFirst) {
isFirst = false
- SimResourceCommand.Consume(10.0, 1.0)
+ SimResourceCommand.Consume(1.0, duration = 10 * 1000L)
} else {
SimResourceCommand.Exit
}
@@ -86,7 +85,7 @@ internal class SimResourceTransformerTest {
fun testState() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand = SimResourceCommand.Exit
}
assertFalse(forwarder.isActive)
@@ -108,7 +107,7 @@ internal class SimResourceTransformerTest {
val forwarder = SimResourceForwarder()
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) } returns SimResourceCommand.Exit
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit
forwarder.startConsumer(consumer)
forwarder.cancel()
@@ -123,7 +122,7 @@ internal class SimResourceTransformerTest {
val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10)
source.startConsumer(forwarder)
yield()
@@ -142,7 +141,7 @@ internal class SimResourceTransformerTest {
val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10)
source.startConsumer(forwarder)
yield()
@@ -161,7 +160,7 @@ internal class SimResourceTransformerTest {
val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) } returns SimResourceCommand.Exit
+ every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit
source.startConsumer(forwarder)
forwarder.consume(consumer)
@@ -200,7 +199,7 @@ internal class SimResourceTransformerTest {
forwarder.consume(consumer)
assertEquals(0, clock.millis())
- verify(exactly = 1) { consumer.onNext(any()) }
+ verify(exactly = 1) { consumer.onNext(any(), any(), any()) }
}
@Test
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
index 42648cf1..830f16d3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.core.runBlockingSimulation
@@ -32,7 +31,6 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* A test suite for the [SimWorkConsumer] class.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimWorkConsumerTest {
@Test
fun testSmoke() = runBlockingSimulation {
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index 728dfd99..992b4991 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -126,7 +126,7 @@ internal class WorkflowServiceTest {
{ assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") },
{ assertEquals(0, metrics.tasksActive, "Not all started tasks finished") },
{ assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
- { assertEquals(33213237L, clock.millis()) }
+ { assertEquals(33213236L, clock.millis()) }
)
}