summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt11
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt21
-rw-r--r--opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt2
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt15
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt15
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt53
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt8
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt46
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt7
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt14
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt7
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt18
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt109
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt3
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt9
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt11
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt123
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt55
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt53
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt90
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt73
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt17
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt7
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt44
25 files changed, 392 insertions, 429 deletions
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index bfc5fc6f..1fbd3b6a 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -128,7 +128,7 @@ public class SimTFDevice(
}
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
val consumedWork = ctx.speed * delta / 1000.0
val activeWork = activeWork
@@ -137,7 +137,8 @@ public class SimTFDevice(
this.activeWork = null
} else {
val duration = (activeWork.flops / ctx.capacity * 1000).roundToLong()
- return SimResourceCommand.Consume(ctx.capacity, duration)
+ ctx.push(ctx.capacity)
+ return duration
}
}
@@ -146,9 +147,11 @@ public class SimTFDevice(
return if (head != null) {
this.activeWork = head
val duration = (head.flops / ctx.capacity * 1000).roundToLong()
- SimResourceCommand.Consume(ctx.capacity, duration)
+ ctx.push(ctx.capacity)
+ duration
} else {
- SimResourceCommand.Consume(0.0)
+ ctx.push(0.0)
+ Long.MAX_VALUE
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
index 34ac4418..6e6e590f 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
@@ -24,7 +24,6 @@ package org.opendc.simulator.compute.device
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.power.SimPowerInlet
-import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
import org.opendc.simulator.resources.SimResourceEvent
@@ -83,13 +82,10 @@ public class SimPsu(
}
override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0)
-
- return if (powerDraw > 0.0)
- SimResourceCommand.Consume(powerDraw, Long.MAX_VALUE)
- else
- SimResourceCommand.Consume(0.0)
+ ctx.push(powerDraw)
+ return Long.MAX_VALUE
}
override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 527619bd..dd582bb2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -24,7 +24,6 @@ package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
import kotlin.math.min
@@ -80,13 +79,20 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val
}
private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
- val fragment = pullFragment(now) ?: return SimResourceCommand.Exit
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ val fragment = pullFragment(now)
+
+ if (fragment == null) {
+ ctx.close()
+ return Long.MAX_VALUE
+ }
+
val timestamp = fragment.timestamp + offset
// Fragment is in the future
if (timestamp > now) {
- return SimResourceCommand.Consume(0.0, timestamp - now)
+ ctx.push(0.0)
+ return timestamp - now
}
val cores = min(cpu.node.coreCount, fragment.cores)
@@ -97,10 +103,9 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val
val deadline = timestamp + fragment.duration
val duration = deadline - now
- return if (cpu.id < cores && usage > 0.0)
- SimResourceCommand.Consume(usage, duration)
- else
- SimResourceCommand.Consume(0.0, duration)
+ ctx.push(if (cpu.id < cores && usage > 0.0) usage else 0.0)
+
+ return duration
}
}
diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
index f2dd8455..7db0f176 100644
--- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
+++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
@@ -32,7 +32,7 @@ public class SimNetworkSink(
public val capacity: Double
) : SimNetworkPort() {
override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand = SimResourceCommand.Consume(0.0)
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long = Long.MAX_VALUE
override fun toString(): String = "SimNetworkSink.Consumer"
}
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
index e8839496..b0ea7f0a 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
@@ -47,16 +47,13 @@ public class SimPdu(
public fun newOutlet(): Outlet = Outlet(distributor.newOutput())
override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer by distributor {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
- return when (val cmd = distributor.onNext(ctx, now, delta)) {
- is SimResourceCommand.Consume -> {
- val loss = computePowerLoss(cmd.limit)
- val newLimit = cmd.limit + loss
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ val duration = distributor.onNext(ctx, now, delta)
+ val loss = computePowerLoss(ctx.demand)
+ val newLimit = ctx.demand + loss
- SimResourceCommand.Consume(newLimit, cmd.duration)
- }
- else -> cmd
- }
+ ctx.push(newLimit)
+ return duration
}
override fun toString(): String = "SimPdu.Consumer"
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
index 4c2beb68..59006dfc 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
@@ -55,16 +55,13 @@ public class SimUps(
override fun onConnect(inlet: SimPowerInlet) {
val consumer = inlet.createConsumer()
aggregator.startConsumer(object : SimResourceConsumer by consumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
- return when (val cmd = consumer.onNext(ctx, now, delta)) {
- is SimResourceCommand.Consume -> {
- val loss = computePowerLoss(cmd.limit)
- val newLimit = cmd.limit + loss
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ val duration = consumer.onNext(ctx, now, delta)
+ val loss = computePowerLoss(ctx.demand)
+ val newLimit = ctx.demand + loss
- SimResourceCommand.Consume(newLimit, cmd.duration)
- }
- else -> cmd
- }
+ ctx.push(newLimit)
+ return duration
}
})
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index da5c3257..8e0eb5f8 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -125,16 +125,21 @@ public abstract class SimAbstractResourceAggregator(
/**
* An input for the resource aggregator.
*/
- public interface Input {
+ public interface Input : AutoCloseable {
/**
* The [SimResourceContext] associated with the input.
*/
public val ctx: SimResourceContext
/**
- * Push the specified [SimResourceCommand] to the input.
+ * Push to this input with the specified [limit] and [duration].
*/
- public fun push(command: SimResourceCommand)
+ public fun push(limit: Double, duration: Long)
+
+ /**
+ * Close the input for further input.
+ */
+ public override fun close()
}
/**
@@ -151,7 +156,12 @@ public abstract class SimAbstractResourceAggregator(
/**
* The resource command to run next.
*/
- private var command: SimResourceCommand? = null
+ private var _duration: Long = Long.MAX_VALUE
+
+ /**
+ * A flag to indicate that the consumer should flush.
+ */
+ private var _isPushed = false
private fun updateCapacity() {
// Adjust capacity of output resource
@@ -159,25 +169,34 @@ public abstract class SimAbstractResourceAggregator(
}
/* Input */
- override fun push(command: SimResourceCommand) {
- this.command = command
- _ctx?.interrupt()
+ override fun push(limit: Double, duration: Long) {
+ _duration = duration
+ val ctx = _ctx
+ if (ctx != null) {
+ ctx.push(limit)
+ ctx.interrupt()
+ }
+ _isPushed = true
+ }
+
+ override fun close() {
+ _duration = Long.MAX_VALUE
+ _isPushed = true
+ _ctx?.close()
}
/* SimResourceConsumer */
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
- var next = command
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ var next = _duration
- return if (next != null) {
- this.command = null
- next
- } else {
+ if (!_isPushed) {
_output.flush()
-
- next = command
- this.command = null
- next ?: SimResourceCommand.Consume(0.0)
+ next = _duration
}
+
+ _isPushed = false
+ _duration = Long.MAX_VALUE
+ return next
}
override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index 537be1b5..b258a368 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -41,11 +41,7 @@ public class SimResourceAggregatorMaxMin(
val fraction = inputCapacity / capacity
val grantedSpeed = limit * fraction
- val command = if (grantedSpeed > 0.0)
- SimResourceCommand.Consume(grantedSpeed, duration)
- else
- SimResourceCommand.Consume(0.0, duration)
- input.push(command)
+ input.push(grantedSpeed, duration)
}
}
@@ -53,7 +49,7 @@ public class SimResourceAggregatorMaxMin(
val iterator = consumers.iterator()
for (input in iterator) {
iterator.remove()
- input.push(SimResourceCommand.Exit)
+ input.close()
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
deleted file mode 100644
index 4a980071..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-/**
- * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer].
- */
-public sealed class SimResourceCommand {
- /**
- * A request to the resource to perform work for the specified [duration].
- *
- * @param limit The maximum amount of work to be processed per second.
- * @param duration The duration of the resource consumption in milliseconds.
- */
- public data class Consume(val limit: Double, val duration: Long = Long.MAX_VALUE) : SimResourceCommand() {
- init {
- require(limit >= 0.0) { "Negative limit is not allowed" }
- require(duration >= 0) { "Duration must be positive" }
- }
- }
-
- /**
- * An indication to the resource that the consumer has finished.
- */
- public object Exit : SimResourceCommand()
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index 4d1d2c32..0b25358a 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -30,15 +30,14 @@ package org.opendc.simulator.resources
*/
public interface SimResourceConsumer {
/**
- * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because
- * the resource finished processing, reached its deadline or was interrupted.
+ * This method is invoked when the resource provider is pulling this resource consumer.
*
* @param ctx The execution context in which the consumer runs.
* @param now The virtual timestamp in milliseconds at which the update is occurring.
* @param delta The virtual duration between this call and the last call in milliseconds.
- * @return The next command that the resource should execute.
+ * @return The duration after which the resource consumer should be pulled again.
*/
- public fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand
+ public fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long
/**
* This method is invoked when an event has occurred.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
index f28b43d0..225cae0b 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -28,7 +28,7 @@ import java.time.Clock
* The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a
* resource and a resource consumer.
*/
-public interface SimResourceContext {
+public interface SimResourceContext : AutoCloseable {
/**
* The virtual clock tracking simulation time.
*/
@@ -53,4 +53,16 @@ public interface SimResourceContext {
* Ask the resource provider to interrupt its resource.
*/
public fun interrupt()
+
+ /**
+ * Push the given flow to this context.
+ *
+ * @param rate The rate of the flow to push.
+ */
+ public fun push(rate: Double)
+
+ /**
+ * Stop the resource context.
+ */
+ public override fun close()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
index ceaca39a..ba52b597 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
@@ -27,7 +27,7 @@ package org.opendc.simulator.resources
*
* This interface is used by resource providers to control the resource context.
*/
-public interface SimResourceControllableContext : SimResourceContext, AutoCloseable {
+public interface SimResourceControllableContext : SimResourceContext {
/**
* The state of the resource context.
*/
@@ -44,11 +44,6 @@ public interface SimResourceControllableContext : SimResourceContext, AutoClosea
public fun start()
/**
- * Stop the resource context.
- */
- public override fun close()
-
- /**
* Invalidate the resource context's state.
*
* By invalidating the resource context's current state, the state is re-computed and the current progress is
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index d23c7dbb..eac58410 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -54,11 +54,6 @@ public class SimResourceDistributorMaxMin(
private val activeOutputs: MutableList<Output> = mutableListOf()
/**
- * The total amount of work allocated to be executed.
- */
- private var totalAllocatedWork = 0.0
-
- /**
* The total allocated speed for the output resources.
*/
private var totalAllocatedSpeed = 0.0
@@ -97,7 +92,7 @@ public class SimResourceDistributorMaxMin(
}
/* SimResourceConsumer */
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
return doNext(ctx, now)
}
@@ -137,10 +132,10 @@ public class SimResourceDistributorMaxMin(
/**
* Schedule the work of the outputs.
*/
- private fun doNext(ctx: SimResourceContext, now: Long): SimResourceCommand {
+ private fun doNext(ctx: SimResourceContext, now: Long): Long {
// If there is no work yet, mark the input as idle.
if (activeOutputs.isEmpty()) {
- return SimResourceCommand.Consume(0.0)
+ return Long.MAX_VALUE
}
val capacity = ctx.capacity
@@ -196,14 +191,11 @@ public class SimResourceDistributorMaxMin(
}
this.totalRequestedSpeed = totalRequestedSpeed
- this.totalAllocatedWork = totalAllocatedWork
val totalAllocatedSpeed = capacity - availableSpeed
this.totalAllocatedSpeed = totalAllocatedSpeed
- return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0)
- SimResourceCommand.Consume(totalAllocatedSpeed, duration)
- else
- SimResourceCommand.Consume(0.0, duration)
+ ctx.push(totalAllocatedSpeed)
+ return duration
}
private fun updateCapacity(ctx: SimResourceContext) {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index 68bedbd9..f12ef9f1 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -23,6 +23,7 @@
package org.opendc.simulator.resources
import org.opendc.simulator.resources.impl.SimResourceCountersImpl
+import java.time.Clock
/**
* A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider.
@@ -32,14 +33,9 @@ import org.opendc.simulator.resources.impl.SimResourceCountersImpl
*/
public class SimResourceTransformer(
private val isCoupled: Boolean = false,
- private val transform: (SimResourceContext, SimResourceCommand) -> SimResourceCommand
+ private val transform: (SimResourceContext, Long) -> Long
) : SimResourceFlow, AutoCloseable {
/**
- * The [SimResourceContext] in which the forwarder runs.
- */
- private var ctx: SimResourceContext? = null
-
- /**
* The delegate [SimResourceConsumer].
*/
private var delegate: SimResourceConsumer? = null
@@ -49,17 +45,63 @@ public class SimResourceTransformer(
*/
private var hasDelegateStarted: Boolean = false
+ /**
+ * The exposed [SimResourceContext].
+ */
+ private val ctx = object : SimResourceContext {
+ override val clock: Clock
+ get() = _ctx!!.clock
+
+ override val capacity: Double
+ get() = _ctx?.capacity ?: 0.0
+
+ override val demand: Double
+ get() = _ctx?.demand ?: 0.0
+
+ override val speed: Double
+ get() = _ctx?.speed ?: 0.0
+
+ override fun interrupt() {
+ _ctx?.interrupt()
+ }
+
+ override fun push(rate: Double) {
+ _ctx?.push(rate)
+ _limit = rate
+ }
+
+ override fun close() {
+ val delegate = checkNotNull(delegate) { "Delegate not active" }
+
+ // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
+ // reset beforehand the existing state and check whether it has been updated afterwards
+ reset()
+
+ delegate.onEvent(this, SimResourceEvent.Exit)
+
+ if (isCoupled)
+ _ctx?.close()
+ else
+ _ctx?.push(0.0)
+ }
+ }
+
+ /**
+ * The [SimResourceContext] in which the forwarder runs.
+ */
+ private var _ctx: SimResourceContext? = null
+
override val isActive: Boolean
get() = delegate != null
override val capacity: Double
- get() = ctx?.capacity ?: 0.0
+ get() = ctx.capacity
override val speed: Double
- get() = ctx?.speed ?: 0.0
+ get() = ctx.speed
override val demand: Double
- get() = ctx?.demand ?: 0.0
+ get() = ctx.demand
override val counters: SimResourceCounters
get() = _counters
@@ -75,32 +117,32 @@ public class SimResourceTransformer(
}
override fun interrupt() {
- ctx?.interrupt()
+ ctx.interrupt()
}
override fun cancel() {
val delegate = delegate
- val ctx = ctx
+ val ctx = _ctx
if (delegate != null) {
this.delegate = null
if (ctx != null) {
- delegate.onEvent(ctx, SimResourceEvent.Exit)
+ delegate.onEvent(this.ctx, SimResourceEvent.Exit)
}
}
}
override fun close() {
- val ctx = ctx
+ val ctx = _ctx
if (ctx != null) {
- this.ctx = null
+ this._ctx = null
ctx.interrupt()
}
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
val delegate = delegate
if (!hasDelegateStarted) {
@@ -110,54 +152,39 @@ public class SimResourceTransformer(
updateCounters(ctx, delta)
return if (delegate != null) {
- val command = transform(ctx, delegate.onNext(ctx, now, delta))
-
- _limit = if (command is SimResourceCommand.Consume) command.limit else 0.0
-
- if (command == SimResourceCommand.Exit) {
- // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
- // reset beforehand the existing state and check whether it has been updated afterwards
- reset()
-
- delegate.onEvent(ctx, SimResourceEvent.Exit)
-
- if (isCoupled)
- SimResourceCommand.Exit
- else
- onNext(ctx, now, delta)
- } else {
- command
- }
+ val duration = transform(ctx, delegate.onNext(this.ctx, now, delta))
+ _limit = ctx.demand
+ duration
} else {
- SimResourceCommand.Consume(0.0)
+ Long.MAX_VALUE
}
}
override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
when (event) {
SimResourceEvent.Start -> {
- this.ctx = ctx
+ _ctx = ctx
}
SimResourceEvent.Exit -> {
- this.ctx = null
+ _ctx = null
val delegate = delegate
if (delegate != null) {
reset()
- delegate.onEvent(ctx, SimResourceEvent.Exit)
+ delegate.onEvent(this.ctx, SimResourceEvent.Exit)
}
}
- else -> delegate?.onEvent(ctx, event)
+ else -> delegate?.onEvent(this.ctx, event)
}
}
override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
- this.ctx = null
+ _ctx = null
val delegate = delegate
if (delegate != null) {
reset()
- delegate.onFailure(ctx, cause)
+ delegate.onFailure(this.ctx, cause)
}
}
@@ -166,7 +193,7 @@ public class SimResourceTransformer(
*/
private fun start() {
val delegate = delegate ?: return
- delegate.onEvent(checkNotNull(ctx), SimResourceEvent.Start)
+ delegate.onEvent(checkNotNull(_ctx), SimResourceEvent.Start)
hasDelegateStarted = true
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
index 1f8434b7..46885640 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources.consumer
-import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
import org.opendc.simulator.resources.SimResourceEvent
@@ -50,7 +49,7 @@ public class SimSpeedConsumerAdapter(
callback(0.0)
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
return delegate.onNext(ctx, now, delta)
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
index e5173e5f..ad6b0108 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources.consumer
-import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
import org.opendc.simulator.resources.SimResourceEvent
@@ -34,13 +33,15 @@ import org.opendc.simulator.resources.SimResourceEvent
public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer {
private var iterator: Iterator<Fragment>? = null
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
val iterator = checkNotNull(iterator)
return if (iterator.hasNext()) {
val fragment = iterator.next()
- SimResourceCommand.Consume(fragment.usage, fragment.duration)
+ ctx.push(fragment.usage)
+ fragment.duration
} else {
- SimResourceCommand.Exit
+ ctx.close()
+ Long.MAX_VALUE
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
index ae837043..bf76711f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources.consumer
-import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
import kotlin.math.roundToLong
@@ -37,12 +36,12 @@ public class SimWorkConsumer(
init {
require(work >= 0.0) { "Work must be positive" }
- require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
+ require(utilization > 0.0) { "Utilization must be positive" }
}
private var remainingWork = work
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
val actualWork = ctx.speed * delta / 1000.0
val limit = ctx.capacity * utilization
@@ -52,9 +51,11 @@ public class SimWorkConsumer(
val duration = (remainingWork / limit * 1000).roundToLong()
return if (duration > 0) {
- SimResourceCommand.Consume(limit, duration)
+ ctx.push(limit)
+ duration
} else {
- SimResourceCommand.Exit
+ ctx.close()
+ Long.MAX_VALUE
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
index a9507e52..d7ea0043 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -86,6 +86,11 @@ internal class SimResourceContextImpl(
private var _deadline: Long = Long.MAX_VALUE
/**
+ * A flag to indicate that an update is active.
+ */
+ private var _updateActive = false
+
+ /**
* The update flag indicating why the update was triggered.
*/
private var _flag: Int = 0
@@ -108,7 +113,9 @@ internal class SimResourceContextImpl(
if (_state != SimResourceState.Stopped) {
interpreter.batch {
_state = SimResourceState.Stopped
- doStop()
+ if (!_updateActive) {
+ doStop()
+ }
}
}
}
@@ -139,6 +146,11 @@ internal class SimResourceContextImpl(
interpreter.scheduleSync(this)
}
+ override fun push(rate: Double) {
+ _speed = min(capacity, rate)
+ _limit = rate
+ }
+
/**
* Determine whether the state of the resource context should be updated.
*/
@@ -151,14 +163,49 @@ internal class SimResourceContextImpl(
* Update the state of the resource context.
*/
fun doUpdate(timestamp: Long) {
+ val oldState = _state
+ if (oldState != SimResourceState.Active) {
+ return
+ }
+
+ _updateActive = true
+
+ val flag = _flag
+ val isInterrupted = flag and FLAG_INTERRUPT != 0
+ val reachedDeadline = _deadline <= timestamp
+ val delta = max(0, timestamp - _timestamp)
+
try {
- val oldState = _state
- val newState = doUpdate(timestamp, oldState)
- _state = newState
+ // Update the resource counters only if there is some progress
+ if (timestamp > _timestamp) {
+ logic.onUpdate(this, delta, _limit, reachedDeadline)
+ }
+
+ // We should only continue processing the next command if:
+ // 1. The resource consumption was finished.
+ // 2. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ val duration = if (reachedDeadline || isInterrupted) {
+ consumer.onNext(this, timestamp, delta)
+ } else {
+ _deadline - timestamp
+ }
+
+ // Reset update flags
_flag = 0
- when (newState) {
+ when (_state) {
+ SimResourceState.Active -> {
+ val limit = _limit
+ push(limit)
+ _duration = duration
+
+ val target = logic.onConsume(this, timestamp, limit, duration)
+
+ _deadline = target
+
+ scheduleUpdate(target)
+ }
SimResourceState.Pending ->
if (oldState != SimResourceState.Pending) {
throw IllegalStateException("Illegal transition to pending state")
@@ -167,12 +214,12 @@ internal class SimResourceContextImpl(
if (oldState != SimResourceState.Stopped) {
doStop()
}
- else -> {}
}
} catch (cause: Throwable) {
doFail(cause)
} finally {
_timestamp = timestamp
+ _updateActive = false
}
}
@@ -185,39 +232,6 @@ internal class SimResourceContextImpl(
override fun toString(): String = "SimResourceContextImpl[capacity=$capacity]"
/**
- * Update the state of the resource context.
- */
- private fun doUpdate(timestamp: Long, state: SimResourceState): SimResourceState {
- return when (state) {
- // Resource context is not active, so its state will not update
- SimResourceState.Pending, SimResourceState.Stopped -> state
- SimResourceState.Active -> {
- val isInterrupted = _flag and FLAG_INTERRUPT != 0
- val reachedDeadline = _deadline <= timestamp
- val delta = max(0, timestamp - _timestamp)
-
- // Update the resource counters only if there is some progress
- if (timestamp > _timestamp) {
- logic.onUpdate(this, delta, _limit, reachedDeadline)
- }
-
- // We should only continue processing the next command if:
- // 1. The resource consumption was finished.
- // 2. The resource capacity cannot satisfy the demand.
- // 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
- if (reachedDeadline || isInterrupted) {
- when (val command = consumer.onNext(this, timestamp, delta)) {
- is SimResourceCommand.Consume -> interpretConsume(timestamp, command.limit, command.duration)
- is SimResourceCommand.Exit -> interpretExit()
- }
- } else {
- interpretConsume(timestamp, _limit, _duration - delta)
- }
- }
- }
- }
-
- /**
* Stop the resource context.
*/
private fun doStop() {
@@ -226,6 +240,8 @@ internal class SimResourceContextImpl(
logic.onFinish(this)
} catch (cause: Throwable) {
doFail(cause)
+ } finally {
+ _deadline = Long.MAX_VALUE
}
}
@@ -244,35 +260,6 @@ internal class SimResourceContextImpl(
}
/**
- * Interpret the [SimResourceCommand.Consume] command.
- */
- private fun interpretConsume(now: Long, limit: Double, duration: Long): SimResourceState {
- _speed = min(capacity, limit)
- _limit = limit
- _duration = duration
-
- val timestamp = logic.onConsume(this, now, limit, duration)
-
- _deadline = timestamp
-
- scheduleUpdate(timestamp)
-
- return SimResourceState.Active
- }
-
- /**
- * Interpret the [SimResourceCommand.Exit] command.
- */
- private fun interpretExit(): SimResourceState {
- _speed = 0.0
- _limit = 0.0
- _duration = Long.MAX_VALUE
- _deadline = Long.MAX_VALUE
-
- return SimResourceState.Stopped
- }
-
- /**
* Indicate that the capacity of the resource has changed.
*/
private fun onCapacityChange() {
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index a9390553..f4ea5fe8 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -22,14 +22,12 @@
package org.opendc.simulator.resources
-import io.mockk.every
-import io.mockk.mockk
+import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
@@ -100,10 +98,17 @@ internal class SimResourceAggregatorMaxMinTest {
)
sources.forEach(aggregator::addInput)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) }
- .returns(SimResourceCommand.Consume(4.0, 1000))
- .andThen(SimResourceCommand.Exit)
+ val consumer = spyk(object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ ctx.push(4.0)
+ 1000
+ } else {
+ ctx.close()
+ Long.MAX_VALUE
+ }
+ }
+ })
aggregator.consume(consumer)
yield()
@@ -113,27 +118,6 @@ internal class SimResourceAggregatorMaxMinTest {
}
@Test
- fun testException() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
-
- val aggregator = SimResourceAggregatorMaxMin(scheduler)
- val sources = listOf(
- SimResourceSource(1.0, scheduler),
- SimResourceSource(1.0, scheduler)
- )
- sources.forEach(aggregator::addInput)
-
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) }
- .returns(SimResourceCommand.Consume(1.0, duration = 1000))
- .andThenThrows(IllegalStateException("Test Exception"))
-
- assertThrows<IllegalStateException> { aggregator.consume(consumer) }
- yield()
- assertFalse(sources[0].isActive)
- }
-
- @Test
fun testAdjustCapacity() = runBlockingSimulation {
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
@@ -186,10 +170,17 @@ internal class SimResourceAggregatorMaxMinTest {
)
sources.forEach(aggregator::addInput)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) }
- .returns(SimResourceCommand.Consume(4.0, 1000))
- .andThen(SimResourceCommand.Exit)
+ val consumer = object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ ctx.push(4.0)
+ 1000
+ } else {
+ ctx.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
aggregator.consume(consumer)
yield()
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
deleted file mode 100644
index 9a52dc63..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertDoesNotThrow
-import org.junit.jupiter.api.assertThrows
-
-/**
- * Test suite for [SimResourceCommand].
- */
-class SimResourceCommandTest {
- @Test
- fun testNegativeLimit() {
- assertThrows<IllegalArgumentException> {
- SimResourceCommand.Consume(-1.0, 1)
- }
- }
-
- @Test
- fun testNegativeDuration() {
- assertThrows<IllegalArgumentException> {
- SimResourceCommand.Consume(1.0, -1)
- }
- }
-
- @Test
- fun testConsumeCorrect() {
- assertDoesNotThrow {
- SimResourceCommand.Consume(1.0)
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index 0cb95abb..4e57f598 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -36,8 +36,17 @@ class SimResourceContextTest {
@Test
fun testFlushWithoutCommand() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit
+ val consumer = object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ ctx.push(1.0)
+ 1000
+ } else {
+ ctx.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
val logic = object : SimResourceProviderLogic {}
val context = SimResourceContextImpl(null, interpreter, consumer, logic)
@@ -48,14 +57,23 @@ class SimResourceContextTest {
@Test
fun testIntermediateFlush() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit
+ val consumer = object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ ctx.push(4.0)
+ 1000
+ } else {
+ ctx.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
val logic = spyk(object : SimResourceProviderLogic {
override fun onFinish(ctx: SimResourceControllableContext) {}
override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long = duration
})
- val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic))
+ val context = SimResourceContextImpl(null, interpreter, consumer, logic)
context.start()
delay(1) // Delay 1 ms to prevent hitting the fast path
@@ -67,11 +85,20 @@ class SimResourceContextTest {
@Test
fun testIntermediateFlushIdle() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) andThen SimResourceCommand.Exit
+ val consumer = object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ ctx.push(0.0)
+ 10
+ } else {
+ ctx.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
val logic = spyk(object : SimResourceProviderLogic {})
- val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic))
+ val context = SimResourceContextImpl(null, interpreter, consumer, logic)
context.start()
delay(5)
@@ -88,8 +115,17 @@ class SimResourceContextTest {
@Test
fun testDoubleStart() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) andThen SimResourceCommand.Exit
+ val consumer = object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ ctx.push(0.0)
+ 1000
+ } else {
+ ctx.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
val logic = object : SimResourceProviderLogic {}
val context = SimResourceContextImpl(null, interpreter, consumer, logic)
@@ -104,8 +140,17 @@ class SimResourceContextTest {
@Test
fun testIdempotentCapacityChange() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit
+ val consumer = spyk(object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ ctx.push(1.0)
+ 1000
+ } else {
+ ctx.close()
+ Long.MAX_VALUE
+ }
+ }
+ })
val logic = object : SimResourceProviderLogic {}
@@ -120,12 +165,23 @@ class SimResourceContextTest {
@Test
fun testFailureNoInfiniteLoop() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit
- every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent")
- every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure")
- val logic = spyk(object : SimResourceProviderLogic {})
+ val consumer = spyk(object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ ctx.close()
+ return Long.MAX_VALUE
+ }
+
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ if (event == SimResourceEvent.Exit) throw IllegalStateException("onEvent")
+ }
+
+ override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ throw IllegalStateException("onFailure")
+ }
+ })
+
+ val logic = object : SimResourceProviderLogic {}
val context = SimResourceContextImpl(null, interpreter, consumer, logic)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index c310fad6..e055daf7 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -44,10 +44,7 @@ internal class SimResourceSourceTest {
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) }
- .returns(SimResourceCommand.Consume(capacity, duration = 1000))
- .andThen(SimResourceCommand.Exit)
+ val consumer = SimWorkConsumer(4200.0, 1.0)
val res = mutableListOf<Double>()
val adapter = SimSpeedConsumerAdapter(consumer, res::add)
@@ -79,10 +76,7 @@ internal class SimResourceSourceTest {
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) }
- .returns(SimResourceCommand.Consume(2 * capacity, duration = 1000))
- .andThen(SimResourceCommand.Exit)
+ val consumer = SimWorkConsumer(capacity, 2.0)
val res = mutableListOf<Double>()
val adapter = SimSpeedConsumerAdapter(consumer, res::add)
@@ -103,8 +97,9 @@ internal class SimResourceSourceTest {
val provider = SimResourceSource(capacity, scheduler)
val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
- return SimResourceCommand.Exit
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ ctx.close()
+ return Long.MAX_VALUE
}
override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
@@ -132,12 +127,14 @@ internal class SimResourceSourceTest {
}
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
return if (isFirst) {
isFirst = false
- SimResourceCommand.Consume(1.0, duration = 4000)
+ ctx.push(1.0)
+ 4000
} else {
- SimResourceCommand.Exit
+ ctx.close()
+ Long.MAX_VALUE
}
}
}
@@ -172,10 +169,19 @@ internal class SimResourceSourceTest {
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) }
- .returns(SimResourceCommand.Consume(1.0, duration = 1000))
- .andThenThrows(IllegalStateException())
+ val consumer = object : SimResourceConsumer {
+ var isFirst = true
+
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ return if (isFirst) {
+ isFirst = false
+ ctx.push(1.0)
+ 1000
+ } else {
+ throw IllegalStateException()
+ }
+ }
+ }
assertThrows<IllegalStateException> {
provider.consume(consumer)
@@ -188,10 +194,7 @@ internal class SimResourceSourceTest {
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) }
- .returns(SimResourceCommand.Consume(1.0))
- .andThenThrows(IllegalStateException())
+ val consumer = SimWorkConsumer(capacity, 1.0)
assertThrows<IllegalStateException> {
coroutineScope {
@@ -207,30 +210,13 @@ internal class SimResourceSourceTest {
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) }
- .returns(SimResourceCommand.Consume(1.0))
- .andThenThrows(IllegalStateException())
+ val consumer = SimWorkConsumer(capacity, 1.0)
launch { provider.consume(consumer) }
delay(500)
provider.cancel()
- assertEquals(500, clock.millis())
- }
-
- @Test
- fun testIdle() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
-
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) }
- .returns(SimResourceCommand.Consume(0.0, 500))
- .andThen(SimResourceCommand.Exit)
-
- provider.consume(consumer)
+ yield()
assertEquals(500, clock.millis())
}
@@ -243,10 +229,9 @@ internal class SimResourceSourceTest {
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) }
- .returns(SimResourceCommand.Consume(0.0))
- .andThenThrows(IllegalStateException())
+ val consumer = object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long = Long.MAX_VALUE
+ }
provider.consume(consumer)
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index ad3b0f9f..9f86dc0d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources
-import io.mockk.every
-import io.mockk.mockk
import kotlinx.coroutines.yield
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@@ -32,6 +30,7 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
@@ -88,8 +87,7 @@ internal class SimResourceSwitchExclusiveTest {
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val duration = 5 * 60L * 1000
- val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, duration = duration) andThen SimResourceCommand.Exit
+ val workload = SimWorkConsumer(duration * 3.2, 1.0)
val switch = SimResourceSwitchExclusive()
val source = SimResourceSource(3200.0, scheduler)
@@ -125,12 +123,14 @@ internal class SimResourceSwitchExclusiveTest {
}
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
return if (isFirst) {
isFirst = false
- SimResourceCommand.Consume(1.0, duration = duration)
+ ctx.push(1.0)
+ duration
} else {
- SimResourceCommand.Exit
+ ctx.close()
+ Long.MAX_VALUE
}
}
}
@@ -159,9 +159,6 @@ internal class SimResourceSwitchExclusiveTest {
fun testConcurrentWorkloadFails() = runBlockingSimulation {
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit
-
val switch = SimResourceSwitchExclusive()
val source = SimResourceSource(3200.0, scheduler)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
index d8f18e65..ba0d66ff 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources
-import io.mockk.every
-import io.mockk.mockk
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
@@ -31,6 +29,7 @@ import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
@@ -46,9 +45,7 @@ internal class SimResourceSwitchMaxMinTest {
sources.forEach { switch.addInput(it) }
val provider = switch.newOutput()
-
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, duration = 1000) andThen SimResourceCommand.Exit
+ val consumer = SimWorkConsumer(2000.0, 1.0)
try {
provider.consume(consumer)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
index 3780fd60..fc43c3da 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources
-import io.mockk.every
-import io.mockk.mockk
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.*
@@ -47,8 +45,9 @@ internal class SimResourceTransformerTest {
launch { source.consume(forwarder) }
forwarder.consume(object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
- return SimResourceCommand.Exit
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ ctx.close()
+ return Long.MAX_VALUE
}
})
@@ -67,12 +66,14 @@ internal class SimResourceTransformerTest {
forwarder.consume(object : SimResourceConsumer {
var isFirst = true
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
return if (isFirst) {
isFirst = false
- SimResourceCommand.Consume(1.0, duration = 10 * 1000L)
+ ctx.push(1.0)
+ 10 * 1000
} else {
- SimResourceCommand.Exit
+ ctx.close()
+ Long.MAX_VALUE
}
}
})
@@ -85,7 +86,10 @@ internal class SimResourceTransformerTest {
fun testState() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand = SimResourceCommand.Exit
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ ctx.close()
+ return Long.MAX_VALUE
+ }
}
assertFalse(forwarder.isActive)
@@ -106,8 +110,12 @@ internal class SimResourceTransformerTest {
fun testCancelPendingDelegate() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit
+ val consumer = spyk(object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ ctx.close()
+ return Long.MAX_VALUE
+ }
+ })
forwarder.startConsumer(consumer)
forwarder.cancel()
@@ -121,8 +129,7 @@ internal class SimResourceTransformerTest {
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10)
+ val consumer = spyk(SimWorkConsumer(2000.0, 1.0))
source.startConsumer(forwarder)
yield()
@@ -140,8 +147,7 @@ internal class SimResourceTransformerTest {
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10)
+ val consumer = spyk(SimWorkConsumer(2000.0, 1.0))
source.startConsumer(forwarder)
yield()
@@ -159,8 +165,12 @@ internal class SimResourceTransformerTest {
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit
+ val consumer = object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ ctx.close()
+ return Long.MAX_VALUE
+ }
+ }
source.startConsumer(forwarder)
forwarder.consume(consumer)
@@ -190,7 +200,7 @@ internal class SimResourceTransformerTest {
@Test
fun testTransformExit() = runBlockingSimulation {
- val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit }
+ val forwarder = SimResourceTransformer { ctx, _ -> ctx.close(); Long.MAX_VALUE }
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(1.0, scheduler)