summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2025-01-07 11:25:48 +0100
committerGitHub <noreply@github.com>2025-01-07 11:25:48 +0100
commitf71e07f55a5176c5bd5447cdb3bcfebf2f5f47ee (patch)
tree9a7e5b11887c560668a17fc2f130bfed7aaceda5
parentc425a03c59e7d5c2e5d82988c61e340a6cbf61fe (diff)
Updated the FlowDistributor (#285)
* Updated the FlowDistributor to work in more cases and be more performant. * Removed old FlowDistributor
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt6
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt10
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt147
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_5000_2000.json23
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java50
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java18
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java)8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java160
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java26
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java4
19 files changed, 338 insertions, 198 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
index f34e135d..cc4ac2d8 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
@@ -62,7 +62,7 @@ public class SimHost(
private val graph: FlowGraph,
private val machineModel: MachineModel,
private val cpuPowerModel: CpuPowerModel,
- private val powerMux: FlowDistributor,
+ private val powerDistributor: FlowDistributor,
) : AutoCloseable {
/**
* The event listeners registered with this host.
@@ -131,7 +131,7 @@ public class SimHost(
SimMachine(
this.graph,
this.machineModel,
- this.powerMux,
+ this.powerDistributor,
this.cpuPowerModel,
) { cause ->
hostState = if (cause != null) HostState.ERROR else HostState.DOWN
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
index 8dcf2fa1..933b4e63 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
@@ -64,8 +64,8 @@ public class HostsProvisioningStep internal constructor(
service.addPowerSource(simPowerSource)
simPowerSources.add(simPowerSource)
- val powerMux = FlowDistributor(graph)
- graph.addEdge(powerMux, simPowerSource)
+ val powerDistributor = FlowDistributor(graph)
+ graph.addEdge(powerDistributor, simPowerSource)
// Create hosts, they are connected to the powerMux when SimMachine is created
for (hostSpec in cluster.hostSpecs) {
@@ -78,7 +78,7 @@ public class HostsProvisioningStep internal constructor(
graph,
hostSpec.model,
hostSpec.cpuPowerModel,
- powerMux,
+ powerDistributor,
)
require(simHosts.add(simHost)) { "Host with uid ${hostSpec.uid} already exists" }
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt
index 8e9a3ad7..845a8bae 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt
@@ -296,8 +296,8 @@ class ExperimentTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(1803918432, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(787181568, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(1803918435, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(787181565, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
{ assertEquals(6.7565629E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
@@ -343,11 +343,11 @@ class ExperimentTest {
{ assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") },
{ assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") },
- { assertEquals(43101787447, monitor.idleTime) { "Incorrect idle time" } },
- { assertEquals(3489412553, monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(43101787496, monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(3489412504, monitor.activeTime) { "Incorrect active time" } },
{ assertEquals(0, monitor.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
- { assertEquals(1.0016123392181786E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
+ { assertEquals(6.914184592181973E9, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
)
}
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt
index 31c7dfd0..2cd464a1 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt
@@ -165,8 +165,6 @@ class FlowDistributorTest {
{ assertEquals(2000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
- { assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } },
- { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } },
)
}
@@ -202,8 +200,6 @@ class FlowDistributorTest {
{ assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
- { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
- { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}
@@ -239,8 +235,6 @@ class FlowDistributorTest {
{ assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
- { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
- { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}
@@ -276,19 +270,52 @@ class FlowDistributorTest {
{ assertEquals(1000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(1000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
- { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
- { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}
/**
- * FlowDistributor test 5: Two task, same time, both fit
+ * FlowDistributor test 5: A single task transition overload to perfect fit
+ * In this test, a single task is scheduled where the first fragment does not fit, and the second does perfectly for the available CPU.
+ * For the first fragment, we expect the usage of the first fragment to be lower than the demand
+ * We check if both the host and the Task show the correct cpu usage and demand during the two fragments.
+ */
+ @Test
+ fun testFlowDistributor5() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 4000.0, 1),
+ TraceFragment(10 * 60 * 1000, 2000.0, 1),
+ ),
+ ),
+ )
+ val topology = createTopology("single_1_2000.json")
+
+ monitor = runTest(topology, workload)
+
+ assertAll(
+ { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } },
+ { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
+ )
+ }
+
+ /**
+ * FlowDistributor test 6: Two task, same time, both fit
* In this test, two tasks are scheduled, and they fit together on the host . The tasks start and finish at the same time
* This test shows how the FlowDistributor handles two tasks that can fit and no redistribution is required.
* We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
*/
@Test
- fun testFlowDistributor5() {
+ fun testFlowDistributor6() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -325,19 +352,17 @@ class FlowDistributorTest {
{ assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
- { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
- { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}
/**
- * FlowDistributor test 6: Two task, same time, can not fit
+ * FlowDistributor test 7: Two task, same time, can not fit
* In this test, two tasks are scheduled, and they can not both fit. The tasks start and finish at the same time
* This test shows how the FlowDistributor handles two tasks that both do not fit and redistribution is required.
* We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
*/
@Test
- fun testFlowDistributor6() {
+ fun testFlowDistributor7() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -374,18 +399,16 @@ class FlowDistributorTest {
{ assertEquals(11000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
- { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
- { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}
/**
- * FlowDistributor test 7: Two task, both fit, second task is delayed
+ * FlowDistributor test 8: Two task, both fit, second task is delayed
* In this test, two tasks are scheduled, the second task is delayed.
* We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
*/
@Test
- fun testFlowDistributor7() {
+ fun testFlowDistributor8() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -431,13 +454,11 @@ class FlowDistributorTest {
{ assertEquals(3000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } },
- { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
- { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}
/**
- * FlowDistributor test 8: Two task, both fit on their own but not together, second task is delayed
+ * FlowDistributor test 9: Two task, both fit on their own but not together, second task is delayed
* In this test, two tasks are scheduled, the second task is delayed.
* We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
* When the second task comes in, the host is overloaded.
@@ -445,7 +466,7 @@ class FlowDistributorTest {
* We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
*/
@Test
- fun testFlowDistributor8() {
+ fun testFlowDistributor9() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -488,20 +509,18 @@ class FlowDistributorTest {
{ assertEquals(3000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } },
{ assertEquals(3000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } },
- { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
- { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}
/**
- * FlowDistributor test 9: Two task, one changes demand, causing overload
+ * FlowDistributor test 10: Two task, one changes demand, causing overload
* In this test, two tasks are scheduled, and they can both fit.
* However, task 0 increases its demand which overloads the FlowDistributor.
* This test shows how the FlowDistributor handles transition from fitting to overloading when multiple tasks are running.
* We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
*/
@Test
- fun testFlowDistributor9() {
+ fun testFlowDistributor10() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -551,12 +570,84 @@ class FlowDistributorTest {
{ assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } },
- { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
- { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}
/**
+ * FlowDistributor test 11: 5000 hosts. This tests the performance of the distributor
+ * In this test, two tasks are scheduled, and they can both fit.
+ * However, task 0 increases its demand which overloads the FlowDistributor.
+ * This test shows how the FlowDistributor handles transition from fitting to overloading when multiple tasks are running.
+ * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
+ */
+ @Test
+ fun testFlowDistributor11() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf<TraceFragment>().apply {
+ repeat(10) { this.add(TraceFragment(20 * 60 * 1000, 3000.0, 1)) }
+ },
+ ),
+ )
+ val topology = createTopology("single_5000_2000.json")
+
+ monitor = runTest(topology, workload)
+ }
+
+ /**
+ * FlowDistributor test 12: 1000 fragments. This tests the performance of the distributor
+ * In this test, two tasks are scheduled, and they can both fit.
+ * However, task 0 increases its demand which overloads the FlowDistributor.
+ * This test shows how the FlowDistributor handles transition from fitting to overloading when multiple tasks are running.
+ * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
+ */
+ @Test
+ fun testFlowDistributor12() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf<TraceFragment>().apply {
+ repeat(1000) { this.add(TraceFragment(10 * 60 * 1000, 2000.0, 1)) }
+ },
+ ),
+ )
+ val topology = createTopology("single_1_2000.json")
+
+ monitor = runTest(topology, workload)
+ }
+
+ /**
+ * FlowDistributor test 13: 1000 tasks. This tests the performance
+ * In this test, two tasks are scheduled, and they can both fit.
+ * However, task 0 increases its demand which overloads the FlowDistributor.
+ * This test shows how the FlowDistributor handles transition from fitting to overloading when multiple tasks are running.
+ * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
+ */
+ @Test
+ fun testFlowDistributor13() {
+ val workload: ArrayList<Task> =
+ arrayListOf<Task>().apply {
+ repeat(1000) {
+ this.add(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(TraceFragment(10 * 60 * 1000, 2000.0, 1)),
+ ),
+ )
+ }
+ }
+ val topology = createTopology("single_1_2000.json")
+
+ monitor = runTest(topology, workload)
+ }
+
+ /**
* Obtain the topology factory for the test.
*/
private fun createTopology(name: String): List<ClusterSpec> {
diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_5000_2000.json b/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_5000_2000.json
new file mode 100644
index 00000000..9f1e418f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_5000_2000.json
@@ -0,0 +1,23 @@
+{
+ "clusters":
+ [
+ {
+ "name": "C01",
+ "hosts" :
+ [
+ {
+ "name": "H01",
+ "cpu":
+ {
+ "coreCount": 1,
+ "coreSpeed": 2000
+ },
+ "memory": {
+ "memorySize": 140457600000
+ },
+ "count": 5000
+ }
+ ]
+ }
+ ]
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java
index c5b8a9ea..a9edaa97 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java
@@ -51,7 +51,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
private long lastCounterUpdate;
private final double cpuFrequencyInv;
- private FlowEdge muxEdge;
+ private FlowEdge distributorEdge;
private FlowEdge psuEdge;
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -123,21 +123,16 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
public long onUpdate(long now) {
updateCounters(now);
- this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0);
-
- // Calculate Power Demand and send to PSU
- double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);
+ // Check if supply == demand
+ if (this.currentPowerDemand != this.currentPowerSupplied) {
+ this.pushOutgoingDemand(this.psuEdge, this.currentPowerDemand);
- if (powerDemand != this.currentPowerDemand) {
- this.pushDemand(this.psuEdge, powerDemand);
+ return Long.MAX_VALUE;
}
- // Calculate the amount of cpu this can provide
- double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity);
+ this.currentCpuSupplied = Math.min(this.currentCpuDemand, this.maxCapacity);
- if (cpuSupply != this.currentCpuSupplied) {
- this.pushSupply(this.muxEdge, cpuSupply);
- }
+ this.pushOutgoingSupply(this.distributorEdge, this.currentCpuSupplied);
return Long.MAX_VALUE;
}
@@ -181,7 +176,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
* Push new demand to the psu
*/
@Override
- public void pushDemand(FlowEdge supplierEdge, double newPowerDemand) {
+ public void pushOutgoingDemand(FlowEdge supplierEdge, double newPowerDemand) {
updateCounters();
this.currentPowerDemand = newPowerDemand;
this.psuEdge.pushDemand(newPowerDemand);
@@ -191,47 +186,38 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
* Push updated supply to the mux
*/
@Override
- public void pushSupply(FlowEdge consumerEdge, double newCpuSupply) {
+ public void pushOutgoingSupply(FlowEdge consumerEdge, double newCpuSupply) {
updateCounters();
this.currentCpuSupplied = newCpuSupply;
- this.muxEdge.pushSupply(newCpuSupply);
+
+ this.distributorEdge.pushSupply(newCpuSupply, true);
}
/**
* Handle new demand coming in from the mux
*/
@Override
- public void handleDemand(FlowEdge consumerEdge, double newCpuDemand) {
+ public void handleIncomingDemand(FlowEdge consumerEdge, double newCpuDemand) {
updateCounters();
this.currentCpuDemand = newCpuDemand;
- this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity;
this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0);
// Calculate Power Demand and send to PSU
- double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);
+ this.currentPowerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);
- if (powerDemand != this.currentPowerDemand) {
- this.pushDemand(this.psuEdge, powerDemand);
- }
+ this.invalidate();
}
/**
* Handle updated supply from the psu
*/
@Override
- public void handleSupply(FlowEdge supplierEdge, double newPowerSupply) {
- // TODO: Implement this
+ public void handleIncomingSupply(FlowEdge supplierEdge, double newPowerSupply) {
updateCounters();
this.currentPowerSupplied = newPowerSupply;
- // Calculate the amount of cpu this can provide
- double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity);
- ;
-
- if (cpuSupply != this.currentCpuSupplied) {
- this.pushSupply(this.muxEdge, cpuSupply);
- }
+ this.invalidate();
}
/**
@@ -239,7 +225,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
*/
@Override
public void addConsumerEdge(FlowEdge consumerEdge) {
- this.muxEdge = consumerEdge;
+ this.distributorEdge = consumerEdge;
}
/**
@@ -257,7 +243,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
*/
@Override
public void removeConsumerEdge(FlowEdge consumerEdge) {
- this.muxEdge = null;
+ this.distributorEdge = null;
this.invalidate();
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java
index 074f0ed8..dab0c421 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java
@@ -44,7 +44,7 @@ public class SimMachine {
private final InstantSource clock;
private SimCpu cpu;
- private FlowDistributor cpuMux;
+ private FlowDistributor cpuDistributor;
private SimPsu psu;
private Memory memory;
@@ -74,8 +74,8 @@ public class SimMachine {
return cpu;
}
- public FlowDistributor getCpuMux() {
- return cpuMux;
+ public FlowDistributor getCpuDistributor() {
+ return cpuDistributor;
}
public Memory getMemory() {
@@ -114,7 +114,7 @@ public class SimMachine {
public SimMachine(
FlowGraph graph,
MachineModel machineModel,
- FlowDistributor powerMux,
+ FlowDistributor powerDistributor,
CpuPowerModel cpuPowerModel,
Consumer<Exception> completion) {
this.graph = graph;
@@ -124,7 +124,7 @@ public class SimMachine {
// Create the psu and cpu and connect them
this.psu = new SimPsu(graph);
- graph.addEdge(this.psu, powerMux);
+ graph.addEdge(this.psu, powerDistributor);
this.cpu = new SimCpu(graph, this.machineModel.getCpuModel(), cpuPowerModel, 0);
@@ -133,8 +133,8 @@ public class SimMachine {
this.memory = new Memory(graph, this.machineModel.getMemory());
// Create a FlowDistributor and add the cpu as supplier
- this.cpuMux = new FlowDistributor(this.graph);
- graph.addEdge(this.cpuMux, this.cpu);
+ this.cpuDistributor = new FlowDistributor(this.graph);
+ graph.addEdge(this.cpuDistributor, this.cpu);
this.completion = completion;
}
@@ -153,8 +153,8 @@ public class SimMachine {
this.graph.removeNode(this.cpu);
this.cpu = null;
- this.graph.removeNode(this.cpuMux);
- this.cpuMux = null;
+ this.graph.removeNode(this.cpuDistributor);
+ this.cpuDistributor = null;
this.memory = null;
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java
index b8a9c738..1946eecb 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java
@@ -100,7 +100,7 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli
this.clock = this.machine.getClock();
this.parentGraph = machine.getGraph();
- this.parentGraph.addEdge(this, this.machine.getCpuMux());
+ this.parentGraph.addEdge(this, this.machine.getCpuDistributor());
this.lastUpdate = clock.millis();
this.lastUpdate = clock.millis();
@@ -185,7 +185,7 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli
* Push demand to the cpuMux if the demand has changed
**/
@Override
- public void pushDemand(FlowEdge supplierEdge, double newDemand) {
+ public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
this.cpuEdge.pushDemand(newDemand);
}
@@ -193,7 +193,7 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli
* Push supply to the workload if the supply has changed
**/
@Override
- public void pushSupply(FlowEdge consumerEdge, double newSupply) {
+ public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) {
this.workloadEdge.pushSupply(newSupply);
}
@@ -201,24 +201,24 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli
* Handle new demand from the workload by sending it through to the cpuMux
**/
@Override
- public void handleDemand(FlowEdge consumerEdge, double newDemand) {
+ public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) {
updateCounters(this.clock.millis());
this.cpuDemand = newDemand;
- pushDemand(this.cpuEdge, newDemand);
+ pushOutgoingDemand(this.cpuEdge, newDemand);
}
/**
* Handle a new supply pushed by the cpuMux by sending it through to the workload
**/
@Override
- public void handleSupply(FlowEdge supplierEdge, double newCpuSupply) {
+ public void handleIncomingSupply(FlowEdge supplierEdge, double newCpuSupply) {
updateCounters(this.clock.millis());
this.cpuSupply = newCpuSupply;
- pushSupply(this.workloadEdge, newCpuSupply);
+ pushOutgoingSupply(this.workloadEdge, newCpuSupply);
}
@Override
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java
index e8626e40..d2270888 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java
@@ -43,7 +43,7 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier {
private double totalCarbonEmission = 0.0f;
private CarbonModel carbonModel = null;
- private FlowEdge muxEdge;
+ private FlowEdge distributorEdge;
private double capacity = Long.MAX_VALUE;
@@ -57,7 +57,7 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier {
* @return <code>true</code> if the InPort is connected to an OutPort, <code>false</code> otherwise.
*/
public boolean isConnected() {
- return muxEdge != null;
+ return distributorEdge != null;
}
/**
@@ -156,30 +156,30 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@Override
- public void handleDemand(FlowEdge consumerEdge, double newPowerDemand) {
+ public void handleIncomingDemand(FlowEdge consumerEdge, double newPowerDemand) {
this.powerDemand = newPowerDemand;
double powerSupply = this.powerDemand;
if (powerSupply != this.powerSupplied) {
- this.pushSupply(this.muxEdge, powerSupply);
+ this.pushOutgoingSupply(this.distributorEdge, powerSupply);
}
}
@Override
- public void pushSupply(FlowEdge consumerEdge, double newSupply) {
+ public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) {
this.powerSupplied = newSupply;
consumerEdge.pushSupply(newSupply);
}
@Override
public void addConsumerEdge(FlowEdge consumerEdge) {
- this.muxEdge = consumerEdge;
+ this.distributorEdge = consumerEdge;
}
@Override
public void removeConsumerEdge(FlowEdge consumerEdge) {
- this.muxEdge = null;
+ this.distributorEdge = null;
}
// Update the carbon intensity of the power source
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java
index c1e8a1b9..dc5129d6 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java
@@ -106,7 +106,7 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
double powerSupply = this.powerDemand;
if (powerSupply != this.powerSupplied) {
- this.pushSupply(this.cpuEdge, powerSupply);
+ this.pushOutgoingSupply(this.cpuEdge, powerSupply);
}
return Long.MAX_VALUE;
@@ -135,33 +135,33 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@Override
- public void pushDemand(FlowEdge supplierEdge, double newDemand) {
+ public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
this.powerDemand = newDemand;
powerSupplyEdge.pushDemand(newDemand);
}
@Override
- public void pushSupply(FlowEdge consumerEdge, double newSupply) {
+ public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) {
this.powerSupplied = newSupply;
cpuEdge.pushSupply(newSupply);
}
@Override
- public void handleDemand(FlowEdge consumerEdge, double newPowerDemand) {
+ public void handleIncomingDemand(FlowEdge consumerEdge, double newPowerDemand) {
updateCounters();
this.powerDemand = newPowerDemand;
- pushDemand(this.powerSupplyEdge, newPowerDemand);
+ pushOutgoingDemand(this.powerSupplyEdge, newPowerDemand);
}
@Override
- public void handleSupply(FlowEdge supplierEdge, double newPowerSupply) {
+ public void handleIncomingSupply(FlowEdge supplierEdge, double newPowerSupply) {
updateCounters();
this.powerSupplied = newPowerSupply;
- pushSupply(this.cpuEdge, newPowerSupply);
+ pushOutgoingSupply(this.cpuEdge, newPowerSupply);
}
@Override
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
index b612de2c..da6b8334 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
@@ -195,7 +195,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier {
* @param newDemand new demand to sent to the cpu
*/
@Override
- public void pushDemand(FlowEdge supplierEdge, double newDemand) {
+ public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
this.demand = newDemand;
this.machineEdge.pushDemand(newDemand);
@@ -208,7 +208,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier {
* @param newSupply new supply to sent to the workload
*/
@Override
- public void pushSupply(FlowEdge consumerEdge, double newSupply) {
+ public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) {
this.supply = newSupply;
this.workloadEdge.pushSupply(newSupply);
@@ -221,8 +221,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier {
* @param newDemand new demand coming from the workload
*/
@Override
- public void handleDemand(FlowEdge consumerEdge, double newDemand) {
- this.pushDemand(this.machineEdge, newDemand);
+ public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) {
+ this.pushOutgoingDemand(this.machineEdge, newDemand);
}
/**
@@ -232,8 +232,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier {
* @param newSupply The new supply that is sent to the workload
*/
@Override
- public void handleSupply(FlowEdge supplierEdge, double newSupply) {
- this.pushSupply(this.machineEdge, newSupply);
+ public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) {
+ this.pushOutgoingSupply(this.machineEdge, newSupply);
}
/**
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java
index 8487fbc2..0735d8ae 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java
@@ -94,7 +94,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
graph.addEdge(this, supplier);
this.currentFragment = this.getNextFragment();
- pushDemand(machineEdge, this.currentFragment.cpuUsage());
+ pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage());
this.startOfFragment = now;
}
@@ -131,7 +131,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.startOfFragment = now - passedTime;
// Change the cpu Usage to the new Fragment
- pushDemand(machineEdge, this.currentFragment.cpuUsage());
+ pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage());
// Return the time when the current fragment will complete
return this.startOfFragment + duration;
@@ -190,7 +190,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.fragmentIndex = -1;
this.currentFragment = getNextFragment();
- pushDemand(this.machineEdge, this.currentFragment.cpuUsage());
+ pushOutgoingDemand(this.machineEdge, this.currentFragment.cpuUsage());
this.startOfFragment = now;
this.invalidate();
@@ -207,7 +207,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
* @param newSupply The new demand that needs to be sent to the VM
*/
@Override
- public void handleSupply(FlowEdge supplierEdge, double newSupply) {
+ public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) {
if (newSupply == this.currentSupply) {
return;
}
@@ -222,7 +222,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
* @param newDemand The new demand that needs to be sent to the VM
*/
@Override
- public void pushDemand(FlowEdge supplierEdge, double newDemand) {
+ public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
if (newDemand == this.currentDemand) {
return;
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
index 24476048..67540f4e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
@@ -44,7 +44,7 @@ public final class FlowEngine implements Runnable {
/**
* A priority queue containing the {@link FlowNode} updates to be scheduled in the future.
*/
- private final FlowTimerQueue timerQueue = new FlowTimerQueue(256);
+ private final FlowEventQueue eventQueue = new FlowEventQueue(256);
/**
* The stack of engine invocations to occur in the future.
@@ -127,7 +127,7 @@ public final class FlowEngine implements Runnable {
return;
}
- long deadline = timerQueue.peekDeadline();
+ long deadline = eventQueue.peekDeadline();
if (deadline != Long.MAX_VALUE) {
trySchedule(futureInvocations, clock.millis(), deadline);
}
@@ -139,7 +139,7 @@ public final class FlowEngine implements Runnable {
* This method should only be invoked while inside an engine cycle.
*/
public void scheduleDelayedInContext(FlowNode ctx) {
- FlowTimerQueue timerQueue = this.timerQueue;
+ FlowEventQueue timerQueue = this.eventQueue;
timerQueue.enqueue(ctx);
}
@@ -148,7 +148,7 @@ public final class FlowEngine implements Runnable {
*/
private void doRunEngine(long now) {
final FlowCycleQueue queue = this.cycleQueue;
- final FlowTimerQueue timerQueue = this.timerQueue;
+ final FlowEventQueue timerQueue = this.eventQueue;
try {
// Mark the engine as active to prevent concurrent calls to this method
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java
index 049eb40d..53649d29 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java
@@ -26,12 +26,12 @@ import java.util.Arrays;
import org.opendc.simulator.engine.graph.FlowNode;
/**
- * A specialized priority queue for timers of {@link FlowNode}s.
+ * A specialized priority queue for future event of {@link FlowNode}s sorted on time.
* <p>
* By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation
* being generic.
*/
-public final class FlowTimerQueue {
+public final class FlowEventQueue {
/**
* Array representation of binary heap of {@link FlowNode} instances.
*/
@@ -43,11 +43,11 @@ public final class FlowTimerQueue {
private int size = 0;
/**
- * Construct a {@link FlowTimerQueue} with the specified initial capacity.
+ * Construct a {@link FlowEventQueue} with the specified initial capacity.
*
* @param initialCapacity The initial capacity of the queue.
*/
- public FlowTimerQueue(int initialCapacity) {
+ public FlowEventQueue(int initialCapacity) {
this.queue = new FlowNode[initialCapacity];
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java
index 2130d376..a9da6f5d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java
@@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph;
public interface FlowConsumer {
- void handleSupply(FlowEdge supplierEdge, double newSupply);
+ void handleIncomingSupply(FlowEdge supplierEdge, double newSupply);
- void pushDemand(FlowEdge supplierEdge, double newDemand);
+ void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand);
void addSupplierEdge(FlowEdge supplierEdge);
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
index 7ef091f8..16bb161f 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
@@ -29,27 +29,30 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
private FlowEdge supplierEdge;
- private final ArrayList<Double> demands = new ArrayList<>(); // What is demanded by the consumers
- private final ArrayList<Double> supplies = new ArrayList<>(); // What is supplied to the consumers
+ private final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers
+ private final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers
- private double totalDemand; // The total demand of all the consumers
- private double totalSupply; // The total supply from the supplier
+ private double totalIncomingDemand; // The total demand of all the consumers
+ private double currentIncomingSupply; // The current supply provided by the supplier
- private boolean overLoaded = false;
- private int currentConsumerIdx = -1;
+ private boolean outgoingDemandUpdateNeeded = false;
- private double capacity; // What is the max capacity
+ private boolean overloaded = false;
+
+ private double capacity; // What is the max capacity. Can probably be removed
+
+ private final ArrayList<Integer> updatedDemands = new ArrayList<>();
public FlowDistributor(FlowGraph graph) {
super(graph);
}
- public double getTotalDemand() {
- return totalDemand;
+ public double getTotalIncomingDemand() {
+ return totalIncomingDemand;
}
- public double getTotalSupply() {
- return totalSupply;
+ public double getCurrentIncomingSupply() {
+ return currentIncomingSupply;
}
public double getCapacity() {
@@ -58,38 +61,61 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
public long onUpdate(long now) {
+ // Check if current supply is different from total demand
+ if (this.outgoingDemandUpdateNeeded) {
+ this.updateOutgoingDemand();
+
+ return Long.MAX_VALUE;
+ }
+
+ this.updateOutgoingSupplies();
+
return Long.MAX_VALUE;
}
- private void distributeSupply() {
- // if supply >= demand -> push supplies to all tasks
- if (this.totalSupply > this.totalDemand) {
+ private void updateOutgoingDemand() {
+ this.pushOutgoingDemand(this.supplierEdge, this.totalIncomingDemand);
- // If this came from a state of overload, provide all consumers with their demand
- if (this.overLoaded) {
- for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx));
- }
- }
+ this.outgoingDemandUpdateNeeded = false;
- if (this.currentConsumerIdx != -1) {
- this.pushSupply(
- this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx));
- this.currentConsumerIdx = -1;
- }
+ this.invalidate();
+ }
- this.overLoaded = false;
- }
+ private void updateOutgoingSupplies() {
+
+ // If the demand is higher than the current supply, the system is overloaded.
+ // The available supply is distributed based on the current distribution function.
+ if (this.totalIncomingDemand > this.currentIncomingSupply) {
+ this.overloaded = true;
- // if supply < demand -> distribute the supply over all consumers
- else {
- this.overLoaded = true;
- double[] supplies = redistributeSupply(this.demands, this.totalSupply);
+ double[] supplies = distributeSupply(this.incomingDemands, this.currentIncomingSupply);
for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- this.pushSupply(this.consumerEdges.get(idx), supplies[idx]);
+ this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx]);
+ }
+
+ } else {
+
+ // If the distributor was overloaded before, but is not anymore:
+ // provide all consumers with their demand
+ if (this.overloaded) {
+ for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
+ if (this.outgoingSupplies.get(idx) != this.incomingDemands.get(idx)) {
+ this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx));
+ }
+ }
+ this.overloaded = false;
+ }
+
+ // Update the supplies of the consumers that changed their demand in the current cycle
+ else {
+ for (int idx : this.updatedDemands) {
+ this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx));
+ }
}
}
+
+ this.updatedDemands.clear();
}
private record Demand(int idx, double value) {}
@@ -97,10 +123,8 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
/**
* Distributed the available supply over the different demands.
* The supply is distributed using MaxMin Fairness.
- *
- * TODO: Move this outside of the Distributor so we can easily add different redistribution methods
*/
- private static double[] redistributeSupply(ArrayList<Double> demands, double totalSupply) {
+ private static double[] distributeSupply(ArrayList<Double> demands, double currentSupply) {
int inputSize = demands.size();
final double[] supplies = new double[inputSize];
@@ -116,7 +140,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
return i1.compareTo(i2);
});
- double availableCapacity = totalSupply; // totalSupply
+ double availableCapacity = currentSupply; // totalSupply
for (int i = 0; i < inputSize; i++) {
double d = tempDemands[i].value;
@@ -133,7 +157,6 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
availableCapacity -= r;
}
- // Return the used capacity
return supplies;
}
@@ -146,15 +169,15 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
consumerEdge.setConsumerIndex(this.consumerEdges.size());
this.consumerEdges.add(consumerEdge);
- this.demands.add(0.0);
- this.supplies.add(0.0);
+ this.incomingDemands.add(0.0);
+ this.outgoingSupplies.add(0.0);
}
@Override
public void addSupplierEdge(FlowEdge supplierEdge) {
this.supplierEdge = supplierEdge;
this.capacity = supplierEdge.getCapacity();
- this.totalSupply = 0;
+ this.currentIncomingSupply = 0;
}
@Override
@@ -165,84 +188,87 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
return;
}
- this.totalDemand -= consumerEdge.getDemand();
+ this.totalIncomingDemand -= consumerEdge.getDemand();
this.consumerEdges.remove(idx);
- this.demands.remove(idx);
- this.supplies.remove(idx);
+ this.incomingDemands.remove(idx);
+ this.outgoingSupplies.remove(idx);
// update the consumer index for all consumerEdges higher than this.
for (int i = idx; i < this.consumerEdges.size(); i++) {
this.consumerEdges.get(i).setConsumerIndex(i);
}
- this.currentConsumerIdx = -1;
+ for (int i = 0; i < this.updatedDemands.size(); i++) {
+ int j = this.updatedDemands.get(i);
- if (this.overLoaded) {
- this.distributeSupply();
+ if (j == idx) {
+ this.updatedDemands.remove(idx);
+ }
+ if (j > idx) {
+ this.updatedDemands.set(i, j - 1);
+ }
}
- this.pushDemand(this.supplierEdge, this.totalDemand);
+ this.outgoingDemandUpdateNeeded = true;
+ this.invalidate();
}
@Override
public void removeSupplierEdge(FlowEdge supplierEdge) {
this.supplierEdge = null;
this.capacity = 0;
- this.totalSupply = 0;
+ this.currentIncomingSupply = 0;
+
+ this.invalidate();
}
@Override
- public void handleDemand(FlowEdge consumerEdge, double newDemand) {
+ public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) {
int idx = consumerEdge.getConsumerIndex();
- this.currentConsumerIdx = idx;
-
if (idx == -1) {
System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer");
return;
}
// Update the total demand (This is cheaper than summing over all demands)
- double prevDemand = demands.get(idx);
+ double prevDemand = incomingDemands.get(idx);
- demands.set(idx, newDemand);
- this.totalDemand += (newDemand - prevDemand);
+ incomingDemands.set(idx, newDemand);
+ this.totalIncomingDemand += (newDemand - prevDemand);
- if (overLoaded) {
- distributeSupply();
- }
+ this.updatedDemands.add(idx);
- // Send new totalDemand to CPU
- // TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply)
- this.pushDemand(this.supplierEdge, this.totalDemand);
+ this.outgoingDemandUpdateNeeded = true;
+ this.invalidate();
}
@Override
- public void handleSupply(FlowEdge supplierEdge, double newSupply) {
- this.totalSupply = newSupply;
+ public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) {
+ this.currentIncomingSupply = newSupply;
- this.distributeSupply();
+ this.invalidate();
}
@Override
- public void pushDemand(FlowEdge supplierEdge, double newDemand) {
+ public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
this.supplierEdge.pushDemand(newDemand);
}
@Override
- public void pushSupply(FlowEdge consumerEdge, double newSupply) {
+ public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) {
int idx = consumerEdge.getConsumerIndex();
if (idx == -1) {
System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer");
}
- if (supplies.get(idx) == newSupply) {
+ if (outgoingSupplies.get(idx) == newSupply) {
return;
}
- supplies.set(idx, newSupply);
+ outgoingSupplies.set(idx, newSupply);
consumerEdge.pushSupply(newSupply);
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
index b7162508..9521f2ce 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
@@ -108,24 +108,38 @@ public class FlowEdge {
/**
* Push new demand from the Consumer to the Supplier
*/
- public void pushDemand(double newDemand) {
- if (newDemand == this.demand) {
+ public void pushDemand(double newDemand, boolean forceThrough) {
+ if ((newDemand == this.demand) && !forceThrough) {
return;
}
this.demand = newDemand;
- this.supplier.handleDemand(this, newDemand);
+ this.supplier.handleIncomingDemand(this, newDemand);
+ }
+
+ /**
+ * Push new demand from the Consumer to the Supplier
+ */
+ public void pushDemand(double newDemand) {
+ this.pushDemand(newDemand, false);
}
/**
* Push new supply from the Supplier to the Consumer
*/
- public void pushSupply(double newSupply) {
- if (newSupply == this.supply) {
+ public void pushSupply(double newSupply, boolean forceThrough) {
+ if ((newSupply == this.supply) && !forceThrough) {
return;
}
this.supply = newSupply;
- this.consumer.handleSupply(this, newSupply);
+ this.consumer.handleIncomingSupply(this, newSupply);
+ }
+
+ /**
+ * Push new supply from the Supplier to the Consumer
+ */
+ public void pushSupply(double newSupply) {
+ this.pushSupply(newSupply, false);
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
index 64cd0d8c..e24e9f93 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
@@ -24,7 +24,7 @@ package org.opendc.simulator.engine.graph;
import java.time.InstantSource;
import org.opendc.simulator.engine.engine.FlowEngine;
-import org.opendc.simulator.engine.engine.FlowTimerQueue;
+import org.opendc.simulator.engine.engine.FlowEventQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,7 +109,7 @@ public abstract class FlowNode {
private long deadline = Long.MAX_VALUE;
/**
- * The index of the timer in the {@link FlowTimerQueue}.
+ * The index of the timer in the {@link FlowEventQueue}.
*/
private int timerIndex = -1;
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java
index 84602ee0..da65392b 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java
@@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph;
public interface FlowSupplier {
- void handleDemand(FlowEdge consumerEdge, double newDemand);
+ void handleIncomingDemand(FlowEdge consumerEdge, double newDemand);
- void pushSupply(FlowEdge consumerEdge, double newSupply);
+ void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply);
void addConsumerEdge(FlowEdge consumerEdge);