summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt24
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt12
-rw-r--r--opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt2
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt7
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt16
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt1
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt84
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt1
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt40
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt160
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt7
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt66
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt3
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt2
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt5
22 files changed, 345 insertions, 114 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 5a6fb03d..45fdb268 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -99,11 +99,12 @@ internal class SimHostTest {
mapOf(
"workload" to SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 3500.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 183.0, 2)
+ SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 2 * 3500.0, 2),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 2 * 183.0, 2)
),
+ offset = 1
)
)
)
@@ -114,11 +115,12 @@ internal class SimHostTest {
mapOf(
"workload" to SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 3100.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 73.0, 2)
- )
+ SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000L, duration * 1000, 2 * 3100.0, 2),
+ SimTraceWorkload.Fragment(duration * 2000L, duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 3000L, duration * 1000, 2 * 73.0, 2)
+ ),
+ offset = 1
)
)
)
@@ -150,7 +152,7 @@ internal class SimHostTest {
override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
},
- exportInterval = duration * 1000
+ exportInterval = duration * 1000L
)
coroutineScope {
@@ -171,7 +173,7 @@ internal class SimHostTest {
}
// Ensure last cycle is collected
- delay(1000 * duration)
+ delay(1000L * duration)
virtDriver.close()
reader.close()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 2c443678..fa9fa2fc 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -258,7 +258,7 @@ suspend fun processTrace(
delay(max(0, (entry.start - offset) - clock.millis()))
launch {
chan.send(Unit)
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace)
+ val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, offset = -offset + 300001)
val server = client.newServer(
entry.name,
image,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
index 94193780..16ad6816 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
@@ -50,11 +50,13 @@ class RawParquetTraceReader(private val path: File) {
val record = reader.read() ?: break
val id = record["id"].toString()
+ val time = record["time"] as Long
val duration = record["duration"] as Long
val cores = record["cores"] as Int
val cpuUsage = record["cpuUsage"] as Double
val fragment = SimTraceWorkload.Fragment(
+ time,
duration,
cpuUsage,
cores
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
index a3b45f47..35f4c5b8 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
@@ -81,7 +81,7 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = e
/**
* A poisonous fragment.
*/
- private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0))
+ private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0))
/**
* The thread to read the records in.
@@ -103,11 +103,13 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = e
}
val id = record["id"].toString()
+ val time = record["time"] as Long
val duration = record["duration"] as Long
val cores = record["cores"] as Int
val cpuUsage = record["cpuUsage"] as Double
val fragment = SimTraceWorkload.Fragment(
+ time,
duration,
cpuUsage,
cores
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 75428011..393fb88d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -114,9 +114,9 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(207380204679, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(207371815929, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(8388750, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
+ { assertEquals(155252275351, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(155086837645, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(725049, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
}
@@ -151,9 +151,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(96344616902, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(96324879442, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(29454904468, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(29355293349, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(0, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
}
diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt
index 9a93092e..a119a219 100644
--- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt
+++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt
@@ -29,6 +29,6 @@ import org.opendc.simulator.compute.workload.SimWorkload
/**
* A [SimFaaSWorkload] for a [FunctionTrace].
*/
-public class FunctionTraceWorkload(trace: FunctionTrace) : SimFaaSWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.duration, it.cpuUsage, 1) }) {
+public class FunctionTraceWorkload(trace: FunctionTrace) : SimFaaSWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.timestamp, it.duration, it.cpuUsage, 1) }) {
override suspend fun invoke() {}
}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index aaf8a240..cd8021fe 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -85,17 +85,19 @@ public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkloa
}
vmId = vmFile.nameWithoutExtension.trim().toLong()
- startTime = min(startTime, values[timestampCol].trim().toLong() - 5 * 60)
+ val timestamp = values[timestampCol].trim().toLong() - 5 * 60
+ startTime = min(startTime, timestamp)
cores = values[coreCol].trim().toInt()
val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong()
if (flopsHistory.isEmpty()) {
- flopsHistory.add(SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores))
+ flopsHistory.add(SimTraceWorkload.Fragment(timestamp, traceInterval, cpuUsage, cores))
} else {
if (flopsHistory.last().usage != cpuUsage) {
flopsHistory.add(
SimTraceWorkload.Fragment(
+ timestamp,
traceInterval,
cpuUsage,
cores
@@ -105,6 +107,7 @@ public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkloa
val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1)
flopsHistory.add(
SimTraceWorkload.Fragment(
+ oldFragment.timestamp,
oldFragment.duration + traceInterval,
cpuUsage,
cores
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
index 50ab652e..bda392a9 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
@@ -108,7 +108,8 @@ public class SwfTraceReader(
for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) {
flopsHistory.add(
SimTraceWorkload.Fragment(
- sliceDuration * 1000L,
+ tick,
+ sliceDuration * 1000,
0.0,
cores
)
@@ -128,6 +129,7 @@ public class SwfTraceReader(
) {
flopsHistory.add(
SimTraceWorkload.Fragment(
+ tick,
sliceDuration * 1000L,
1.0,
cores
@@ -138,6 +140,7 @@ public class SwfTraceReader(
if (runtimePartialSliceRemainder > 0) {
flopsHistory.add(
SimTraceWorkload.Fragment(
+ submitTime + slicedWaitTime + runTime,
sliceDuration,
runtimePartialSliceRemainder / sliceDuration.toDouble(),
cores
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index 8f60bf05..30797089 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -70,14 +70,14 @@ class SimMachineBenchmarks {
@Setup
fun setUp() {
trace = sequenceOf(
- SimTraceWorkload.Fragment(1000, 28.0, 1),
- SimTraceWorkload.Fragment(1000, 3500.0, 1),
- SimTraceWorkload.Fragment(1000, 0.0, 1),
- SimTraceWorkload.Fragment(1000, 183.0, 1),
- SimTraceWorkload.Fragment(1000, 400.0, 1),
- SimTraceWorkload.Fragment(1000, 100.0, 1),
- SimTraceWorkload.Fragment(1000, 3000.0, 1),
- SimTraceWorkload.Fragment(1000, 4500.0, 1),
+ SimTraceWorkload.Fragment(0, 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(1000, 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(2000, 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(3000, 1000, 183.0, 1),
+ SimTraceWorkload.Fragment(4000, 1000, 400.0, 1),
+ SimTraceWorkload.Fragment(5000, 1000, 100.0, 1),
+ SimTraceWorkload.Fragment(6000, 1000, 3000.0, 1),
+ SimTraceWorkload.Fragment(7000, 1000, 4500.0, 1),
)
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 139c66e0..f416643e 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -116,7 +116,6 @@ public abstract class SimAbstractMachine(
// Cancel all cpus on cancellation
cont.invokeOnCancellation {
this.cont = null
-
interpreter.batch {
for (cpu in cpus) {
cpu.cancel()
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 622bcd4d..48be8e1a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -27,25 +27,19 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
-import org.opendc.simulator.resources.consumer.SimConsumerBarrier
/**
* A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource
* consumption for some period of time.
+ *
+ * @param trace The trace of fragments to use.
+ * @param offset The offset for the timestamps.
*/
-public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkload {
- private var offset = Long.MIN_VALUE
+public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val offset: Long = 0L) : SimWorkload {
private val iterator = trace.iterator()
private var fragment: Fragment? = null
- private lateinit var barrier: SimConsumerBarrier
override fun onStart(ctx: SimMachineContext) {
- check(offset == Long.MIN_VALUE) { "Workload does not support re-use" }
-
- barrier = SimConsumerBarrier(ctx.cpus.size)
- fragment = nextFragment()
- offset = ctx.interpreter.clock.millis()
-
val lifecycle = SimWorkloadLifecycle(ctx)
for (cpu in ctx.cpus) {
@@ -56,43 +50,67 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
override fun toString(): String = "SimTraceWorkload"
/**
- * Obtain the next fragment.
+ * Obtain the fragment with a timestamp equal or greater than [now].
*/
- private fun nextFragment(): Fragment? {
- return if (iterator.hasNext()) {
- iterator.next()
- } else {
- null
+ private fun pullFragment(now: Long): Fragment? {
+ var fragment = fragment
+ if (fragment != null && !fragment.isExpired(now)) {
+ return fragment
}
+
+ while (iterator.hasNext()) {
+ fragment = iterator.next()
+ if (!fragment.isExpired(now)) {
+ this.fragment = fragment
+ return fragment
+ }
+ }
+
+ this.fragment = null
+ return null
+ }
+
+ /**
+ * Determine if the specified [Fragment] is expired, i.e., it has already passed.
+ */
+ private fun Fragment.isExpired(now: Long): Boolean {
+ val timestamp = this.timestamp + offset
+ return now >= timestamp + duration
}
private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer {
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
val now = ctx.clock.millis()
- val fragment = fragment ?: return SimResourceCommand.Exit
- val usage = fragment.usage / fragment.cores
- val work = (fragment.duration / 1000) * usage
- val deadline = offset + fragment.duration
+ val fragment = pullFragment(now) ?: return SimResourceCommand.Exit
+ val timestamp = fragment.timestamp + offset
- assert(deadline >= now) { "Deadline already passed" }
-
- val cmd =
- if (cpu.id < fragment.cores && work > 0.0)
- SimResourceCommand.Consume(work, usage, deadline)
- else
- SimResourceCommand.Idle(deadline)
-
- if (barrier.enter()) {
- this@SimTraceWorkload.fragment = nextFragment()
- this@SimTraceWorkload.offset += fragment.duration
+ // Fragment is in the future
+ if (timestamp > now) {
+ return SimResourceCommand.Idle(timestamp)
}
- return cmd
+ val usage = if (fragment.cores > 0)
+ fragment.usage / fragment.cores
+ else
+ 0.0
+ val deadline = timestamp + fragment.duration
+ val duration = deadline - now
+ val work = duration * usage / 1000
+
+ return if (cpu.id < fragment.cores && work > 0.0)
+ SimResourceCommand.Consume(work, usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
}
}
/**
* A fragment of the workload.
+ *
+ * @param timestamp The timestamp at which the fragment starts.
+ * @param duration The duration of the fragment.
+ * @param usage The CPU usage during the fragment.
+ * @param cores The amount of cores utilized during the fragment.
*/
- public data class Fragment(val duration: Long, val usage: Double, val cores: Int)
+ public data class Fragment(val timestamp: Long, val duration: Long, val usage: Double, val cores: Int)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index a6d955ca..19808a77 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -44,7 +44,6 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
* Test suite for the [SimBareMetalMachine] class.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineTest {
private lateinit var machineModel: MachineModel
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
index a61cba8d..afc4c949 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
@@ -91,10 +91,10 @@ internal class SimHypervisorTest {
val workloadA =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
@@ -154,19 +154,19 @@ internal class SimHypervisorTest {
val workloadA =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
val workloadB =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 73.0, 1)
+ SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3100.0, 1),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1)
)
)
@@ -251,19 +251,19 @@ internal class SimHypervisorTest {
val workloadA =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
val workloadB =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 73.0, 1)
+ SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3100.0, 1),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1)
)
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index 7c77b283..80496992 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -71,10 +71,10 @@ internal class SimSpaceSharedHypervisorTest {
val workloadA =
SimTraceWorkload(
sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
new file mode 100644
index 00000000..78019c2e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
@@ -0,0 +1,160 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload
+
+import kotlinx.coroutines.delay
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.compute.SimBareMetalMachine
+import org.opendc.simulator.compute.model.*
+import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.SimResourceInterpreter
+
+/**
+ * Test suite for the [SimTraceWorkloadTest] class.
+ */
+class SimTraceWorkloadTest {
+ private lateinit var machineModel: MachineModel
+
+ @BeforeEach
+ fun setUp() {
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+
+ machineModel = MachineModel(
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ )
+ }
+
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
+
+ val workload = SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
+ SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceWorkload.Fragment(2000, 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ ),
+ offset = 0
+ )
+
+ try {
+ machine.run(workload)
+
+ assertEquals(4000, clock.millis())
+ } finally {
+ machine.close()
+ }
+ }
+
+ @Test
+ fun testOffset() = runBlockingSimulation {
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
+
+ val workload = SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
+ SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceWorkload.Fragment(2000, 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ ),
+ offset = 1000
+ )
+
+ try {
+ machine.run(workload)
+
+ assertEquals(5000, clock.millis())
+ } finally {
+ machine.close()
+ }
+ }
+
+ @Test
+ fun testSkipFragment() = runBlockingSimulation {
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
+
+ val workload = SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
+ SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceWorkload.Fragment(2000, 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ ),
+ offset = 0
+ )
+
+ try {
+ delay(1000L)
+ machine.run(workload)
+
+ assertEquals(4000, clock.millis())
+ } finally {
+ machine.close()
+ }
+ }
+
+ @Test
+ fun testZeroCores() = runBlockingSimulation {
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
+
+ val workload = SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
+ SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceWorkload.Fragment(2000, 1000, 0.0, 0),
+ SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ ),
+ offset = 0
+ )
+
+ try {
+ machine.run(workload)
+
+ assertEquals(4000, clock.millis())
+ } finally {
+ machine.close()
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index 8a24b3e7..00648876 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -112,8 +112,8 @@ public abstract class SimAbstractResourceAggregator(
doFinish()
}
- override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
- updateCounters(ctx, work)
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {
+ updateCounters(ctx, work, willOvercommit)
}
override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
index 860c50ee..4e8e803a 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
@@ -87,7 +87,7 @@ public abstract class SimAbstractResourceProvider(
/**
* Update the counters of the resource provider.
*/
- protected fun updateCounters(ctx: SimResourceContext, work: Double) {
+ protected fun updateCounters(ctx: SimResourceContext, work: Double, willOvercommit: Boolean) {
if (work <= 0.0) {
return
}
@@ -96,7 +96,10 @@ public abstract class SimAbstractResourceProvider(
val remainingWork = ctx.remainingWork
counters.demand += work
counters.actual += work - remainingWork
- counters.overcommit += remainingWork
+
+ if (willOvercommit && remainingWork > 0.0) {
+ counters.overcommit += remainingWork
+ }
}
final override fun startConsumer(consumer: SimResourceConsumer) {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index 398797cf..a985986d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -22,6 +22,7 @@
package org.opendc.simulator.resources
+import org.opendc.simulator.resources.impl.SimResourceCountersImpl
import org.opendc.simulator.resources.interference.InterferenceDomain
import org.opendc.simulator.resources.interference.InterferenceKey
import kotlin.math.min
@@ -53,9 +54,9 @@ public class SimResourceDistributorMaxMin(
private val activeOutputs: MutableList<Output> = mutableListOf()
/**
- * The total amount of work requested by the output resources.
+ * The total amount of work allocated to be executed.
*/
- private var totalRequestedWork = 0.0
+ private var totalAllocatedWork = 0.0
/**
* The total allocated speed for the output resources.
@@ -67,6 +68,13 @@ public class SimResourceDistributorMaxMin(
*/
private var totalRequestedSpeed = 0.0
+ /**
+ * The resource counters of this distributor.
+ */
+ public val counters: SimResourceCounters
+ get() = _counters
+ private val _counters = SimResourceCountersImpl()
+
/* SimResourceDistributor */
override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider {
val provider = Output(ctx?.capacity ?: 0.0, key)
@@ -103,6 +111,25 @@ public class SimResourceDistributorMaxMin(
}
/**
+ * Update the counters of the distributor.
+ */
+ private fun updateCounters(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {
+ if (work <= 0.0) {
+ return
+ }
+
+ val counters = _counters
+ val remainingWork = ctx.remainingWork
+
+ counters.demand += work
+ counters.actual += work - remainingWork
+
+ if (willOvercommit && remainingWork > 0.0) {
+ counters.overcommit += remainingWork
+ }
+ }
+
+ /**
* Schedule the work of the outputs.
*/
private fun doNext(ctx: SimResourceContext): SimResourceCommand {
@@ -116,7 +143,6 @@ public class SimResourceDistributorMaxMin(
var deadline: Long = Long.MAX_VALUE
var availableSpeed = capacity
var totalRequestedSpeed = 0.0
- var totalRequestedWork = 0.0
// Pull in the work of the outputs
val outputIterator = activeOutputs.listIterator()
@@ -138,6 +164,7 @@ public class SimResourceDistributorMaxMin(
for (output in activeOutputs) {
val availableShare = availableSpeed / remaining--
val grantedSpeed = min(output.allowedSpeed, availableShare)
+
deadline = min(deadline, output.deadline)
// Ignore idle computation
@@ -147,7 +174,6 @@ public class SimResourceDistributorMaxMin(
}
totalRequestedSpeed += output.limit
- totalRequestedWork += output.work
output.actualSpeed = grantedSpeed
availableSpeed -= grantedSpeed
@@ -156,18 +182,28 @@ public class SimResourceDistributorMaxMin(
duration = min(duration, output.work / grantedSpeed)
}
+ val targetDuration = min(duration, (deadline - interpreter.clock.millis()) / 1000.0)
+ var totalRequestedWork = 0.0
+ var totalAllocatedWork = 0.0
+ for (output in activeOutputs) {
+ val work = output.work
+ val speed = output.actualSpeed
+ if (speed > 0.0) {
+ val outputDuration = work / speed
+ totalRequestedWork += work * (duration / outputDuration)
+ totalAllocatedWork += work * (targetDuration / outputDuration)
+ }
+ }
+
assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" }
- this.totalRequestedWork = totalRequestedWork
this.totalRequestedSpeed = totalRequestedSpeed
- this.totalAllocatedSpeed = capacity - availableSpeed
- val totalAllocatedWork = min(
- totalRequestedWork,
- totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration)
- )
+ this.totalAllocatedWork = totalAllocatedWork
+ val totalAllocatedSpeed = capacity - availableSpeed
+ this.totalAllocatedSpeed = totalAllocatedSpeed
return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0)
- SimResourceCommand.Consume(totalRequestedWork, totalAllocatedSpeed, deadline)
+ SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline)
else
SimResourceCommand.Idle(deadline)
}
@@ -262,8 +298,10 @@ public class SimResourceDistributorMaxMin(
return Long.MAX_VALUE
}
- override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
- updateCounters(ctx, work)
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {
+ updateCounters(ctx, work, willOvercommit)
+
+ this@SimResourceDistributorMaxMin.updateCounters(ctx, work, willOvercommit)
}
override fun onFinish(ctx: SimResourceControllableContext) {
@@ -288,7 +326,7 @@ public class SimResourceDistributorMaxMin(
}
// Compute the work that was actually granted to the output.
- return (totalRequestedWork - totalRemainingWork) * fraction * perfScore
+ return (totalAllocatedWork - totalRemainingWork) * fraction * perfScore
}
/* Comparable */
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
index 17045557..2fe1b00f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
@@ -52,8 +52,9 @@ public interface SimResourceProviderLogic {
*
* @param ctx The context in which the provider runs.
* @param work The amount of work that was requested by the resource consumer.
+ * @param willOvercommit A flag to indicate that the remaining work is overcommitted.
*/
- public fun onUpdate(ctx: SimResourceControllableContext, work: Double) {}
+ public fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {}
/**
* This method is invoked when the resource consumer has finished.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 2f70e3cc..2d53198a 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -51,8 +51,8 @@ public class SimResourceSource(
}
}
- override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
- updateCounters(ctx, work)
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {
+ updateCounters(ctx, work, willOvercommit)
}
override fun onFinish(ctx: SimResourceControllableContext) {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index ceb5a1a4..d988b70d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -54,7 +54,7 @@ public class SimResourceSwitchMaxMin(
* The resource counters to track the execution metrics of all switch resources.
*/
override val counters: SimResourceCounters
- get() = aggregator.counters
+ get() = distributor.counters
/**
* A flag to indicate that the switch was closed.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
index 98fad068..b79998a3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -202,17 +202,18 @@ internal class SimResourceContextImpl(
val isInterrupted = _flag and FLAG_INTERRUPT != 0
val remainingWork = getRemainingWork(timestamp)
val isConsume = _limit > 0.0
+ val reachedDeadline = _deadline <= timestamp
// Update the resource counters only if there is some progress
if (timestamp > _timestamp) {
- logic.onUpdate(this, _work)
+ logic.onUpdate(this, _work, reachedDeadline)
}
// We should only continue processing the next command if:
// 1. The resource consumption was finished.
// 2. The resource capacity cannot satisfy the demand.
// 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
- if ((isConsume && remainingWork == 0.0) || _deadline <= timestamp || isInterrupted) {
+ if ((isConsume && remainingWork == 0.0) || reachedDeadline || isInterrupted) {
when (val command = consumer.onNext(this)) {
is SimResourceCommand.Idle -> interpretIdle(timestamp, command.deadline)
is SimResourceCommand.Consume -> interpretConsume(timestamp, command.work, command.limit, command.deadline)