summaryrefslogtreecommitdiff
path: root/simulator/opendc-simulator/opendc-simulator-resources/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src/main')
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt85
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt10
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt12
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt1
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt37
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt4
8 files changed, 94 insertions, 61 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index 431ca625..dba334a2 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -36,6 +36,21 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
private val consumer: SimResourceConsumer<R>
) : SimResourceContext<R> {
/**
+ * The capacity of the resource.
+ */
+ override val capacity: Double
+ get() = resource.capacity
+
+ /**
+ * The amount of work still remaining at this instant.
+ */
+ override val remainingWork: Double
+ get() {
+ val activeCommand = activeCommand ?: return 0.0
+ return computeRemainingWork(activeCommand, clock.millis())
+ }
+
+ /**
* This method is invoked when the resource will idle until the specified [deadline].
*/
public abstract fun onIdle(deadline: Long)
@@ -64,27 +79,18 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
* Compute the speed at which the resource may be consumed.
*/
protected open fun getSpeed(limit: Double): Double {
- return min(limit, getCapacity())
- }
-
- /**
- * Return the capacity available for the resource consumer.
- */
- protected open fun getCapacity(): Double {
- return resource.capacity
+ return min(limit, capacity)
}
/**
- * Get the remaining work to process after a resource consumption was flushed.
+ * Get the remaining work to process after a resource consumption.
*
* @param work The size of the resource consumption.
* @param speed The speed of consumption.
* @param duration The duration from the start of the consumption until now.
- * @param isInterrupted A flag to indicate that the resource consumption could not be fully processed due to
- * it being interrupted before it could finish or reach its deadline.
* @return The amount of work remaining.
*/
- protected open fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
+ protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
return if (duration > 0L) {
val processed = duration / 1000.0 * speed
max(0.0, work - processed)
@@ -99,13 +105,15 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
public fun start() {
check(state == SimResourceState.Pending) { "Consumer is already started" }
+ val now = clock.millis()
+
state = SimResourceState.Active
isProcessing = true
- latestFlush = clock.millis()
+ latestFlush = now
try {
consumer.onStart(this)
- interpret(consumer.onNext(this, getCapacity(), 0.0))
+ activeCommand = interpret(consumer.onNext(this), now)
} catch (cause: Throwable) {
doStop(cause)
} finally {
@@ -154,36 +162,34 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
val activeCommand = activeCommand ?: return
val (timestamp, command) = activeCommand
+ // Note: accessor is reliant on activeCommand being set
+ val remainingWork = remainingWork
+
isProcessing = true
- this.activeCommand = null
val duration = now - timestamp
assert(duration >= 0) { "Flush in the past" }
- when (command) {
+ this.activeCommand = when (command) {
is SimResourceCommand.Idle -> {
// We should only continue processing the next command if:
// 1. The resource consumer reached its deadline.
// 2. The resource consumer should be interrupted (e.g., someone called .interrupt())
if (command.deadline <= now || !isIntermediate) {
- next(remainingWork = 0.0)
+ next(now)
} else {
- this.activeCommand = activeCommand
+ activeCommand
}
}
is SimResourceCommand.Consume -> {
- val speed = min(resource.capacity, command.limit)
- val isInterrupted = !isIntermediate && duration < getDuration(command.work, speed)
- val remainingWork = getRemainingWork(command.work, speed, duration, isInterrupted)
-
// We should only continue processing the next command if:
// 1. The resource consumption was finished.
// 2. The resource consumer reached its deadline.
// 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) {
- next(remainingWork)
+ next(now)
} else {
- interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline))
+ interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now)
}
}
SimResourceCommand.Exit ->
@@ -238,6 +244,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
this.state = SimResourceState.Stopped
if (state == SimResourceState.Active) {
+ activeCommand = null
onFinish(cause)
}
}
@@ -245,9 +252,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
/**
* Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
*/
- private fun interpret(command: SimResourceCommand) {
- val now = clock.millis()
-
+ private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? {
when (command) {
is SimResourceCommand.Idle -> {
val deadline = command.deadline
@@ -265,18 +270,34 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
onConsume(work, limit, deadline)
}
- is SimResourceCommand.Exit -> doStop(null)
+ is SimResourceCommand.Exit -> {
+ doStop(null)
+ // No need to set the next active command
+ return null
+ }
}
- assert(activeCommand == null) { "Concurrent access to current command" }
- activeCommand = CommandWrapper(now, command)
+ return CommandWrapper(now, command)
}
/**
* Request the workload for more work.
*/
- private fun next(remainingWork: Double) {
- interpret(consumer.onNext(this, getCapacity(), remainingWork))
+ private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now)
+
+ /**
+ * Compute the remaining work based on the specified [wrapper] and [timestamp][now].
+ */
+ private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double {
+ val (timestamp, command) = wrapper
+ val duration = now - timestamp
+ return when (command) {
+ is SimResourceCommand.Consume -> {
+ val speed = getSpeed(command.limit)
+ getRemainingWork(command.work, speed, duration)
+ }
+ is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0
+ }
}
/**
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index 7637ee69..01b56488 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -41,11 +41,9 @@ public interface SimResourceConsumer<in R : SimResource> {
* the resource finished processing, reached its deadline or was interrupted.
*
* @param ctx The execution context in which the consumer runs.
- * @param capacity The capacity that is available for the consumer. In case no capacity is available, zero is given.
- * @param remainingWork The work of the previous command that was not yet completed due to interruption or deadline.
* @return The next command that the resource should execute.
*/
- public fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand
+ public fun onNext(ctx: SimResourceContext<R>): SimResourceCommand
/**
* This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit],
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
index dfb5e9ce..f13764fb 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -40,6 +40,16 @@ public interface SimResourceContext<out R : SimResource> {
public val clock: Clock
/**
+ * The resource capacity available at this instant.
+ */
+ public val capacity: Double
+
+ /**
+ * The amount of work still remaining at this instant.
+ */
+ public val remainingWork: Double
+
+ /**
* Ask the resource provider to interrupt its resource.
*/
public fun interrupt()
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
index 23bf8739..732e709a 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
@@ -43,11 +43,6 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) :
private var hasDelegateStarted: Boolean = false
/**
- * The remaining amount of work last cycle.
- */
- private var remainingWork: Double = 0.0
-
- /**
* The state of the forwarder.
*/
override var state: SimResourceState = SimResourceState.Pending
@@ -92,9 +87,8 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) :
this.ctx = ctx
}
- override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand {
val delegate = delegate
- this.remainingWork = remainingWork
if (!hasDelegateStarted) {
start()
@@ -103,7 +97,7 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) :
return if (state == SimResourceState.Stopped) {
SimResourceCommand.Exit
} else if (delegate != null) {
- val command = delegate.onNext(ctx, capacity, remainingWork)
+ val command = delegate.onNext(ctx)
if (command == SimResourceCommand.Exit) {
// Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
// reset beforehand the existing state and check whether it has been updated afterwards
@@ -114,7 +108,7 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) :
if (state == SimResourceState.Stopped)
SimResourceCommand.Exit
else
- onNext(ctx, capacity, 0.0)
+ onNext(ctx)
} else {
command
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index a3b16177..99545c4c 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import org.opendc.utils.TimerScheduler
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index 6b919a77..ee8edfcd 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -57,9 +57,10 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
private val outputContexts: MutableList<OutputContext> = mutableListOf()
/**
- * The total amount of remaining work (of all pCPUs).
+ * The remaining work of all inputs.
*/
- private var totalRemainingWork: Double = 0.0
+ private val totalRemainingWork: Double
+ get() = inputConsumers.sumByDouble { it.remainingWork }
/**
* The total speed requested by the vCPUs.
@@ -241,6 +242,8 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
* Flush the progress of the vCPUs.
*/
private fun flushGuests() {
+ val totalRemainingWork = totalRemainingWork
+
// Flush all the outputs work
for (output in outputContexts) {
output.flush(isIntermediate = true)
@@ -256,7 +259,6 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
totalRequestedSpeed,
totalAllocatedSpeed
)
- totalRemainingWork = 0.0
totalInterferedWork = 0.0
totalOvercommittedWork = 0.0
@@ -362,29 +364,39 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
*/
var actualSpeed: Double = 0.0
+ private fun reportOvercommit() {
+ totalOvercommittedWork += remainingWork
+ }
+
override fun onIdle(deadline: Long) {
+ reportOvercommit()
+
allowedSpeed = 0.0
activeCommand = SimResourceCommand.Idle(deadline)
}
override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ reportOvercommit()
+
allowedSpeed = getSpeed(limit)
activeCommand = SimResourceCommand.Consume(work, limit, deadline)
}
override fun onFinish(cause: Throwable?) {
+ reportOvercommit()
+
activeCommand = SimResourceCommand.Exit
provider.cancel()
super.onFinish(cause)
}
- override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
+ override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
// Apply performance interference model
val performanceScore = 1.0
// Compute the remaining amount of work
- val remainingWork = if (work > 0.0) {
+ return if (work > 0.0) {
// Compute the fraction of compute time allocated to the VM
val fraction = actualSpeed / totalAllocatedSpeed
@@ -400,12 +412,6 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
} else {
0.0
}
-
- if (!isInterrupted) {
- totalOvercommittedWork += remainingWork
- }
-
- return remainingWork
}
override fun interrupt() {
@@ -433,6 +439,12 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
*/
private lateinit var ctx: SimResourceContext<R>
+ /**
+ * The remaining work of this consumer.
+ */
+ val remainingWork: Double
+ get() = ctx.remainingWork
+
init {
barrier = SimConsumerBarrier(barrier.parties + 1)
input.startConsumer(this@InputConsumer)
@@ -449,8 +461,7 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
this.ctx = ctx
}
- override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand {
- totalRemainingWork += remainingWork
+ override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand {
val isLast = barrier.enter()
// Flush the progress of the guest after the barrier has been reached.
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
index a00ee575..0189fe4c 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -39,7 +39,7 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour
iterator = trace.iterator()
}
- override fun onNext(ctx: SimResourceContext<SimResource>, capacity: Double, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<SimResource>): SimResourceCommand {
val iterator = checkNotNull(iterator)
return if (iterator.hasNext()) {
val now = ctx.clock.millis()
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
index bad2f403..62425583 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
@@ -48,8 +48,8 @@ public class SimWorkConsumer<R : SimResource>(
remainingWork = work
}
- override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand {
- val work = this.remainingWork + remainingWork
+ override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand {
+ val work = this.remainingWork + ctx.remainingWork
this.remainingWork -= work
return if (work > 0.0) {
SimResourceCommand.Consume(work, limit)