summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-24 11:33:47 +0200
committerGitHub <noreply@github.com>2021-08-24 11:33:47 +0200
commit51515bb255b3b32ca3020419a0c84130a4d8d370 (patch)
tree9c16c18af909b8e89ae6fd76cd7365eb46e0712c
parentf03129779a1ec60e8689ad9c7fd5ad488c66f54c (diff)
parentc46ff4c5cc18ba8a82ee0135f087c4d7aed1e804 (diff)
merge: Execute trace fragments based on timestamps
This pull request updates the simulator to execute workload traces based on the fragment's timestamps. This means that traces will execute their timestamps at the correct time. * Execute traces based on timestamps * Record overcommit only after deadline exceeded * Support trace fragments with zero cores available * Support unaligned trace fragments **Breaking API Changes** * `SimTraceWorkload.Fragment` now requires a `timestamp` parameter.
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt24
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt12
-rw-r--r--opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt2
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt7
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt16
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt1
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt84
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt1
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt40
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt160
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt7
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt66
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt3
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt2
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt5
22 files changed, 345 insertions, 114 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 5a6fb03d..45fdb268 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -99,11 +99,12 @@ internal class SimHostTest {
mapOf(
"workload" to SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 3500.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 183.0, 2)
+ SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 2 * 3500.0, 2),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 2 * 183.0, 2)
),
+ offset = 1
)
)
)
@@ -114,11 +115,12 @@ internal class SimHostTest {
mapOf(
"workload" to SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 3100.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 73.0, 2)
- )
+ SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000L, duration * 1000, 2 * 3100.0, 2),
+ SimTraceWorkload.Fragment(duration * 2000L, duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 3000L, duration * 1000, 2 * 73.0, 2)
+ ),
+ offset = 1
)
)
)
@@ -150,7 +152,7 @@ internal class SimHostTest {
override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
},
- exportInterval = duration * 1000
+ exportInterval = duration * 1000L
)
coroutineScope {
@@ -171,7 +173,7 @@ internal class SimHostTest {
}
// Ensure last cycle is collected
- delay(1000 * duration)
+ delay(1000L * duration)
virtDriver.close()
reader.close()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 2c443678..fa9fa2fc 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -258,7 +258,7 @@ suspend fun processTrace(
delay(max(0, (entry.start - offset) - clock.millis()))
launch {
chan.send(Unit)
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace)
+ val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, offset = -offset + 300001)
val server = client.newServer(
entry.name,
image,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
index 94193780..16ad6816 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
@@ -50,11 +50,13 @@ class RawParquetTraceReader(private val path: File) {
val record = reader.read() ?: break
val id = record["id"].toString()
+ val time = record["time"] as Long
val duration = record["duration"] as Long
val cores = record["cores"] as Int
val cpuUsage = record["cpuUsage"] as Double
val fragment = SimTraceWorkload.Fragment(
+ time,
duration,
cpuUsage,
cores
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
index a3b45f47..35f4c5b8 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
@@ -81,7 +81,7 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = e
/**
* A poisonous fragment.
*/
- private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0))
+ private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0))
/**
* The thread to read the records in.
@@ -103,11 +103,13 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = e
}
val id = record["id"].toString()
+ val time = record["time"] as Long
val duration = record["duration"] as Long
val cores = record["cores"] as Int
val cpuUsage = record["cpuUsage"] as Double
val fragment = SimTraceWorkload.Fragment(
+ time,
duration,
cpuUsage,
cores
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 75428011..393fb88d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -114,9 +114,9 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(207380204679, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(207371815929, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(8388750, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
+ { assertEquals(155252275351, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(155086837645, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(725049, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
}
@@ -151,9 +151,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(96344616902, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(96324879442, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(29454904468, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(29355293349, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(0, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
}
diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt
index 9a93092e..a119a219 100644
--- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt
+++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt
@@ -29,6 +29,6 @@ import org.opendc.simulator.compute.workload.SimWorkload
/**
* A [SimFaaSWorkload] for a [FunctionTrace].
*/
-public class FunctionTraceWorkload(trace: FunctionTrace) : SimFaaSWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.duration, it.cpuUsage, 1) }) {
+public class FunctionTraceWorkload(trace: FunctionTrace) : SimFaaSWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.timestamp, it.duration, it.cpuUsage, 1) }) {
override suspend fun invoke() {}
}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index aaf8a240..cd8021fe 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -85,17 +85,19 @@ public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkloa
}
vmId = vmFile.nameWithoutExtension.trim().toLong()
- startTime = min(startTime, values[timestampCol].trim().toLong() - 5 * 60)
+ val timestamp = values[timestampCol].trim().toLong() - 5 * 60
+ startTime = min(startTime, timestamp)
cores = values[coreCol].trim().toInt()
val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong()
if (flopsHistory.isEmpty()) {
- flopsHistory.add(SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores))
+ flopsHistory.add(SimTraceWorkload.Fragment(timestamp, traceInterval, cpuUsage, cores))
} else {
if (flopsHistory.last().usage != cpuUsage) {
flopsHistory.add(
SimTraceWorkload.Fragment(
+ timestamp,
traceInterval,
cpuUsage,
cores
@@ -105,6 +107,7 @@ public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkloa
val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1)
flopsHistory.add(
SimTraceWorkload.Fragment(
+ oldFragment.timestamp,
oldFragment.duration + traceInterval,
cpuUsage,
cores
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
index 50ab652e..bda392a9 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
@@ -108,7 +108,8 @@ public class SwfTraceReader(
for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) {
flopsHistory.add(
SimTraceWorkload.Fragment(
- sliceDuration * 1000L,
+ tick,
+ sliceDuration * 1000,
0.0,
cores
)
@@ -128,6 +129,7 @@ public class SwfTraceReader(
) {
flopsHistory.add(
SimTraceWorkload.Fragment(
+ tick,
sliceDuration * 1000L,
1.0,
cores
@@ -138,6 +140,7 @@ public class SwfTraceReader(
if (runtimePartialSliceRemainder > 0) {
flopsHistory.add(
SimTraceWorkload.Fragment(
+ submitTime + slicedWaitTime + runTime,
sliceDuration,
runtimePartialSliceRemainder / sliceDuration.toDouble(),
cores
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index 8f60bf05..30797089 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -70,14 +70,14 @@ class SimMachineBenchmarks {
@Setup
fun setUp() {
trace = sequenceOf(
- SimTraceWorkload.Fragment(1000, 28.0, 1),
- SimTraceWorkload.Fragment(1000, 3500.0, 1),
- SimTraceWorkload.Fragment(1000, 0.0, 1),
- SimTraceWorkload.Fragment(1000, 183.0, 1),
- SimTraceWorkload.Fragment(1000, 400.0, 1),
- SimTraceWorkload.Fragment(1000, 100.0, 1),
- SimTraceWorkload.Fragment(1000, 3000.0, 1),
- SimTraceWorkload.Fragment(1000, 4500.0, 1),
+ SimTraceWorkload.Fragment(0, 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(1000, 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(2000, 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(3000, 1000, 183.0, 1),
+ SimTraceWorkload.Fragment(4000, 1000, 400.0, 1),
+ SimTraceWorkload.Fragment(5000, 1000, 100.0, 1),
+ SimTraceWorkload.Fragment(6000, 1000, 3000.0, 1),
+ SimTraceWorkload.Fragment(7000, 1000, 4500.0, 1),
)
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 139c66e0..f416643e 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -116,7 +116,6 @@ public abstract class SimAbstractMachine(
// Cancel all cpus on cancellation
cont.invokeOnCancellation {
this.cont = null
-
interpreter.batch {
for (cpu in cpus) {
cpu.cancel()
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 622bcd4d..48be8e1a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -27,25 +27,19 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
-import org.opendc.simulator.resources.consumer.SimConsumerBarrier
/**
* A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource
* consumption for some period of time.
+ *
+ * @param trace The trace of fragments to use.
+ * @param offset The offset for the timestamps.
*/
-public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkload {
- private var offset = Long.MIN_VALUE
+public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val offset: Long = 0L) : SimWorkload {
private val iterator = trace.iterator()
private var fragment: Fragment? = null
- private lateinit var barrier: SimConsumerBarrier
override fun onStart(ctx: SimMachineContext) {
- check(offset == Long.MIN_VALUE) { "Workload does not support re-use" }
-
- barrier = SimConsumerBarrier(ctx.cpus.size)
- fragment = nextFragment()
- offset = ctx.interpreter.clock.millis()
-
val lifecycle = SimWorkloadLifecycle(ctx)
for (cpu in ctx.cpus) {
@@ -56,43 +50,67 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
override fun toString(): String = "SimTraceWorkload"
/**
- * Obtain the next fragment.
+ * Obtain the fragment with a timestamp equal or greater than [now].
*/
- private fun nextFragment(): Fragment? {
- return if (iterator.hasNext()) {
- iterator.next()
- } else {
- null
+ private fun pullFragment(now: Long): Fragment? {
+ var fragment = fragment
+ if (fragment != null && !fragment.isExpired(now)) {
+ return fragment
}
+
+ while (iterator.hasNext()) {
+ fragment = iterator.next()
+ if (!fragment.isExpired(now)) {
+ this.fragment = fragment
+ return fragment
+ }
+ }
+
+ this.fragment = null
+ return null
+ }
+
+ /**
+ * Determine if the specified [Fragment] is expired, i.e., it has already passed.
+ */
+ private fun Fragment.isExpired(now: Long): Boolean {
+ val timestamp = this.timestamp + offset
+ return now >= timestamp + duration
}
private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer {
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
val now = ctx.clock.millis()
- val fragment = fragment ?: return SimResourceCommand.Exit
- val usage = fragment.usage / fragment.cores
- val work = (fragment.duration / 1000) * usage
- val deadline = offset + fragment.duration
+ val fragment = pullFragment(now) ?: return SimResourceCommand.Exit
+ val timestamp = fragment.timestamp + offset
- assert(deadline >= now) { "Deadline already passed" }
-
- val cmd =
- if (cpu.id < fragment.cores && work > 0.0)
- SimResourceCommand.Consume(work, usage, deadline)
- else
- SimResourceCommand.Idle(deadline)
-
- if (barrier.enter()) {
- this@SimTraceWorkload.fragment = nextFragment()
- this@SimTraceWorkload.offset += fragment.duration
+ // Fragment is in the future
+ if (timestamp > now) {
+ return SimResourceCommand.Idle(timestamp)
}
- return cmd
+ val usage = if (fragment.cores > 0)
+ fragment.usage / fragment.cores
+ else
+ 0.0
+ val deadline = timestamp + fragment.duration
+ val duration = deadline - now
+ val work = duration * usage / 1000
+
+ return if (cpu.id < fragment.cores && work > 0.0)
+ SimResourceCommand.Consume(work, usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
}
}
/**
* A fragment of the workload.
+ *
+ * @param timestamp The timestamp at which the fragment starts.
+ * @param duration The duration of the fragment.
+ * @param usage The CPU usage during the fragment.
+ * @param cores The amount of cores utilized during the fragment.
*/
- public data class Fragment(val duration: Long, val usage: Double, val cores: Int)
+ public data class Fragment(val timestamp: Long, val duration: Long, val usage: Double, val cores: Int)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index a6d955ca..19808a77 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -44,7 +44,6 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
* Test suite for the [SimBareMetalMachine] class.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineTest {
private lateinit var machineModel: MachineModel
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
index a61cba8d..afc4c949 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
@@ -91,10 +91,10 @@ internal class SimHypervisorTest {
val workloadA =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
@@ -154,19 +154,19 @@ internal class SimHypervisorTest {
val workloadA =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
val workloadB =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 73.0, 1)
+ SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3100.0, 1),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1)
)
)
@@ -251,19 +251,19 @@ internal class SimHypervisorTest {
val workloadA =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
val workloadB =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 73.0, 1)
+ SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3100.0, 1),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1)
)
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index 7c77b283..80496992 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -71,10 +71,10 @@ internal class SimSpaceSharedHypervisorTest {
val workloadA =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
new file mode 100644
index 00000000..78019c2e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
@@ -0,0 +1,160 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload
+
+import kotlinx.coroutines.delay
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.compute.SimBareMetalMachine
+import org.opendc.simulator.compute.model.*
+import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.SimResourceInterpreter
+
+/**
+ * Test suite for the [SimTraceWorkloadTest] class.
+ */
+class SimTraceWorkloadTest {
+ private lateinit var machineModel: MachineModel
+
+ @BeforeEach
+ fun setUp() {
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+
+ machineModel = MachineModel(
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ )
+ }
+
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
+
+ val workload = SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
+ SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceWorkload.Fragment(2000, 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ ),
+ offset = 0
+ )
+
+ try {
+ machine.run(workload)
+
+ assertEquals(4000, clock.millis())
+ } finally {
+ machine.close()
+ }
+ }
+
+ @Test
+ fun testOffset() = runBlockingSimulation {
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
+
+ val workload = SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
+ SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceWorkload.Fragment(2000, 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ ),
+ offset = 1000
+ )
+
+ try {
+ machine.run(workload)
+
+ assertEquals(5000, clock.millis())
+ } finally {
+ machine.close()
+ }
+ }
+
+ @Test
+ fun testSkipFragment() = runBlockingSimulation {
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
+
+ val workload = SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
+ SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceWorkload.Fragment(2000, 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ ),
+ offset = 0
+ )
+
+ try {
+ delay(1000L)
+ machine.run(workload)
+
+ assertEquals(4000, clock.millis())
+ } finally {
+ machine.close()
+ }
+ }
+
+ @Test
+ fun testZeroCores() = runBlockingSimulation {
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
+
+ val workload = SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
+ SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceWorkload.Fragment(2000, 1000, 0.0, 0),
+ SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ ),
+ offset = 0
+ )
+
+ try {
+ machine.run(workload)
+
+ assertEquals(4000, clock.millis())
+ } finally {
+ machine.close()
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index 8a24b3e7..00648876 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -112,8 +112,8 @@ public abstract class SimAbstractResourceAggregator(
doFinish()
}
- override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
- updateCounters(ctx, work)
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {
+ updateCounters(ctx, work, willOvercommit)
}
override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
index 860c50ee..4e8e803a 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
@@ -87,7 +87,7 @@ public abstract class SimAbstractResourceProvider(
/**
* Update the counters of the resource provider.
*/
- protected fun updateCounters(ctx: SimResourceContext, work: Double) {
+ protected fun updateCounters(ctx: SimResourceContext, work: Double, willOvercommit: Boolean) {
if (work <= 0.0) {
return
}
@@ -96,7 +96,10 @@ public abstract class SimAbstractResourceProvider(
val remainingWork = ctx.remainingWork
counters.demand += work
counters.actual += work - remainingWork
- counters.overcommit += remainingWork
+
+ if (willOvercommit && remainingWork > 0.0) {
+ counters.overcommit += remainingWork
+ }
}
final override fun startConsumer(consumer: SimResourceConsumer) {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index 398797cf..a985986d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -22,6 +22,7 @@
package org.opendc.simulator.resources
+import org.opendc.simulator.resources.impl.SimResourceCountersImpl
import org.opendc.simulator.resources.interference.InterferenceDomain
import org.opendc.simulator.resources.interference.InterferenceKey
import kotlin.math.min
@@ -53,9 +54,9 @@ public class SimResourceDistributorMaxMin(
private val activeOutputs: MutableList<Output> = mutableListOf()
/**
- * The total amount of work requested by the output resources.
+ * The total amount of work allocated to be executed.
*/
- private var totalRequestedWork = 0.0
+ private var totalAllocatedWork = 0.0
/**
* The total allocated speed for the output resources.
@@ -67,6 +68,13 @@ public class SimResourceDistributorMaxMin(
*/
private var totalRequestedSpeed = 0.0
+ /**
+ * The resource counters of this distributor.
+ */
+ public val counters: SimResourceCounters
+ get() = _counters
+ private val _counters = SimResourceCountersImpl()
+
/* SimResourceDistributor */
override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider {
val provider = Output(ctx?.capacity ?: 0.0, key)
@@ -103,6 +111,25 @@ public class SimResourceDistributorMaxMin(
}
/**
+ * Update the counters of the distributor.
+ */
+ private fun updateCounters(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {
+ if (work <= 0.0) {
+ return
+ }
+
+ val counters = _counters
+ val remainingWork = ctx.remainingWork
+
+ counters.demand += work
+ counters.actual += work - remainingWork
+
+ if (willOvercommit && remainingWork > 0.0) {
+ counters.overcommit += remainingWork
+ }
+ }
+
+ /**
* Schedule the work of the outputs.
*/
private fun doNext(ctx: SimResourceContext): SimResourceCommand {
@@ -116,7 +143,6 @@ public class SimResourceDistributorMaxMin(
var deadline: Long = Long.MAX_VALUE
var availableSpeed = capacity
var totalRequestedSpeed = 0.0
- var totalRequestedWork = 0.0
// Pull in the work of the outputs
val outputIterator = activeOutputs.listIterator()
@@ -138,6 +164,7 @@ public class SimResourceDistributorMaxMin(
for (output in activeOutputs) {
val availableShare = availableSpeed / remaining--
val grantedSpeed = min(output.allowedSpeed, availableShare)
+
deadline = min(deadline, output.deadline)
// Ignore idle computation
@@ -147,7 +174,6 @@ public class SimResourceDistributorMaxMin(
}
totalRequestedSpeed += output.limit
- totalRequestedWork += output.work
output.actualSpeed = grantedSpeed
availableSpeed -= grantedSpeed
@@ -156,18 +182,28 @@ public class SimResourceDistributorMaxMin(
duration = min(duration, output.work / grantedSpeed)
}
+ val targetDuration = min(duration, (deadline - interpreter.clock.millis()) / 1000.0)
+ var totalRequestedWork = 0.0
+ var totalAllocatedWork = 0.0
+ for (output in activeOutputs) {
+ val work = output.work
+ val speed = output.actualSpeed
+ if (speed > 0.0) {
+ val outputDuration = work / speed
+ totalRequestedWork += work * (duration / outputDuration)
+ totalAllocatedWork += work * (targetDuration / outputDuration)
+ }
+ }
+
assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" }
- this.totalRequestedWork = totalRequestedWork
this.totalRequestedSpeed = totalRequestedSpeed
- this.totalAllocatedSpeed = capacity - availableSpeed
- val totalAllocatedWork = min(
- totalRequestedWork,
- totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration)
- )
+ this.totalAllocatedWork = totalAllocatedWork
+ val totalAllocatedSpeed = capacity - availableSpeed
+ this.totalAllocatedSpeed = totalAllocatedSpeed
return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0)
- SimResourceCommand.Consume(totalRequestedWork, totalAllocatedSpeed, deadline)
+ SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline)
else
SimResourceCommand.Idle(deadline)
}
@@ -262,8 +298,10 @@ public class SimResourceDistributorMaxMin(
return Long.MAX_VALUE
}
- override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
- updateCounters(ctx, work)
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {
+ updateCounters(ctx, work, willOvercommit)
+
+ this@SimResourceDistributorMaxMin.updateCounters(ctx, work, willOvercommit)
}
override fun onFinish(ctx: SimResourceControllableContext) {
@@ -288,7 +326,7 @@ public class SimResourceDistributorMaxMin(
}
// Compute the work that was actually granted to the output.
- return (totalRequestedWork - totalRemainingWork) * fraction * perfScore
+ return (totalAllocatedWork - totalRemainingWork) * fraction * perfScore
}
/* Comparable */
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
index 17045557..2fe1b00f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
@@ -52,8 +52,9 @@ public interface SimResourceProviderLogic {
*
* @param ctx The context in which the provider runs.
* @param work The amount of work that was requested by the resource consumer.
+ * @param willOvercommit A flag to indicate that the remaining work is overcommitted.
*/
- public fun onUpdate(ctx: SimResourceControllableContext, work: Double) {}
+ public fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {}
/**
* This method is invoked when the resource consumer has finished.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 2f70e3cc..2d53198a 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -51,8 +51,8 @@ public class SimResourceSource(
}
}
- override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
- updateCounters(ctx, work)
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {
+ updateCounters(ctx, work, willOvercommit)
}
override fun onFinish(ctx: SimResourceControllableContext) {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index ceb5a1a4..d988b70d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -54,7 +54,7 @@ public class SimResourceSwitchMaxMin(
* The resource counters to track the execution metrics of all switch resources.
*/
override val counters: SimResourceCounters
- get() = aggregator.counters
+ get() = distributor.counters
/**
* A flag to indicate that the switch was closed.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
index 98fad068..b79998a3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -202,17 +202,18 @@ internal class SimResourceContextImpl(
val isInterrupted = _flag and FLAG_INTERRUPT != 0
val remainingWork = getRemainingWork(timestamp)
val isConsume = _limit > 0.0
+ val reachedDeadline = _deadline <= timestamp
// Update the resource counters only if there is some progress
if (timestamp > _timestamp) {
- logic.onUpdate(this, _work)
+ logic.onUpdate(this, _work, reachedDeadline)
}
// We should only continue processing the next command if:
// 1. The resource consumption was finished.
// 2. The resource capacity cannot satisfy the demand.
// 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
- if ((isConsume && remainingWork == 0.0) || _deadline <= timestamp || isInterrupted) {
+ if ((isConsume && remainingWork == 0.0) || reachedDeadline || isInterrupted) {
when (val command = consumer.onNext(this)) {
is SimResourceCommand.Idle -> interpretIdle(timestamp, command.deadline)
is SimResourceCommand.Consume -> interpretConsume(timestamp, command.work, command.limit, command.deadline)