summaryrefslogtreecommitdiff
path: root/opendc-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt5
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt2
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt10
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt24
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt2
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt37
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt104
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt13
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt6
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt6
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt33
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt169
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt66
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt100
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt145
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt26
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt16
20 files changed, 335 insertions, 445 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
index d24ed1f3..c560cd28 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
@@ -129,6 +129,10 @@ public abstract class SimAbstractHypervisor(
override fun close() {
super.close()
+ for (cpu in cpus) {
+ cpu.close()
+ }
+
_vms.remove(this)
}
}
@@ -137,9 +141,9 @@ public abstract class SimAbstractHypervisor(
* A [SimProcessingUnit] of a virtual machine.
*/
private class VCpu(
- private val source: SimResourceProvider,
+ private val source: SimResourceCloseableProvider,
override val model: ProcessingUnit
- ) : SimProcessingUnit, SimResourceProvider by source {
+ ) : SimProcessingUnit, SimResourceCloseableProvider by source {
override var capacity: Double
get() = source.capacity
set(_) {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 93d306cf..3a70680c 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -115,11 +115,6 @@ public abstract class SimAbstractMachine(
isTerminated = true
cancel()
- interpreter.batch {
- for (cpu in cpus) {
- cpu.close()
- }
- }
}
/* SimResourceSystem */
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
index 11034a57..3ce85d02 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
@@ -83,7 +83,7 @@ public class SimPdu(
/**
* A PDU outlet.
*/
- public class Outlet(private val provider: SimResourceProvider) : SimPowerOutlet(), AutoCloseable {
+ public class Outlet(private val provider: SimResourceCloseableProvider) : SimPowerOutlet(), AutoCloseable {
override fun onConnect(inlet: SimPowerInlet) {
provider.startConsumer(inlet.createConsumer())
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index 5fe7d7bb..84217278 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -56,8 +56,6 @@ public abstract class SimAbstractResourceAggregator(
/* SimResourceAggregator */
override fun addInput(input: SimResourceProvider) {
- check(state != SimResourceState.Stopped) { "Aggregator has been stopped" }
-
val consumer = Consumer()
_inputs.add(input)
_inputConsumers.add(consumer)
@@ -70,8 +68,8 @@ public abstract class SimAbstractResourceAggregator(
private val _inputConsumers = mutableListOf<Consumer>()
/* SimResourceProvider */
- override val state: SimResourceState
- get() = _output.state
+ override val isActive: Boolean
+ get() = _output.isActive
override val capacity: Double
get() = _output.capacity
@@ -97,10 +95,6 @@ public abstract class SimAbstractResourceAggregator(
_output.interrupt()
}
- override fun close() {
- _output.close()
- }
-
private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) {
override fun createLogic(): SimResourceProviderLogic {
return object : SimResourceProviderLogic {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
index de26f99e..c1b1450e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
@@ -33,6 +33,12 @@ public abstract class SimAbstractResourceProvider(
initialCapacity: Double
) : SimResourceProvider {
/**
+ * A flag to indicate that the resource provider is active.
+ */
+ public override val isActive: Boolean
+ get() = ctx != null
+
+ /**
* The capacity of the resource.
*/
public override var capacity: Double = initialCapacity
@@ -67,12 +73,6 @@ public abstract class SimAbstractResourceProvider(
private set
/**
- * The state of the resource provider.
- */
- final override var state: SimResourceState = SimResourceState.Pending
- private set
-
- /**
* Construct the [SimResourceProviderLogic] instance for a new consumer.
*/
protected abstract fun createLogic(): SimResourceProviderLogic
@@ -96,21 +96,15 @@ public abstract class SimAbstractResourceProvider(
}
final override fun startConsumer(consumer: SimResourceConsumer) {
- check(state == SimResourceState.Pending) { "Resource is in invalid state" }
+ check(ctx == null) { "Resource is in invalid state" }
val ctx = interpreter.newContext(consumer, createLogic(), parent)
ctx.capacity = capacity
this.ctx = ctx
- this.state = SimResourceState.Active
start(ctx)
}
- override fun close() {
- cancel()
- state = SimResourceState.Stopped
- }
-
final override fun interrupt() {
ctx?.interrupt()
}
@@ -121,10 +115,6 @@ public abstract class SimAbstractResourceProvider(
this.ctx = null
ctx.close()
}
-
- if (state != SimResourceState.Stopped) {
- state = SimResourceState.Pending
- }
}
override fun toString(): String = "SimAbstractResourceProvider[capacity=$capacity]"
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index c39c1aca..991cda7a 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -45,7 +45,7 @@ public class SimResourceAggregatorMaxMin(
val command = if (grantedWork > 0.0 && grantedSpeed > 0.0)
SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
else
- SimResourceCommand.Idle(deadline)
+ SimResourceCommand.Idle()
input.push(command)
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt
new file mode 100644
index 00000000..bce8274b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceProvider] that has a controllable and limited lifetime.
+ *
+ * This interface is used to signal that the resource provider may be closed and not reused after that point.
+ */
+public interface SimResourceCloseableProvider : SimResourceProvider, AutoCloseable {
+ /**
+ * End the lifetime of the resource provider.
+ *
+ * This operation cancels the existing resource consumer and prevents the resource provider from being reused.
+ */
+ public override fun close()
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
index e0333ff9..6bfbfc99 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
@@ -29,10 +29,10 @@ public interface SimResourceDistributor : SimResourceConsumer {
/**
* The output resource providers to which resource consumers can be attached.
*/
- public val outputs: Set<SimResourceProvider>
+ public val outputs: Set<SimResourceCloseableProvider>
/**
* Create a new output for the distributor.
*/
- public fun newOutput(): SimResourceProvider
+ public fun newOutput(): SimResourceCloseableProvider
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index be9e89fb..d8fc8cb6 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -32,7 +32,7 @@ public class SimResourceDistributorMaxMin(
private val interpreter: SimResourceInterpreter,
private val parent: SimResourceSystem? = null
) : SimResourceDistributor {
- override val outputs: Set<SimResourceProvider>
+ override val outputs: Set<SimResourceCloseableProvider>
get() = _outputs
private val _outputs = mutableSetOf<Output>()
@@ -57,7 +57,7 @@ public class SimResourceDistributorMaxMin(
private var totalAllocatedSpeed = 0.0
/* SimResourceDistributor */
- override fun newOutput(): SimResourceProvider {
+ override fun newOutput(): SimResourceCloseableProvider {
val provider = Output(ctx?.capacity ?: 0.0)
_outputs.add(provider)
return provider
@@ -65,7 +65,7 @@ public class SimResourceDistributorMaxMin(
/* SimResourceConsumer */
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- return doNext(ctx.capacity)
+ return doNext(ctx)
}
override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
@@ -94,12 +94,13 @@ public class SimResourceDistributorMaxMin(
/**
* Schedule the work of the outputs.
*/
- private fun doNext(capacity: Double): SimResourceCommand {
+ private fun doNext(ctx: SimResourceContext): SimResourceCommand {
// If there is no work yet, mark the input as idle.
if (activeOutputs.isEmpty()) {
return SimResourceCommand.Idle()
}
+ val capacity = ctx.capacity
var duration: Double = Double.MAX_VALUE
var deadline: Long = Long.MAX_VALUE
var availableSpeed = capacity
@@ -112,7 +113,7 @@ public class SimResourceDistributorMaxMin(
output.pull()
// Remove outputs that have finished
- if (output.isFinished) {
+ if (!output.isActive) {
outputIterator.remove()
}
}
@@ -125,33 +126,23 @@ public class SimResourceDistributorMaxMin(
var remaining = activeOutputs.size
for (output in activeOutputs) {
val availableShare = availableSpeed / remaining--
+ val grantedSpeed = min(output.allowedSpeed, availableShare)
+ deadline = min(deadline, output.deadline)
- when (val command = output.activeCommand) {
- is SimResourceCommand.Idle -> {
- deadline = min(deadline, command.deadline)
- output.actualSpeed = 0.0
- }
- is SimResourceCommand.Consume -> {
- val grantedSpeed = min(output.allowedSpeed, availableShare)
- deadline = min(deadline, command.deadline)
-
- // Ignore idle computation
- if (grantedSpeed <= 0.0 || command.work <= 0.0) {
- output.actualSpeed = 0.0
- continue
- }
+ // Ignore idle computation
+ if (grantedSpeed <= 0.0 || output.work <= 0.0) {
+ output.actualSpeed = 0.0
+ continue
+ }
- totalRequestedSpeed += command.limit
- totalRequestedWork += command.work
+ totalRequestedSpeed += output.limit
+ totalRequestedWork += output.work
- output.actualSpeed = grantedSpeed
- availableSpeed -= grantedSpeed
+ output.actualSpeed = grantedSpeed
+ availableSpeed -= grantedSpeed
- // The duration that we want to run is that of the shortest request of an output
- duration = min(duration, command.work / grantedSpeed)
- }
- SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" }
- }
+ // The duration that we want to run is that of the shortest request of an output
+ duration = min(duration, output.work / grantedSpeed)
}
assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" }
@@ -178,11 +169,30 @@ public class SimResourceDistributorMaxMin(
/**
* An internal [SimResourceProvider] implementation for switch outputs.
*/
- private inner class Output(capacity: Double) : SimAbstractResourceProvider(interpreter, parent, capacity), SimResourceProviderLogic, Comparable<Output> {
+ private inner class Output(capacity: Double) :
+ SimAbstractResourceProvider(interpreter, parent, capacity),
+ SimResourceCloseableProvider,
+ SimResourceProviderLogic,
+ Comparable<Output> {
+ /**
+ * A flag to indicate that the output is closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
+ * The current requested work.
+ */
+ var work: Double = 0.0
+
/**
- * The current command that is processed by the resource.
+ * The requested limit.
*/
- var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
+ var limit: Double = 0.0
+
+ /**
+ * The current deadline.
+ */
+ var deadline: Long = Long.MAX_VALUE
/**
* The processing speed that is allowed by the model constraints.
@@ -195,12 +205,6 @@ public class SimResourceDistributorMaxMin(
var actualSpeed: Double = 0.0
/**
- * A flag to indicate that the output is finished.
- */
- val isFinished
- get() = activeCommand is SimResourceCommand.Exit
-
- /**
* The timestamp at which we received the last command.
*/
private var lastCommandTimestamp: Long = Long.MIN_VALUE
@@ -209,6 +213,8 @@ public class SimResourceDistributorMaxMin(
override fun createLogic(): SimResourceProviderLogic = this
override fun start(ctx: SimResourceControllableContext) {
+ check(!isClosed) { "Cannot re-use closed output" }
+
activeOutputs += this
interpreter.batch {
@@ -219,27 +225,27 @@ public class SimResourceDistributorMaxMin(
}
override fun close() {
- val state = state
-
- super.close()
-
- if (state != SimResourceState.Stopped) {
- _outputs.remove(this)
- }
+ isClosed = true
+ cancel()
+ _outputs.remove(this)
}
/* SimResourceProviderLogic */
override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
allowedSpeed = 0.0
- activeCommand = SimResourceCommand.Idle(deadline)
+ this.deadline = deadline
+ work = 0.0
+ limit = 0.0
lastCommandTimestamp = ctx.clock.millis()
return Long.MAX_VALUE
}
override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
- allowedSpeed = ctx.speed
- activeCommand = SimResourceCommand.Consume(work, limit, deadline)
+ allowedSpeed = min(ctx.capacity, limit)
+ this.work = work
+ this.limit = limit
+ this.deadline = deadline
lastCommandTimestamp = ctx.clock.millis()
return Long.MAX_VALUE
@@ -250,7 +256,9 @@ public class SimResourceDistributorMaxMin(
}
override fun onFinish(ctx: SimResourceControllableContext) {
- activeCommand = SimResourceCommand.Exit
+ work = 0.0
+ limit = 0.0
+ deadline = Long.MAX_VALUE
lastCommandTimestamp = ctx.clock.millis()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
index f709ca17..b68b7261 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -29,11 +29,11 @@ import kotlin.coroutines.resumeWithException
/**
* A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer].
*/
-public interface SimResourceProvider : AutoCloseable {
+public interface SimResourceProvider {
/**
- * The state of the resource.
+ * A flag to indicate that the resource provider is currently being consumed by a [SimResourceConsumer].
*/
- public val state: SimResourceState
+ public val isActive: Boolean
/**
* The resource capacity available at this instant.
@@ -71,13 +71,6 @@ public interface SimResourceProvider : AutoCloseable {
* Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op.
*/
public fun cancel()
-
- /**
- * End the lifetime of the resource.
- *
- * This operation terminates the existing resource consumer.
- */
- public override fun close()
}
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
index e224285e..f6e7b22f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
@@ -29,7 +29,7 @@ public interface SimResourceSwitch : AutoCloseable {
/**
* The output resource providers to which resource consumers can be attached.
*/
- public val outputs: Set<SimResourceProvider>
+ public val outputs: Set<SimResourceCloseableProvider>
/**
* The input resources that will be switched between the output providers.
@@ -44,7 +44,7 @@ public interface SimResourceSwitch : AutoCloseable {
/**
* Create a new output on the switch.
*/
- public fun newOutput(): SimResourceProvider
+ public fun newOutput(): SimResourceCloseableProvider
/**
* Add the specified [input] to the switch.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index 2950af80..4ff741ed 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -35,7 +35,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
private var isClosed: Boolean = false
private val _outputs = mutableSetOf<Provider>()
- override val outputs: Set<SimResourceProvider>
+ override val outputs: Set<SimResourceCloseableProvider>
get() = _outputs
private val availableResources = ArrayDeque<SimResourceTransformer>()
@@ -61,7 +61,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
}
- override fun newOutput(): SimResourceProvider {
+ override fun newOutput(): SimResourceCloseableProvider {
check(!isClosed) { "Switch has been closed" }
check(availableResources.isNotEmpty()) { "No capacity to serve request" }
val forwarder = availableResources.poll()
@@ -101,7 +101,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
_inputs.forEach(SimResourceProvider::cancel)
}
- private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder {
+ private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceCloseableProvider, SimResourceProvider by forwarder {
override fun close() {
// We explicitly do not close the forwarder here in order to re-use it across output resources.
_outputs -= this
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index 684a1b52..50d58798 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -33,7 +33,7 @@ public class SimResourceSwitchMaxMin(
/**
* The output resource providers to which resource consumers can be attached.
*/
- override val outputs: Set<SimResourceProvider>
+ override val outputs: Set<SimResourceCloseableProvider>
get() = distributor.outputs
/**
@@ -70,7 +70,7 @@ public class SimResourceSwitchMaxMin(
/**
* Add an output to the switch.
*/
- override fun newOutput(): SimResourceProvider {
+ override fun newOutput(): SimResourceCloseableProvider {
check(!isClosed) { "Switch has been closed" }
return distributor.newOutput()
@@ -88,7 +88,7 @@ public class SimResourceSwitchMaxMin(
override fun close() {
if (!isClosed) {
isClosed = true
- aggregator.close()
+ aggregator.cancel()
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index fd3d1230..cec27e1c 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -33,7 +33,7 @@ import org.opendc.simulator.resources.impl.SimResourceCountersImpl
public class SimResourceTransformer(
private val isCoupled: Boolean = false,
private val transform: (SimResourceContext, SimResourceCommand) -> SimResourceCommand
-) : SimResourceFlow {
+) : SimResourceFlow, AutoCloseable {
/**
* The [SimResourceContext] in which the forwarder runs.
*/
@@ -49,11 +49,8 @@ public class SimResourceTransformer(
*/
private var hasDelegateStarted: Boolean = false
- /**
- * The state of the forwarder.
- */
- override var state: SimResourceState = SimResourceState.Pending
- private set
+ override val isActive: Boolean
+ get() = delegate != null
override val capacity: Double
get() = ctx?.capacity ?: 0.0
@@ -69,9 +66,8 @@ public class SimResourceTransformer(
private val _counters = SimResourceCountersImpl()
override fun startConsumer(consumer: SimResourceConsumer) {
- check(state == SimResourceState.Pending) { "Resource is in invalid state" }
+ check(delegate == null) { "Resource transformer already active" }
- state = SimResourceState.Active
delegate = consumer
// Interrupt the provider to replace the consumer
@@ -86,19 +82,18 @@ public class SimResourceTransformer(
val delegate = delegate
val ctx = ctx
- state = SimResourceState.Pending
-
- if (delegate != null && ctx != null) {
+ if (delegate != null) {
this.delegate = null
- delegate.onEvent(ctx, SimResourceEvent.Exit)
+
+ if (ctx != null) {
+ delegate.onEvent(ctx, SimResourceEvent.Exit)
+ }
}
}
override fun close() {
val ctx = ctx
- state = SimResourceState.Stopped
-
if (ctx != null) {
this.ctx = null
ctx.interrupt()
@@ -114,9 +109,7 @@ public class SimResourceTransformer(
updateCounters(ctx)
- return if (state == SimResourceState.Stopped) {
- SimResourceCommand.Exit
- } else if (delegate != null) {
+ return if (delegate != null) {
val command = transform(ctx, delegate.onNext(ctx))
_work = if (command is SimResourceCommand.Consume) command.work else 0.0
@@ -128,7 +121,7 @@ public class SimResourceTransformer(
delegate.onEvent(ctx, SimResourceEvent.Exit)
- if (isCoupled || state == SimResourceState.Stopped)
+ if (isCoupled)
SimResourceCommand.Exit
else
onNext(ctx)
@@ -184,10 +177,6 @@ public class SimResourceTransformer(
private fun reset() {
delegate = null
hasDelegateStarted = false
-
- if (state != SimResourceState.Stopped) {
- state = SimResourceState.Pending
- }
}
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
index 46c5c63f..90c7bc75 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -39,7 +39,8 @@ internal class SimResourceContextImpl(
* The clock of the context.
*/
override val clock: Clock
- get() = interpreter.clock
+ get() = _clock
+ private val _clock = interpreter.clock
/**
* The capacity of the resource.
@@ -59,18 +60,7 @@ internal class SimResourceContextImpl(
* The amount of work still remaining at this instant.
*/
override val remainingWork: Double
- get() {
- val now = clock.millis()
-
- return if (_remainingWorkFlush < now) {
- _remainingWorkFlush = now
- computeRemainingWork(now).also { _remainingWork = it }
- } else {
- _remainingWork
- }
- }
- private var _remainingWork: Double = 0.0
- private var _remainingWorkFlush: Long = Long.MIN_VALUE
+ get() = getRemainingWork(_clock.millis())
/**
* A flag to indicate the state of the context.
@@ -92,20 +82,6 @@ internal class SimResourceContextImpl(
override val demand: Double
get() = _limit
- private val counters = object : SimResourceCounters {
- override var demand: Double = 0.0
- override var actual: Double = 0.0
- override var overcommit: Double = 0.0
-
- override fun reset() {
- demand = 0.0
- actual = 0.0
- overcommit = 0.0
- }
-
- override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
- }
-
/**
* The current state of the resource context.
*/
@@ -117,7 +93,7 @@ internal class SimResourceContextImpl(
/**
* The update flag indicating why the update was triggered.
*/
- private var _flag: Flag = Flag.None
+ private var _flag: Int = 0
/**
* The current pending update.
@@ -147,7 +123,7 @@ internal class SimResourceContextImpl(
return
}
- enableFlag(Flag.Interrupt)
+ _flag = _flag or FLAG_INTERRUPT
scheduleUpdate()
}
@@ -156,7 +132,7 @@ internal class SimResourceContextImpl(
return
}
- enableFlag(Flag.Invalidate)
+ _flag = _flag or FLAG_INVALIDATE
scheduleUpdate()
}
@@ -173,7 +149,7 @@ internal class SimResourceContextImpl(
*/
fun requiresUpdate(timestamp: Long): Boolean {
// Either the resource context is flagged or there is a pending update at this timestamp
- return _flag != Flag.None || _pendingUpdate?.timestamp == timestamp
+ return _flag != 0 || _pendingUpdate?.timestamp == timestamp
}
/**
@@ -185,7 +161,7 @@ internal class SimResourceContextImpl(
val newState = doUpdate(timestamp, oldState)
_state = newState
- _flag = Flag.None
+ _flag = 0
when (newState) {
SimResourceState.Pending ->
@@ -222,8 +198,8 @@ internal class SimResourceContextImpl(
// Resource context is not active, so its state will not update
SimResourceState.Pending, SimResourceState.Stopped -> state
SimResourceState.Active -> {
- val isInterrupted = _flag == Flag.Interrupt
- val remainingWork = remainingWork
+ val isInterrupted = _flag and FLAG_INTERRUPT != 0
+ val remainingWork = getRemainingWork(timestamp)
val isConsume = _limit > 0.0
// Update the resource counters only if there is some progress
@@ -236,11 +212,15 @@ internal class SimResourceContextImpl(
// 2. The resource capacity cannot satisfy the demand.
// 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
if ((isConsume && remainingWork == 0.0) || _deadline <= timestamp || isInterrupted) {
- next(timestamp)
+ when (val command = consumer.onNext(this)) {
+ is SimResourceCommand.Idle -> interpretIdle(timestamp, command.deadline)
+ is SimResourceCommand.Consume -> interpretConsume(timestamp, command.work, command.limit, command.deadline)
+ is SimResourceCommand.Exit -> interpretExit()
+ }
} else if (isConsume) {
- interpret(SimResourceCommand.Consume(remainingWork, _limit, _deadline), timestamp)
+ interpretConsume(timestamp, remainingWork, _limit, _deadline)
} else {
- interpret(SimResourceCommand.Idle(_deadline), timestamp)
+ interpretIdle(timestamp, _deadline)
}
}
}
@@ -273,57 +253,65 @@ internal class SimResourceContextImpl(
}
/**
- * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
+ * Interpret the [SimResourceCommand.Consume] command.
*/
- private fun interpret(command: SimResourceCommand, now: Long): SimResourceState {
- return when (command) {
- is SimResourceCommand.Idle -> {
- val deadline = command.deadline
+ private fun interpretConsume(now: Long, work: Double, limit: Double, deadline: Long): SimResourceState {
+ require(deadline >= now) { "Deadline already passed" }
- require(deadline >= now) { "Deadline already passed" }
+ _speed = min(capacity, limit)
+ _work = work
+ _limit = limit
+ _deadline = deadline
- _speed = 0.0
- _work = 0.0
- _limit = 0.0
- _deadline = deadline
+ val timestamp = logic.onConsume(this, work, limit, deadline)
+ scheduleUpdate(timestamp)
- val timestamp = logic.onIdle(this, deadline)
- scheduleUpdate(timestamp)
+ return SimResourceState.Active
+ }
- SimResourceState.Active
- }
- is SimResourceCommand.Consume -> {
- val work = command.work
- val limit = command.limit
- val deadline = command.deadline
+ /**
+ * Interpret the [SimResourceCommand.Idle] command.
+ */
+ private fun interpretIdle(now: Long, deadline: Long): SimResourceState {
+ require(deadline >= now) { "Deadline already passed" }
- require(deadline >= now) { "Deadline already passed" }
+ _speed = 0.0
+ _work = 0.0
+ _limit = 0.0
+ _deadline = deadline
- _speed = min(capacity, limit)
- _work = work
- _limit = limit
- _deadline = deadline
+ val timestamp = logic.onIdle(this, deadline)
+ scheduleUpdate(timestamp)
- val timestamp = logic.onConsume(this, work, limit, deadline)
- scheduleUpdate(timestamp)
+ return SimResourceState.Active
+ }
- SimResourceState.Active
- }
- is SimResourceCommand.Exit -> {
- _speed = 0.0
- _work = 0.0
- _limit = 0.0
- _deadline = Long.MAX_VALUE
+ /**
+ * Interpret the [SimResourceCommand.Exit] command.
+ */
+ private fun interpretExit(): SimResourceState {
+ _speed = 0.0
+ _work = 0.0
+ _limit = 0.0
+ _deadline = Long.MAX_VALUE
- SimResourceState.Stopped
- }
- }
+ return SimResourceState.Stopped
}
+ private var _remainingWork: Double = 0.0
+ private var _remainingWorkFlush: Long = Long.MIN_VALUE
+
/**
- * Request the workload for more work.
+ * Obtain the remaining work at the given timestamp.
*/
- private fun next(now: Long): SimResourceState = interpret(consumer.onNext(this), now)
+ private fun getRemainingWork(now: Long): Double {
+ return if (_remainingWorkFlush < now) {
+ _remainingWorkFlush = now
+ computeRemainingWork(now).also { _remainingWork = it }
+ } else {
+ _remainingWork
+ }
+ }
/**
* Compute the remaining work based on the current state.
@@ -357,25 +345,6 @@ internal class SimResourceContextImpl(
}
/**
- * Enable the specified [flag] taking into account precedence.
- */
- private fun enableFlag(flag: Flag) {
- _flag = when (_flag) {
- Flag.None -> flag
- Flag.Invalidate ->
- when (flag) {
- Flag.None -> flag
- else -> flag
- }
- Flag.Interrupt ->
- when (flag) {
- Flag.None, Flag.Invalidate -> flag
- else -> flag
- }
- }
- }
-
- /**
* Schedule an update for this resource context.
*/
private fun scheduleUpdate() {
@@ -411,12 +380,12 @@ internal class SimResourceContextImpl(
}
/**
- * An enumeration of flags that can be assigned to a resource context to indicate whether they are invalidated or
- * interrupted.
+ * A flag to indicate that the context should be invalidated.
*/
- enum class Flag {
- None,
- Interrupt,
- Invalidate
- }
+ private val FLAG_INVALIDATE = 0b01
+
+ /**
+ * A flag to indicate that the context should be interrupted.
+ */
+ private val FLAG_INTERRUPT = 0b10
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
index cb0d6160..6dd02ae5 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
@@ -48,12 +48,12 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
/**
* The queue of resource updates that are scheduled for immediate execution.
*/
- private val queue = ArrayDeque<Update>()
+ private val queue = ArrayDeque<SimResourceContextImpl>()
/**
* A priority queue containing the resource updates to be scheduled in the future.
*/
- private val futureQueue = PriorityQueue<Update>()
+ private val futureQueue = PriorityQueue<Update>(compareBy { it.timestamp })
/**
* The stack of interpreter invocations to occur in the future.
@@ -83,7 +83,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
* re-computed. In case no interpreter is currently active, the interpreter will be started.
*/
fun scheduleImmediate(ctx: SimResourceContextImpl) {
- queue.add(Update(ctx, Long.MIN_VALUE))
+ queue.add(ctx)
// In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
// up by the active interpreter.
@@ -137,7 +137,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
require(timestamp >= now) { "Timestamp must be in the future" }
- val update = Update(ctx, timestamp)
+ val update = allocUpdate(ctx, timestamp)
futureQueue.add(update)
// Optimization: Check if we need to push the interruption forward. Note that we check by timer reference.
@@ -193,9 +193,16 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
futureQueue.poll()
- if (update(now) && visited.add(update.ctx)) {
- collectAncestors(update.ctx, visited)
+ val shouldExecute = !update.isCancelled && update.ctx.requiresUpdate(now)
+ if (shouldExecute) {
+ update.ctx.doUpdate(now)
+
+ if (visited.add(update.ctx)) {
+ collectAncestors(update.ctx, visited)
+ }
}
+
+ updatePool.add(update)
}
// Repeat execution of all immediate updates until the system has converged to a steady-state
@@ -203,9 +210,15 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
do {
// Execute all immediate updates
while (true) {
- val update = queue.poll() ?: break
- if (update(now) && visited.add(update.ctx)) {
- collectAncestors(update.ctx, visited)
+ val ctx = queue.poll() ?: break
+ val shouldExecute = ctx.requiresUpdate(now)
+
+ if (shouldExecute) {
+ ctx.doUpdate(now)
+
+ if (visited.add(ctx)) {
+ collectAncestors(ctx, visited)
+ }
}
}
@@ -278,6 +291,26 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
}
/**
+ * The pool of existing updates.
+ */
+ private val updatePool = ArrayDeque<Update>()
+
+ /**
+ * Allocate an [Update] object.
+ */
+ private fun allocUpdate(ctx: SimResourceContextImpl, timestamp: Long): Update {
+ val update = updatePool.poll()
+ return if (update != null) {
+ update.ctx = ctx
+ update.timestamp = timestamp
+ update.isCancelled = false
+ update
+ } else {
+ Update(ctx, timestamp)
+ }
+ }
+
+ /**
* A future interpreter invocation.
*
* This class is used to keep track of the future scheduler invocations created using the [Delay] instance. In case
@@ -299,7 +332,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
* This class represents an update in the future at [timestamp] requested by [ctx]. A deferred update might be
* cancelled if the resource context was invalidated in the meantime.
*/
- class Update(@JvmField val ctx: SimResourceContextImpl, @JvmField val timestamp: Long) : Comparable<Update> {
+ class Update(@JvmField var ctx: SimResourceContextImpl, @JvmField var timestamp: Long) {
/**
* A flag to indicate that the task has been cancelled.
*/
@@ -313,19 +346,6 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
isCancelled = true
}
- /**
- * Immediately run update.
- */
- operator fun invoke(timestamp: Long): Boolean {
- val shouldExecute = !isCancelled && ctx.requiresUpdate(timestamp)
- if (shouldExecute) {
- ctx.doUpdate(timestamp)
- }
- return shouldExecute
- }
-
- override fun compareTo(other: Update): Int = timestamp.compareTo(other.timestamp)
-
override fun toString(): String = "Update[ctx=$ctx,timestamp=$timestamp]"
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index 51024e80..2f01a8c4 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -26,7 +26,7 @@ import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.*
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertThrows
@@ -58,17 +58,13 @@ internal class SimResourceAggregatorMaxMinTest {
val adapter = SimSpeedConsumerAdapter(forwarder, usage::add)
source.startConsumer(adapter)
- try {
- aggregator.consume(consumer)
- yield()
+ aggregator.consume(consumer)
+ yield()
- assertAll(
- { assertEquals(1000, clock.millis()) },
- { assertEquals(listOf(0.0, 0.5, 0.0), usage) }
- )
- } finally {
- aggregator.close()
- }
+ assertAll(
+ { assertEquals(1000, clock.millis()) },
+ { assertEquals(listOf(0.0, 0.5, 0.0), usage) }
+ )
}
@Test
@@ -86,16 +82,12 @@ internal class SimResourceAggregatorMaxMinTest {
val usage = mutableListOf<Double>()
val adapter = SimSpeedConsumerAdapter(consumer, usage::add)
- try {
- aggregator.consume(adapter)
- yield()
- assertAll(
- { assertEquals(1000, clock.millis()) },
- { assertEquals(listOf(0.0, 2.0, 0.0), usage) }
- )
- } finally {
- aggregator.close()
- }
+ aggregator.consume(adapter)
+ yield()
+ assertAll(
+ { assertEquals(1000, clock.millis()) },
+ { assertEquals(listOf(0.0, 2.0, 0.0), usage) }
+ )
}
@Test
@@ -114,15 +106,11 @@ internal class SimResourceAggregatorMaxMinTest {
.returns(SimResourceCommand.Consume(4.0, 4.0, 1000))
.andThen(SimResourceCommand.Exit)
- try {
- aggregator.consume(consumer)
- yield()
- assertEquals(1000, clock.millis())
+ aggregator.consume(consumer)
+ yield()
+ assertEquals(1000, clock.millis())
- verify(exactly = 2) { consumer.onNext(any()) }
- } finally {
- aggregator.close()
- }
+ verify(exactly = 2) { consumer.onNext(any()) }
}
@Test
@@ -141,13 +129,9 @@ internal class SimResourceAggregatorMaxMinTest {
.returns(SimResourceCommand.Consume(1.0, 1.0))
.andThenThrows(IllegalStateException("Test Exception"))
- try {
- assertThrows<IllegalStateException> { aggregator.consume(consumer) }
- yield()
- assertEquals(SimResourceState.Pending, sources[0].state)
- } finally {
- aggregator.close()
- }
+ assertThrows<IllegalStateException> { aggregator.consume(consumer) }
+ yield()
+ assertFalse(sources[0].isActive)
}
@Test
@@ -162,17 +146,13 @@ internal class SimResourceAggregatorMaxMinTest {
sources.forEach(aggregator::addInput)
val consumer = SimWorkConsumer(4.0, 1.0)
- try {
- coroutineScope {
- launch { aggregator.consume(consumer) }
- delay(1000)
- sources[0].capacity = 0.5
- }
- yield()
- assertEquals(2334, clock.millis())
- } finally {
- aggregator.close()
+ coroutineScope {
+ launch { aggregator.consume(consumer) }
+ delay(1000)
+ sources[0].capacity = 0.5
}
+ yield()
+ assertEquals(2334, clock.millis())
}
@Test
@@ -187,17 +167,13 @@ internal class SimResourceAggregatorMaxMinTest {
sources.forEach(aggregator::addInput)
val consumer = SimWorkConsumer(1.0, 0.5)
- try {
- coroutineScope {
- launch { aggregator.consume(consumer) }
- delay(500)
- sources[0].capacity = 0.5
- }
- yield()
- assertEquals(1000, clock.millis())
- } finally {
- aggregator.close()
+ coroutineScope {
+ launch { aggregator.consume(consumer) }
+ delay(500)
+ sources[0].capacity = 0.5
}
+ yield()
+ assertEquals(1000, clock.millis())
}
@Test
@@ -216,13 +192,9 @@ internal class SimResourceAggregatorMaxMinTest {
.returns(SimResourceCommand.Consume(4.0, 4.0, 1000))
.andThen(SimResourceCommand.Exit)
- try {
- aggregator.consume(consumer)
- yield()
- assertEquals(1000, clock.millis())
- assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" }
- } finally {
- aggregator.close()
- }
+ aggregator.consume(consumer)
+ yield()
+ assertEquals(1000, clock.millis())
+ assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 08d88093..4895544d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -50,16 +50,12 @@ class SimResourceSourceTest {
.returns(SimResourceCommand.Consume(1000 * capacity, capacity))
.andThen(SimResourceCommand.Exit)
- try {
- val res = mutableListOf<Double>()
- val adapter = SimSpeedConsumerAdapter(consumer, res::add)
+ val res = mutableListOf<Double>()
+ val adapter = SimSpeedConsumerAdapter(consumer, res::add)
- provider.consume(adapter)
+ provider.consume(adapter)
- assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
- } finally {
- provider.close()
- }
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
}
@Test
@@ -69,17 +65,13 @@ class SimResourceSourceTest {
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
- try {
- coroutineScope {
- launch { provider.consume(consumer) }
- delay(1000)
- provider.capacity = 0.5
- }
- assertEquals(3000, clock.millis())
- verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
- } finally {
- provider.close()
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ delay(1000)
+ provider.capacity = 0.5
}
+ assertEquals(3000, clock.millis())
+ verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
}
@Test
@@ -93,16 +85,12 @@ class SimResourceSourceTest {
.returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity))
.andThen(SimResourceCommand.Exit)
- try {
- val res = mutableListOf<Double>()
- val adapter = SimSpeedConsumerAdapter(consumer, res::add)
+ val res = mutableListOf<Double>()
+ val adapter = SimSpeedConsumerAdapter(consumer, res::add)
- provider.consume(adapter)
+ provider.consume(adapter)
- assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
- } finally {
- provider.close()
- }
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
}
/**
@@ -125,11 +113,7 @@ class SimResourceSourceTest {
}
}
- try {
- provider.consume(consumer)
- } finally {
- provider.close()
- }
+ provider.consume(consumer)
}
@Test
@@ -160,17 +144,13 @@ class SimResourceSourceTest {
}
}
- try {
- launch {
- yield()
- resCtx.interrupt()
- }
- provider.consume(consumer)
-
- assertEquals(0, clock.millis())
- } finally {
- provider.close()
+ launch {
+ yield()
+ resCtx.interrupt()
}
+ provider.consume(consumer)
+
+ assertEquals(0, clock.millis())
}
@Test
@@ -183,12 +163,8 @@ class SimResourceSourceTest {
every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) }
.throws(IllegalStateException())
- try {
- assertThrows<IllegalStateException> {
- provider.consume(consumer)
- }
- } finally {
- provider.close()
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
}
}
@@ -203,12 +179,8 @@ class SimResourceSourceTest {
.returns(SimResourceCommand.Consume(1.0, 1.0))
.andThenThrows(IllegalStateException())
- try {
- assertThrows<IllegalStateException> {
- provider.consume(consumer)
- }
- } finally {
- provider.close()
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
}
}
@@ -223,41 +195,16 @@ class SimResourceSourceTest {
.returns(SimResourceCommand.Consume(1.0, 1.0))
.andThenThrows(IllegalStateException())
- try {
- assertThrows<IllegalStateException> {
- coroutineScope {
- launch { provider.consume(consumer) }
- provider.consume(consumer)
- }
- }
- } finally {
- provider.close()
- }
- }
-
- @Test
- fun testClosedConsumption() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
-
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onNext(any()) }
- .returns(SimResourceCommand.Consume(1.0, 1.0))
- .andThenThrows(IllegalStateException())
-
- try {
- assertThrows<IllegalStateException> {
- provider.close()
+ assertThrows<IllegalStateException> {
+ coroutineScope {
+ launch { provider.consume(consumer) }
provider.consume(consumer)
}
- } finally {
- provider.close()
}
}
@Test
- fun testCloseDuringConsumption() = runBlockingSimulation {
+ fun testCancelDuringConsumption() = runBlockingSimulation {
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -267,15 +214,11 @@ class SimResourceSourceTest {
.returns(SimResourceCommand.Consume(1.0, 1.0))
.andThenThrows(IllegalStateException())
- try {
- launch { provider.consume(consumer) }
- delay(500)
- provider.close()
+ launch { provider.consume(consumer) }
+ delay(500)
+ provider.cancel()
- assertEquals(500, clock.millis())
- } finally {
- provider.close()
- }
+ assertEquals(500, clock.millis())
}
@Test
@@ -289,13 +232,9 @@ class SimResourceSourceTest {
.returns(SimResourceCommand.Idle(clock.millis() + 500))
.andThen(SimResourceCommand.Exit)
- try {
- provider.consume(consumer)
+ provider.consume(consumer)
- assertEquals(500, clock.millis())
- } finally {
- provider.close()
- }
+ assertEquals(500, clock.millis())
}
@Test
@@ -311,11 +250,7 @@ class SimResourceSourceTest {
.returns(SimResourceCommand.Idle())
.andThenThrows(IllegalStateException())
- try {
- provider.consume(consumer)
- } finally {
- provider.close()
- }
+ provider.consume(consumer)
}
}
}
@@ -331,12 +266,8 @@ class SimResourceSourceTest {
.returns(SimResourceCommand.Idle(2))
.andThen(SimResourceCommand.Exit)
- try {
- delay(10)
+ delay(10)
- assertThrows<IllegalArgumentException> { provider.consume(consumer) }
- } finally {
- provider.close()
- }
+ assertThrows<IllegalArgumentException> { provider.consume(consumer) }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
index 810052b8..cf69b7b5 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -40,15 +40,12 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
@OptIn(ExperimentalCoroutinesApi::class)
internal class SimResourceTransformerTest {
@Test
- fun testExitImmediately() = runBlockingSimulation {
+ fun testCancelImmediately() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
- launch {
- source.consume(forwarder)
- source.close()
- }
+ launch { source.consume(forwarder) }
forwarder.consume(object : SimResourceConsumer {
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
@@ -57,18 +54,16 @@ internal class SimResourceTransformerTest {
})
forwarder.close()
+ source.cancel()
}
@Test
- fun testExit() = runBlockingSimulation {
+ fun testCancel() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
- launch {
- source.consume(forwarder)
- source.close()
- }
+ launch { source.consume(forwarder) }
forwarder.consume(object : SimResourceConsumer {
var isFirst = true
@@ -84,6 +79,7 @@ internal class SimResourceTransformerTest {
})
forwarder.close()
+ source.cancel()
}
@Test
@@ -93,18 +89,18 @@ internal class SimResourceTransformerTest {
override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit
}
- assertEquals(SimResourceState.Pending, forwarder.state)
+ assertFalse(forwarder.isActive)
forwarder.startConsumer(consumer)
- assertEquals(SimResourceState.Active, forwarder.state)
+ assertTrue(forwarder.isActive)
assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
forwarder.cancel()
- assertEquals(SimResourceState.Pending, forwarder.state)
+ assertFalse(forwarder.isActive)
forwarder.close()
- assertEquals(SimResourceState.Stopped, forwarder.state)
+ assertFalse(forwarder.isActive)
}
@Test
@@ -171,7 +167,7 @@ internal class SimResourceTransformerTest {
forwarder.consume(consumer)
yield()
- assertEquals(SimResourceState.Pending, source.state)
+ assertFalse(forwarder.isActive)
}
@Test
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
index db4fe856..42648cf1 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
@@ -41,12 +41,8 @@ internal class SimWorkConsumerTest {
val consumer = SimWorkConsumer(1.0, 1.0)
- try {
- provider.consume(consumer)
- assertEquals(1000, clock.millis())
- } finally {
- provider.close()
- }
+ provider.consume(consumer)
+ assertEquals(1000, clock.millis())
}
@Test
@@ -56,11 +52,7 @@ internal class SimWorkConsumerTest {
val consumer = SimWorkConsumer(1.0, 0.5)
- try {
- provider.consume(consumer)
- assertEquals(2000, clock.millis())
- } finally {
- provider.close()
- }
+ provider.consume(consumer)
+ assertEquals(2000, clock.millis())
}
}