summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-resources/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources/src/main')
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt6
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt26
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt21
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt37
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt69
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt95
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt15
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt7
9 files changed, 238 insertions, 42 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index 1bcaf45f..6ae04f27 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -22,12 +22,10 @@
package org.opendc.simulator.resources
-import java.time.Clock
-
/**
* Abstract implementation of [SimResourceAggregator].
*/
-public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator {
+public abstract class SimAbstractResourceAggregator(private val scheduler: SimResourceScheduler) : SimResourceAggregator {
/**
* This method is invoked when the resource consumer consumes resources.
*/
@@ -77,7 +75,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
protected val outputContext: SimResourceContext
get() = context
- private val context = object : SimAbstractResourceContext(0.0, clock, _output) {
+ private val context = object : SimAbstractResourceContext(0.0, scheduler, _output) {
override val remainingWork: Double
get() {
val now = clock.millis()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index d2f585b1..c03bfad5 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -31,9 +31,16 @@ import kotlin.math.min
*/
public abstract class SimAbstractResourceContext(
initialCapacity: Double,
- override val clock: Clock,
+ private val scheduler: SimResourceScheduler,
private val consumer: SimResourceConsumer
-) : SimResourceContext {
+) : SimResourceContext, SimResourceFlushable {
+
+ /**
+ * The clock of the context.
+ */
+ public override val clock: Clock
+ get() = scheduler.clock
+
/**
* The capacity of the resource.
*/
@@ -143,21 +150,12 @@ public abstract class SimAbstractResourceContext(
flush(isIntermediate = true)
doStop()
- } catch (cause: Throwable) {
- doFail(cause)
} finally {
isProcessing = false
}
}
- /**
- * Flush the current active resource consumption.
- *
- * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
- * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
- * will be asked to deliver a new command and is essentially interrupted.
- */
- public fun flush(isIntermediate: Boolean = false) {
+ override fun flush(isIntermediate: Boolean) {
// Flush is no-op when the consumer is finished or not yet started
if (state != SimResourceState.Active) {
return
@@ -226,7 +224,7 @@ public abstract class SimAbstractResourceContext(
return
}
- flush()
+ scheduler.schedule(this, isIntermediate = false)
}
override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]"
@@ -234,7 +232,7 @@ public abstract class SimAbstractResourceContext(
/**
* A flag to indicate that the resource is currently processing a command.
*/
- protected var isProcessing: Boolean = false
+ private var isProcessing: Boolean = false
/**
* The current command that is being processed.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index 5d550ad8..5665abd1 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -22,12 +22,10 @@
package org.opendc.simulator.resources
-import java.time.Clock
-
/**
* A [SimResourceAggregator] that distributes the load equally across the input resources.
*/
-public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) {
+public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimAbstractResourceAggregator(scheduler) {
private val consumers = mutableListOf<Input>()
override fun doConsume(work: Double, limit: Double, deadline: Long) {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index 8128c98b..a76cb1e3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources
-import java.time.Clock
import kotlin.math.max
import kotlin.math.min
@@ -31,7 +30,7 @@ import kotlin.math.min
*/
public class SimResourceDistributorMaxMin(
override val input: SimResourceProvider,
- private val clock: Clock,
+ private val scheduler: SimResourceScheduler,
private val listener: Listener? = null
) : SimResourceDistributor {
override val outputs: Set<SimResourceProvider>
@@ -220,7 +219,7 @@ public class SimResourceDistributorMaxMin(
}
}
- assert(deadline >= clock.millis()) { "Deadline already passed" }
+ assert(deadline >= scheduler.clock.millis()) { "Deadline already passed" }
this.totalRequestedSpeed = totalRequestedSpeed
this.totalRequestedWork = totalRequestedWork
@@ -337,7 +336,7 @@ public class SimResourceDistributorMaxMin(
private inner class OutputContext(
private val provider: OutputProvider,
consumer: SimResourceConsumer
- ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> {
+ ) : SimAbstractResourceContext(provider.capacity, scheduler, consumer), Comparable<OutputContext> {
/**
* The current command that is processed by the vCPU.
*/
@@ -402,6 +401,8 @@ public class SimResourceDistributorMaxMin(
}
}
+ private var isProcessing: Boolean = false
+
override fun interrupt() {
// Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
// to infinite recursion.
@@ -409,10 +410,16 @@ public class SimResourceDistributorMaxMin(
return
}
- super.interrupt()
+ try {
+ isProcessing = false
- // Force the scheduler to re-schedule
- schedule()
+ super.interrupt()
+
+ // Force the scheduler to re-schedule
+ schedule()
+ } finally {
+ isProcessing = true
+ }
}
override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt
new file mode 100644
index 00000000..f6a1a42e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * An interface used by the [SimResourceScheduler] to flush the progress of resource consumer.
+ */
+public interface SimResourceFlushable {
+ /**
+ * Flush the current active resource consumption.
+ *
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun flush(isIntermediate: Boolean)
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt
new file mode 100644
index 00000000..a228c47b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * A resource scheduler is responsible for scheduling the communication and synchronization between multiple resource
+ * providers and consumers.
+ *
+ * By centralizing the scheduling logic, updates of resources within a single system can be scheduled and tracked more
+ * efficiently, reducing the overall work needed per update.
+ */
+public interface SimResourceScheduler {
+ /**
+ * The [Clock] associated with this scheduler.
+ */
+ public val clock: Clock
+
+ /**
+ * Schedule a direct interrupt for the resource context represented by [flushable].
+ *
+ * @param flushable The resource context that needs to be flushed.
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean = false)
+
+ /**
+ * Schedule an interrupt in the future for the resource context represented by [flushable].
+ *
+ * This method will override earlier calls to this method for the same [flushable].
+ *
+ * @param flushable The resource context that needs to be flushed.
+ * @param timestamp The timestamp when the interrupt should happen.
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean = false)
+
+ /**
+ * Batch the execution of several interrupts into a single call.
+ *
+ * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
+ */
+ public fun batch(block: () -> Unit)
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt
new file mode 100644
index 00000000..cdbb4a6c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.opendc.utils.TimerScheduler
+import java.time.Clock
+import java.util.ArrayDeque
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A [SimResourceScheduler] queues all interrupts that occur during execution to be executed after.
+ *
+ * @param clock The virtual simulation clock.
+ */
+public class SimResourceSchedulerTrampoline(context: CoroutineContext, override val clock: Clock) : SimResourceScheduler {
+ /**
+ * The [TimerScheduler] to actually schedule the interrupts.
+ */
+ private val timers = TimerScheduler<Any>(context, clock)
+
+ /**
+ * A flag to indicate that an interrupt is currently running already.
+ */
+ private var isRunning: Boolean = false
+
+ /**
+ * The queue of resources to be flushed.
+ */
+ private val queue = ArrayDeque<Pair<SimResourceFlushable, Boolean>>()
+
+ override fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean) {
+ queue.add(flushable to isIntermediate)
+
+ if (isRunning) {
+ return
+ }
+
+ flush()
+ }
+
+ override fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean) {
+ timers.startSingleTimerTo(flushable, timestamp) {
+ schedule(flushable, isIntermediate)
+ }
+ }
+
+ override fun batch(block: () -> Unit) {
+ val wasAlreadyRunning = isRunning
+ try {
+ isRunning = true
+ block()
+ } finally {
+ if (!wasAlreadyRunning) {
+ isRunning = false
+ }
+ }
+ }
+
+ /**
+ * Flush the scheduled queue.
+ */
+ private fun flush() {
+ val visited = mutableSetOf<SimResourceFlushable>()
+ try {
+ isRunning = true
+ while (queue.isNotEmpty()) {
+ val (flushable, isIntermediate) = queue.poll()
+ flushable.flush(isIntermediate)
+ visited.add(flushable)
+ }
+ } finally {
+ isRunning = false
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index fe569096..3277b889 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources
-import org.opendc.utils.TimerScheduler
-import java.time.Clock
import kotlin.math.ceil
import kotlin.math.min
@@ -31,13 +29,11 @@ import kotlin.math.min
* A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity.
*
* @param initialCapacity The initial capacity of the resource.
- * @param clock The virtual clock to track simulation time.
* @param scheduler The scheduler to schedule the interrupts.
*/
public class SimResourceSource(
initialCapacity: Double,
- private val clock: Clock,
- private val scheduler: TimerScheduler<Any>
+ private val scheduler: SimResourceScheduler
) : SimResourceProvider {
/**
* The current processing speed of the resource.
@@ -96,22 +92,21 @@ public class SimResourceSource(
/**
* Internal implementation of [SimResourceContext] for this class.
*/
- private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) {
+ private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, scheduler, consumer) {
override fun onIdle(deadline: Long) {
// Do not resume if deadline is "infinite"
if (deadline != Long.MAX_VALUE) {
- scheduler.startSingleTimerTo(this, deadline) { flush() }
+ scheduler.schedule(this, deadline)
}
}
override fun onConsume(work: Double, limit: Double, deadline: Long) {
val until = min(deadline, clock.millis() + getDuration(work, speed))
-
- scheduler.startSingleTimerTo(this, until, ::flush)
+ scheduler.schedule(this, until)
}
override fun onFinish() {
- scheduler.cancel(this)
+ cancel()
ctx = null
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index c796c251..5dc1e68d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -23,14 +23,13 @@
package org.opendc.simulator.resources
import kotlinx.coroutines.*
-import java.time.Clock
/**
* A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
* fair sharing.
*/
public class SimResourceSwitchMaxMin(
- clock: Clock,
+ scheduler: SimResourceScheduler,
private val listener: Listener? = null
) : SimResourceSwitch {
private val _outputs = mutableSetOf<SimResourceProvider>()
@@ -49,13 +48,13 @@ public class SimResourceSwitchMaxMin(
/**
* The aggregator to aggregate the resources.
*/
- private val aggregator = SimResourceAggregatorMaxMin(clock)
+ private val aggregator = SimResourceAggregatorMaxMin(scheduler)
/**
* The distributor to distribute the aggregated resources.
*/
private val distributor = SimResourceDistributorMaxMin(
- aggregator.output, clock,
+ aggregator.output, scheduler,
object : SimResourceDistributorMaxMin.Listener {
override fun onSliceFinish(
switch: SimResourceDistributor,