diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-08-17 19:25:11 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-08-24 11:20:17 +0200 |
| commit | 31a1f298c71cd3203fdcd57bd39ba8813009dd5b (patch) | |
| tree | 02cc92ae680d05c766388c96f538faa14bc6fe83 /opendc-simulator/opendc-simulator-compute/src/main | |
| parent | f03129779a1ec60e8689ad9c7fd5ad488c66f54c (diff) | |
refactor(simulator): Execute traces based on timestamps
This change refactors the trace workload in the OpenDC simulator to
track execute a fragment based on the fragment's timestamp. This makes
sure that the trace is replayed identically to the original execution.
Diffstat (limited to 'opendc-simulator/opendc-simulator-compute/src/main')
2 files changed, 43 insertions, 34 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 139c66e0..f416643e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -116,7 +116,6 @@ public abstract class SimAbstractMachine( // Cancel all cpus on cancellation cont.invokeOnCancellation { this.cont = null - interpreter.batch { for (cpu in cpus) { cpu.cancel() diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 622bcd4d..fc49f357 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -27,25 +27,19 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.consumer.SimConsumerBarrier /** * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource * consumption for some period of time. + * + * @param trace The trace of fragments to use. + * @param offset The offset for the timestamps. */ -public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkload { - private var offset = Long.MIN_VALUE +public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val offset: Long = 0L) : SimWorkload { private val iterator = trace.iterator() private var fragment: Fragment? = null - private lateinit var barrier: SimConsumerBarrier override fun onStart(ctx: SimMachineContext) { - check(offset == Long.MIN_VALUE) { "Workload does not support re-use" } - - barrier = SimConsumerBarrier(ctx.cpus.size) - fragment = nextFragment() - offset = ctx.interpreter.clock.millis() - val lifecycle = SimWorkloadLifecycle(ctx) for (cpu in ctx.cpus) { @@ -56,43 +50,59 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa override fun toString(): String = "SimTraceWorkload" /** - * Obtain the next fragment. + * Obtain the fragment with a timestamp equal or greater than [now]. */ - private fun nextFragment(): Fragment? { - return if (iterator.hasNext()) { - iterator.next() - } else { - null + private fun pullFragment(now: Long): Fragment? { + var fragment = fragment + if (fragment != null && !fragment.isExpired(now)) { + return fragment + } + + while (iterator.hasNext()) { + fragment = iterator.next() + if (!fragment.isExpired(now)) { + this.fragment = fragment + return fragment + } } + + this.fragment = null + return null + } + + /** + * Determine if the specified [Fragment] is expired, i.e., it has already passed. + */ + private fun Fragment.isExpired(now: Long): Boolean { + val timestamp = this.timestamp + offset + return now >= timestamp + duration } private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer { override fun onNext(ctx: SimResourceContext): SimResourceCommand { val now = ctx.clock.millis() - val fragment = fragment ?: return SimResourceCommand.Exit - val usage = fragment.usage / fragment.cores - val work = (fragment.duration / 1000) * usage - val deadline = offset + fragment.duration + val fragment = pullFragment(now) ?: return SimResourceCommand.Exit + val timestamp = fragment.timestamp + offset - assert(deadline >= now) { "Deadline already passed" } - - val cmd = - if (cpu.id < fragment.cores && work > 0.0) - SimResourceCommand.Consume(work, usage, deadline) - else - SimResourceCommand.Idle(deadline) - - if (barrier.enter()) { - this@SimTraceWorkload.fragment = nextFragment() - this@SimTraceWorkload.offset += fragment.duration + // Fragment is in the future + if (timestamp > now) { + return SimResourceCommand.Idle(timestamp) } - return cmd + val usage = fragment.usage / fragment.cores + val deadline = timestamp + fragment.duration + val duration = deadline - now + val work = duration * usage / 1000 + + return if (cpu.id < fragment.cores && work > 0.0) + SimResourceCommand.Consume(work, usage, deadline) + else + SimResourceCommand.Idle(deadline) } } /** * A fragment of the workload. */ - public data class Fragment(val duration: Long, val usage: Double, val cores: Int) + public data class Fragment(val timestamp: Long, val duration: Long, val usage: Double, val cores: Int) } |
