summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-compute/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-17 19:25:11 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-24 11:20:17 +0200
commit31a1f298c71cd3203fdcd57bd39ba8813009dd5b (patch)
tree02cc92ae680d05c766388c96f538faa14bc6fe83 /opendc-simulator/opendc-simulator-compute/src/main
parentf03129779a1ec60e8689ad9c7fd5ad488c66f54c (diff)
refactor(simulator): Execute traces based on timestamps
This change refactors the trace workload in the OpenDC simulator to track execute a fragment based on the fragment's timestamp. This makes sure that the trace is replayed identically to the original execution.
Diffstat (limited to 'opendc-simulator/opendc-simulator-compute/src/main')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt1
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt76
2 files changed, 43 insertions, 34 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 139c66e0..f416643e 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -116,7 +116,6 @@ public abstract class SimAbstractMachine(
// Cancel all cpus on cancellation
cont.invokeOnCancellation {
this.cont = null
-
interpreter.batch {
for (cpu in cpus) {
cpu.cancel()
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 622bcd4d..fc49f357 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -27,25 +27,19 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
-import org.opendc.simulator.resources.consumer.SimConsumerBarrier
/**
* A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource
* consumption for some period of time.
+ *
+ * @param trace The trace of fragments to use.
+ * @param offset The offset for the timestamps.
*/
-public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkload {
- private var offset = Long.MIN_VALUE
+public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val offset: Long = 0L) : SimWorkload {
private val iterator = trace.iterator()
private var fragment: Fragment? = null
- private lateinit var barrier: SimConsumerBarrier
override fun onStart(ctx: SimMachineContext) {
- check(offset == Long.MIN_VALUE) { "Workload does not support re-use" }
-
- barrier = SimConsumerBarrier(ctx.cpus.size)
- fragment = nextFragment()
- offset = ctx.interpreter.clock.millis()
-
val lifecycle = SimWorkloadLifecycle(ctx)
for (cpu in ctx.cpus) {
@@ -56,43 +50,59 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
override fun toString(): String = "SimTraceWorkload"
/**
- * Obtain the next fragment.
+ * Obtain the fragment with a timestamp equal or greater than [now].
*/
- private fun nextFragment(): Fragment? {
- return if (iterator.hasNext()) {
- iterator.next()
- } else {
- null
+ private fun pullFragment(now: Long): Fragment? {
+ var fragment = fragment
+ if (fragment != null && !fragment.isExpired(now)) {
+ return fragment
+ }
+
+ while (iterator.hasNext()) {
+ fragment = iterator.next()
+ if (!fragment.isExpired(now)) {
+ this.fragment = fragment
+ return fragment
+ }
}
+
+ this.fragment = null
+ return null
+ }
+
+ /**
+ * Determine if the specified [Fragment] is expired, i.e., it has already passed.
+ */
+ private fun Fragment.isExpired(now: Long): Boolean {
+ val timestamp = this.timestamp + offset
+ return now >= timestamp + duration
}
private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer {
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
val now = ctx.clock.millis()
- val fragment = fragment ?: return SimResourceCommand.Exit
- val usage = fragment.usage / fragment.cores
- val work = (fragment.duration / 1000) * usage
- val deadline = offset + fragment.duration
+ val fragment = pullFragment(now) ?: return SimResourceCommand.Exit
+ val timestamp = fragment.timestamp + offset
- assert(deadline >= now) { "Deadline already passed" }
-
- val cmd =
- if (cpu.id < fragment.cores && work > 0.0)
- SimResourceCommand.Consume(work, usage, deadline)
- else
- SimResourceCommand.Idle(deadline)
-
- if (barrier.enter()) {
- this@SimTraceWorkload.fragment = nextFragment()
- this@SimTraceWorkload.offset += fragment.duration
+ // Fragment is in the future
+ if (timestamp > now) {
+ return SimResourceCommand.Idle(timestamp)
}
- return cmd
+ val usage = fragment.usage / fragment.cores
+ val deadline = timestamp + fragment.duration
+ val duration = deadline - now
+ val work = duration * usage / 1000
+
+ return if (cpu.id < fragment.cores && work > 0.0)
+ SimResourceCommand.Consume(work, usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
}
}
/**
* A fragment of the workload.
*/
- public data class Fragment(val duration: Long, val usage: Double, val cores: Int)
+ public data class Fragment(val timestamp: Long, val duration: Long, val usage: Double, val cores: Int)
}