diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2025-01-07 11:25:48 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-01-07 11:25:48 +0100 |
| commit | f71e07f55a5176c5bd5447cdb3bcfebf2f5f47ee (patch) | |
| tree | 9a7e5b11887c560668a17fc2f130bfed7aaceda5 | |
| parent | c425a03c59e7d5c2e5d82988c61e340a6cbf61fe (diff) | |
Updated the FlowDistributor (#285)
* Updated the FlowDistributor to work in more cases and be more performant.
* Removed old FlowDistributor
19 files changed, 338 insertions, 198 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt index f34e135d..cc4ac2d8 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt @@ -62,7 +62,7 @@ public class SimHost( private val graph: FlowGraph, private val machineModel: MachineModel, private val cpuPowerModel: CpuPowerModel, - private val powerMux: FlowDistributor, + private val powerDistributor: FlowDistributor, ) : AutoCloseable { /** * The event listeners registered with this host. @@ -131,7 +131,7 @@ public class SimHost( SimMachine( this.graph, this.machineModel, - this.powerMux, + this.powerDistributor, this.cpuPowerModel, ) { cause -> hostState = if (cause != null) HostState.ERROR else HostState.DOWN diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt index 8dcf2fa1..933b4e63 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt @@ -64,8 +64,8 @@ public class HostsProvisioningStep internal constructor( service.addPowerSource(simPowerSource) simPowerSources.add(simPowerSource) - val powerMux = FlowDistributor(graph) - graph.addEdge(powerMux, simPowerSource) + val powerDistributor = FlowDistributor(graph) + graph.addEdge(powerDistributor, simPowerSource) // Create hosts, they are connected to the powerMux when SimMachine is created for (hostSpec in cluster.hostSpecs) { @@ -78,7 +78,7 @@ public class HostsProvisioningStep internal constructor( graph, hostSpec.model, hostSpec.cpuPowerModel, - powerMux, + powerDistributor, ) require(simHosts.add(simHost)) { "Host with uid ${hostSpec.uid} already exists" } diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt index 8e9a3ad7..845a8bae 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt @@ -296,8 +296,8 @@ class ExperimentTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(1803918432, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(787181568, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(1803918435, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(787181565, monitor.activeTime) { "Active time incorrect" } }, { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, { assertEquals(6.7565629E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, @@ -343,11 +343,11 @@ class ExperimentTest { { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") }, { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") }, - { assertEquals(43101787447, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(3489412553, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(43101787496, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(3489412504, monitor.activeTime) { "Incorrect active time" } }, { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } }, { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(1.0016123392181786E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + { assertEquals(6.914184592181973E9, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, ) } diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt index 31c7dfd0..2cd464a1 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt @@ -165,8 +165,6 @@ class FlowDistributorTest { { assertEquals(2000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } }, ) } @@ -202,8 +200,6 @@ class FlowDistributorTest { { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, ) } @@ -239,8 +235,6 @@ class FlowDistributorTest { { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, ) } @@ -276,19 +270,52 @@ class FlowDistributorTest { { assertEquals(1000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, { assertEquals(1000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, ) } /** - * FlowDistributor test 5: Two task, same time, both fit + * FlowDistributor test 5: A single task transition overload to perfect fit + * In this test, a single task is scheduled where the first fragment does not fit, and the second does perfectly for the available CPU. + * For the first fragment, we expect the usage of the first fragment to be lower than the demand + * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testFlowDistributor5() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 4000.0, 1), + TraceFragment(10 * 60 * 1000, 2000.0, 1), + ), + ), + ) + val topology = createTopology("single_1_2000.json") + + monitor = runTest(topology, workload) + + assertAll( + { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + ) + } + + /** + * FlowDistributor test 6: Two task, same time, both fit * In this test, two tasks are scheduled, and they fit together on the host . The tasks start and finish at the same time * This test shows how the FlowDistributor handles two tasks that can fit and no redistribution is required. * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. */ @Test - fun testFlowDistributor5() { + fun testFlowDistributor6() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -325,19 +352,17 @@ class FlowDistributorTest { { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, ) } /** - * FlowDistributor test 6: Two task, same time, can not fit + * FlowDistributor test 7: Two task, same time, can not fit * In this test, two tasks are scheduled, and they can not both fit. The tasks start and finish at the same time * This test shows how the FlowDistributor handles two tasks that both do not fit and redistribution is required. * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. */ @Test - fun testFlowDistributor6() { + fun testFlowDistributor7() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -374,18 +399,16 @@ class FlowDistributorTest { { assertEquals(11000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, ) } /** - * FlowDistributor test 7: Two task, both fit, second task is delayed + * FlowDistributor test 8: Two task, both fit, second task is delayed * In this test, two tasks are scheduled, the second task is delayed. * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. */ @Test - fun testFlowDistributor7() { + fun testFlowDistributor8() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -431,13 +454,11 @@ class FlowDistributorTest { { assertEquals(3000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } }, { assertEquals(2000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, ) } /** - * FlowDistributor test 8: Two task, both fit on their own but not together, second task is delayed + * FlowDistributor test 9: Two task, both fit on their own but not together, second task is delayed * In this test, two tasks are scheduled, the second task is delayed. * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. * When the second task comes in, the host is overloaded. @@ -445,7 +466,7 @@ class FlowDistributorTest { * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. */ @Test - fun testFlowDistributor8() { + fun testFlowDistributor9() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -488,20 +509,18 @@ class FlowDistributorTest { { assertEquals(3000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, { assertEquals(3000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, ) } /** - * FlowDistributor test 9: Two task, one changes demand, causing overload + * FlowDistributor test 10: Two task, one changes demand, causing overload * In this test, two tasks are scheduled, and they can both fit. * However, task 0 increases its demand which overloads the FlowDistributor. * This test shows how the FlowDistributor handles transition from fitting to overloading when multiple tasks are running. * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. */ @Test - fun testFlowDistributor9() { + fun testFlowDistributor10() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -551,12 +570,84 @@ class FlowDistributorTest { { assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } }, { assertEquals(4000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, - { assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } }, - { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } }, ) } /** + * FlowDistributor test 11: 5000 hosts. This tests the performance of the distributor + * In this test, two tasks are scheduled, and they can both fit. + * However, task 0 increases its demand which overloads the FlowDistributor. + * This test shows how the FlowDistributor handles transition from fitting to overloading when multiple tasks are running. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testFlowDistributor11() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf<TraceFragment>().apply { + repeat(10) { this.add(TraceFragment(20 * 60 * 1000, 3000.0, 1)) } + }, + ), + ) + val topology = createTopology("single_5000_2000.json") + + monitor = runTest(topology, workload) + } + + /** + * FlowDistributor test 12: 1000 fragments. This tests the performance of the distributor + * In this test, two tasks are scheduled, and they can both fit. + * However, task 0 increases its demand which overloads the FlowDistributor. + * This test shows how the FlowDistributor handles transition from fitting to overloading when multiple tasks are running. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testFlowDistributor12() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf<TraceFragment>().apply { + repeat(1000) { this.add(TraceFragment(10 * 60 * 1000, 2000.0, 1)) } + }, + ), + ) + val topology = createTopology("single_1_2000.json") + + monitor = runTest(topology, workload) + } + + /** + * FlowDistributor test 13: 1000 tasks. This tests the performance + * In this test, two tasks are scheduled, and they can both fit. + * However, task 0 increases its demand which overloads the FlowDistributor. + * This test shows how the FlowDistributor handles transition from fitting to overloading when multiple tasks are running. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testFlowDistributor13() { + val workload: ArrayList<Task> = + arrayListOf<Task>().apply { + repeat(1000) { + this.add( + createTestTask( + name = "0", + fragments = + arrayListOf(TraceFragment(10 * 60 * 1000, 2000.0, 1)), + ), + ) + } + } + val topology = createTopology("single_1_2000.json") + + monitor = runTest(topology, workload) + } + + /** * Obtain the topology factory for the test. */ private fun createTopology(name: String): List<ClusterSpec> { diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_5000_2000.json b/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_5000_2000.json new file mode 100644 index 00000000..9f1e418f --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_5000_2000.json @@ -0,0 +1,23 @@ +{ + "clusters": + [ + { + "name": "C01", + "hosts" : + [ + { + "name": "H01", + "cpu": + { + "coreCount": 1, + "coreSpeed": 2000 + }, + "memory": { + "memorySize": 140457600000 + }, + "count": 5000 + } + ] + } + ] +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java index c5b8a9ea..a9edaa97 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java @@ -51,7 +51,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer private long lastCounterUpdate; private final double cpuFrequencyInv; - private FlowEdge muxEdge; + private FlowEdge distributorEdge; private FlowEdge psuEdge; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -123,21 +123,16 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer public long onUpdate(long now) { updateCounters(now); - this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0); - - // Calculate Power Demand and send to PSU - double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization); + // Check if supply == demand + if (this.currentPowerDemand != this.currentPowerSupplied) { + this.pushOutgoingDemand(this.psuEdge, this.currentPowerDemand); - if (powerDemand != this.currentPowerDemand) { - this.pushDemand(this.psuEdge, powerDemand); + return Long.MAX_VALUE; } - // Calculate the amount of cpu this can provide - double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity); + this.currentCpuSupplied = Math.min(this.currentCpuDemand, this.maxCapacity); - if (cpuSupply != this.currentCpuSupplied) { - this.pushSupply(this.muxEdge, cpuSupply); - } + this.pushOutgoingSupply(this.distributorEdge, this.currentCpuSupplied); return Long.MAX_VALUE; } @@ -181,7 +176,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer * Push new demand to the psu */ @Override - public void pushDemand(FlowEdge supplierEdge, double newPowerDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newPowerDemand) { updateCounters(); this.currentPowerDemand = newPowerDemand; this.psuEdge.pushDemand(newPowerDemand); @@ -191,47 +186,38 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer * Push updated supply to the mux */ @Override - public void pushSupply(FlowEdge consumerEdge, double newCpuSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newCpuSupply) { updateCounters(); this.currentCpuSupplied = newCpuSupply; - this.muxEdge.pushSupply(newCpuSupply); + + this.distributorEdge.pushSupply(newCpuSupply, true); } /** * Handle new demand coming in from the mux */ @Override - public void handleDemand(FlowEdge consumerEdge, double newCpuDemand) { + public void handleIncomingDemand(FlowEdge consumerEdge, double newCpuDemand) { updateCounters(); this.currentCpuDemand = newCpuDemand; - this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity; this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0); // Calculate Power Demand and send to PSU - double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization); + this.currentPowerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization); - if (powerDemand != this.currentPowerDemand) { - this.pushDemand(this.psuEdge, powerDemand); - } + this.invalidate(); } /** * Handle updated supply from the psu */ @Override - public void handleSupply(FlowEdge supplierEdge, double newPowerSupply) { - // TODO: Implement this + public void handleIncomingSupply(FlowEdge supplierEdge, double newPowerSupply) { updateCounters(); this.currentPowerSupplied = newPowerSupply; - // Calculate the amount of cpu this can provide - double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity); - ; - - if (cpuSupply != this.currentCpuSupplied) { - this.pushSupply(this.muxEdge, cpuSupply); - } + this.invalidate(); } /** @@ -239,7 +225,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer */ @Override public void addConsumerEdge(FlowEdge consumerEdge) { - this.muxEdge = consumerEdge; + this.distributorEdge = consumerEdge; } /** @@ -257,7 +243,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer */ @Override public void removeConsumerEdge(FlowEdge consumerEdge) { - this.muxEdge = null; + this.distributorEdge = null; this.invalidate(); } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java index 074f0ed8..dab0c421 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java @@ -44,7 +44,7 @@ public class SimMachine { private final InstantSource clock; private SimCpu cpu; - private FlowDistributor cpuMux; + private FlowDistributor cpuDistributor; private SimPsu psu; private Memory memory; @@ -74,8 +74,8 @@ public class SimMachine { return cpu; } - public FlowDistributor getCpuMux() { - return cpuMux; + public FlowDistributor getCpuDistributor() { + return cpuDistributor; } public Memory getMemory() { @@ -114,7 +114,7 @@ public class SimMachine { public SimMachine( FlowGraph graph, MachineModel machineModel, - FlowDistributor powerMux, + FlowDistributor powerDistributor, CpuPowerModel cpuPowerModel, Consumer<Exception> completion) { this.graph = graph; @@ -124,7 +124,7 @@ public class SimMachine { // Create the psu and cpu and connect them this.psu = new SimPsu(graph); - graph.addEdge(this.psu, powerMux); + graph.addEdge(this.psu, powerDistributor); this.cpu = new SimCpu(graph, this.machineModel.getCpuModel(), cpuPowerModel, 0); @@ -133,8 +133,8 @@ public class SimMachine { this.memory = new Memory(graph, this.machineModel.getMemory()); // Create a FlowDistributor and add the cpu as supplier - this.cpuMux = new FlowDistributor(this.graph); - graph.addEdge(this.cpuMux, this.cpu); + this.cpuDistributor = new FlowDistributor(this.graph); + graph.addEdge(this.cpuDistributor, this.cpu); this.completion = completion; } @@ -153,8 +153,8 @@ public class SimMachine { this.graph.removeNode(this.cpu); this.cpu = null; - this.graph.removeNode(this.cpuMux); - this.cpuMux = null; + this.graph.removeNode(this.cpuDistributor); + this.cpuDistributor = null; this.memory = null; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java index b8a9c738..1946eecb 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java @@ -100,7 +100,7 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli this.clock = this.machine.getClock(); this.parentGraph = machine.getGraph(); - this.parentGraph.addEdge(this, this.machine.getCpuMux()); + this.parentGraph.addEdge(this, this.machine.getCpuDistributor()); this.lastUpdate = clock.millis(); this.lastUpdate = clock.millis(); @@ -185,7 +185,7 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli * Push demand to the cpuMux if the demand has changed **/ @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { this.cpuEdge.pushDemand(newDemand); } @@ -193,7 +193,7 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli * Push supply to the workload if the supply has changed **/ @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { this.workloadEdge.pushSupply(newSupply); } @@ -201,24 +201,24 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli * Handle new demand from the workload by sending it through to the cpuMux **/ @Override - public void handleDemand(FlowEdge consumerEdge, double newDemand) { + public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) { updateCounters(this.clock.millis()); this.cpuDemand = newDemand; - pushDemand(this.cpuEdge, newDemand); + pushOutgoingDemand(this.cpuEdge, newDemand); } /** * Handle a new supply pushed by the cpuMux by sending it through to the workload **/ @Override - public void handleSupply(FlowEdge supplierEdge, double newCpuSupply) { + public void handleIncomingSupply(FlowEdge supplierEdge, double newCpuSupply) { updateCounters(this.clock.millis()); this.cpuSupply = newCpuSupply; - pushSupply(this.workloadEdge, newCpuSupply); + pushOutgoingSupply(this.workloadEdge, newCpuSupply); } @Override diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java index e8626e40..d2270888 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java @@ -43,7 +43,7 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier { private double totalCarbonEmission = 0.0f; private CarbonModel carbonModel = null; - private FlowEdge muxEdge; + private FlowEdge distributorEdge; private double capacity = Long.MAX_VALUE; @@ -57,7 +57,7 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier { * @return <code>true</code> if the InPort is connected to an OutPort, <code>false</code> otherwise. */ public boolean isConnected() { - return muxEdge != null; + return distributorEdge != null; } /** @@ -156,30 +156,30 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @Override - public void handleDemand(FlowEdge consumerEdge, double newPowerDemand) { + public void handleIncomingDemand(FlowEdge consumerEdge, double newPowerDemand) { this.powerDemand = newPowerDemand; double powerSupply = this.powerDemand; if (powerSupply != this.powerSupplied) { - this.pushSupply(this.muxEdge, powerSupply); + this.pushOutgoingSupply(this.distributorEdge, powerSupply); } } @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { this.powerSupplied = newSupply; consumerEdge.pushSupply(newSupply); } @Override public void addConsumerEdge(FlowEdge consumerEdge) { - this.muxEdge = consumerEdge; + this.distributorEdge = consumerEdge; } @Override public void removeConsumerEdge(FlowEdge consumerEdge) { - this.muxEdge = null; + this.distributorEdge = null; } // Update the carbon intensity of the power source diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java index c1e8a1b9..dc5129d6 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java @@ -106,7 +106,7 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer double powerSupply = this.powerDemand; if (powerSupply != this.powerSupplied) { - this.pushSupply(this.cpuEdge, powerSupply); + this.pushOutgoingSupply(this.cpuEdge, powerSupply); } return Long.MAX_VALUE; @@ -135,33 +135,33 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { this.powerDemand = newDemand; powerSupplyEdge.pushDemand(newDemand); } @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { this.powerSupplied = newSupply; cpuEdge.pushSupply(newSupply); } @Override - public void handleDemand(FlowEdge consumerEdge, double newPowerDemand) { + public void handleIncomingDemand(FlowEdge consumerEdge, double newPowerDemand) { updateCounters(); this.powerDemand = newPowerDemand; - pushDemand(this.powerSupplyEdge, newPowerDemand); + pushOutgoingDemand(this.powerSupplyEdge, newPowerDemand); } @Override - public void handleSupply(FlowEdge supplierEdge, double newPowerSupply) { + public void handleIncomingSupply(FlowEdge supplierEdge, double newPowerSupply) { updateCounters(); this.powerSupplied = newPowerSupply; - pushSupply(this.cpuEdge, newPowerSupply); + pushOutgoingSupply(this.cpuEdge, newPowerSupply); } @Override diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java index b612de2c..da6b8334 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -195,7 +195,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { * @param newDemand new demand to sent to the cpu */ @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { this.demand = newDemand; this.machineEdge.pushDemand(newDemand); @@ -208,7 +208,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { * @param newSupply new supply to sent to the workload */ @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { this.supply = newSupply; this.workloadEdge.pushSupply(newSupply); @@ -221,8 +221,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { * @param newDemand new demand coming from the workload */ @Override - public void handleDemand(FlowEdge consumerEdge, double newDemand) { - this.pushDemand(this.machineEdge, newDemand); + public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) { + this.pushOutgoingDemand(this.machineEdge, newDemand); } /** @@ -232,8 +232,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { * @param newSupply The new supply that is sent to the workload */ @Override - public void handleSupply(FlowEdge supplierEdge, double newSupply) { - this.pushSupply(this.machineEdge, newSupply); + public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { + this.pushOutgoingSupply(this.machineEdge, newSupply); } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java index 8487fbc2..0735d8ae 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java @@ -94,7 +94,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { graph.addEdge(this, supplier); this.currentFragment = this.getNextFragment(); - pushDemand(machineEdge, this.currentFragment.cpuUsage()); + pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage()); this.startOfFragment = now; } @@ -131,7 +131,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.startOfFragment = now - passedTime; // Change the cpu Usage to the new Fragment - pushDemand(machineEdge, this.currentFragment.cpuUsage()); + pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage()); // Return the time when the current fragment will complete return this.startOfFragment + duration; @@ -190,7 +190,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.fragmentIndex = -1; this.currentFragment = getNextFragment(); - pushDemand(this.machineEdge, this.currentFragment.cpuUsage()); + pushOutgoingDemand(this.machineEdge, this.currentFragment.cpuUsage()); this.startOfFragment = now; this.invalidate(); @@ -207,7 +207,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { * @param newSupply The new demand that needs to be sent to the VM */ @Override - public void handleSupply(FlowEdge supplierEdge, double newSupply) { + public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { if (newSupply == this.currentSupply) { return; } @@ -222,7 +222,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { * @param newDemand The new demand that needs to be sent to the VM */ @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { if (newDemand == this.currentDemand) { return; } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index 24476048..67540f4e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -44,7 +44,7 @@ public final class FlowEngine implements Runnable { /** * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. */ - private final FlowTimerQueue timerQueue = new FlowTimerQueue(256); + private final FlowEventQueue eventQueue = new FlowEventQueue(256); /** * The stack of engine invocations to occur in the future. @@ -127,7 +127,7 @@ public final class FlowEngine implements Runnable { return; } - long deadline = timerQueue.peekDeadline(); + long deadline = eventQueue.peekDeadline(); if (deadline != Long.MAX_VALUE) { trySchedule(futureInvocations, clock.millis(), deadline); } @@ -139,7 +139,7 @@ public final class FlowEngine implements Runnable { * This method should only be invoked while inside an engine cycle. */ public void scheduleDelayedInContext(FlowNode ctx) { - FlowTimerQueue timerQueue = this.timerQueue; + FlowEventQueue timerQueue = this.eventQueue; timerQueue.enqueue(ctx); } @@ -148,7 +148,7 @@ public final class FlowEngine implements Runnable { */ private void doRunEngine(long now) { final FlowCycleQueue queue = this.cycleQueue; - final FlowTimerQueue timerQueue = this.timerQueue; + final FlowEventQueue timerQueue = this.eventQueue; try { // Mark the engine as active to prevent concurrent calls to this method diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java index 049eb40d..53649d29 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java @@ -26,12 +26,12 @@ import java.util.Arrays; import org.opendc.simulator.engine.graph.FlowNode; /** - * A specialized priority queue for timers of {@link FlowNode}s. + * A specialized priority queue for future event of {@link FlowNode}s sorted on time. * <p> * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation * being generic. */ -public final class FlowTimerQueue { +public final class FlowEventQueue { /** * Array representation of binary heap of {@link FlowNode} instances. */ @@ -43,11 +43,11 @@ public final class FlowTimerQueue { private int size = 0; /** - * Construct a {@link FlowTimerQueue} with the specified initial capacity. + * Construct a {@link FlowEventQueue} with the specified initial capacity. * * @param initialCapacity The initial capacity of the queue. */ - public FlowTimerQueue(int initialCapacity) { + public FlowEventQueue(int initialCapacity) { this.queue = new FlowNode[initialCapacity]; } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java index 2130d376..a9da6f5d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java @@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph; public interface FlowConsumer { - void handleSupply(FlowEdge supplierEdge, double newSupply); + void handleIncomingSupply(FlowEdge supplierEdge, double newSupply); - void pushDemand(FlowEdge supplierEdge, double newDemand); + void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand); void addSupplierEdge(FlowEdge supplierEdge); diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index 7ef091f8..16bb161f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -29,27 +29,30 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); private FlowEdge supplierEdge; - private final ArrayList<Double> demands = new ArrayList<>(); // What is demanded by the consumers - private final ArrayList<Double> supplies = new ArrayList<>(); // What is supplied to the consumers + private final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers + private final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers - private double totalDemand; // The total demand of all the consumers - private double totalSupply; // The total supply from the supplier + private double totalIncomingDemand; // The total demand of all the consumers + private double currentIncomingSupply; // The current supply provided by the supplier - private boolean overLoaded = false; - private int currentConsumerIdx = -1; + private boolean outgoingDemandUpdateNeeded = false; - private double capacity; // What is the max capacity + private boolean overloaded = false; + + private double capacity; // What is the max capacity. Can probably be removed + + private final ArrayList<Integer> updatedDemands = new ArrayList<>(); public FlowDistributor(FlowGraph graph) { super(graph); } - public double getTotalDemand() { - return totalDemand; + public double getTotalIncomingDemand() { + return totalIncomingDemand; } - public double getTotalSupply() { - return totalSupply; + public double getCurrentIncomingSupply() { + return currentIncomingSupply; } public double getCapacity() { @@ -58,38 +61,61 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu public long onUpdate(long now) { + // Check if current supply is different from total demand + if (this.outgoingDemandUpdateNeeded) { + this.updateOutgoingDemand(); + + return Long.MAX_VALUE; + } + + this.updateOutgoingSupplies(); + return Long.MAX_VALUE; } - private void distributeSupply() { - // if supply >= demand -> push supplies to all tasks - if (this.totalSupply > this.totalDemand) { + private void updateOutgoingDemand() { + this.pushOutgoingDemand(this.supplierEdge, this.totalIncomingDemand); - // If this came from a state of overload, provide all consumers with their demand - if (this.overLoaded) { - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx)); - } - } + this.outgoingDemandUpdateNeeded = false; - if (this.currentConsumerIdx != -1) { - this.pushSupply( - this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx)); - this.currentConsumerIdx = -1; - } + this.invalidate(); + } - this.overLoaded = false; - } + private void updateOutgoingSupplies() { + + // If the demand is higher than the current supply, the system is overloaded. + // The available supply is distributed based on the current distribution function. + if (this.totalIncomingDemand > this.currentIncomingSupply) { + this.overloaded = true; - // if supply < demand -> distribute the supply over all consumers - else { - this.overLoaded = true; - double[] supplies = redistributeSupply(this.demands, this.totalSupply); + double[] supplies = distributeSupply(this.incomingDemands, this.currentIncomingSupply); for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushSupply(this.consumerEdges.get(idx), supplies[idx]); + this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx]); + } + + } else { + + // If the distributor was overloaded before, but is not anymore: + // provide all consumers with their demand + if (this.overloaded) { + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + if (this.outgoingSupplies.get(idx) != this.incomingDemands.get(idx)) { + this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx)); + } + } + this.overloaded = false; + } + + // Update the supplies of the consumers that changed their demand in the current cycle + else { + for (int idx : this.updatedDemands) { + this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx)); + } } } + + this.updatedDemands.clear(); } private record Demand(int idx, double value) {} @@ -97,10 +123,8 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu /** * Distributed the available supply over the different demands. * The supply is distributed using MaxMin Fairness. - * - * TODO: Move this outside of the Distributor so we can easily add different redistribution methods */ - private static double[] redistributeSupply(ArrayList<Double> demands, double totalSupply) { + private static double[] distributeSupply(ArrayList<Double> demands, double currentSupply) { int inputSize = demands.size(); final double[] supplies = new double[inputSize]; @@ -116,7 +140,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu return i1.compareTo(i2); }); - double availableCapacity = totalSupply; // totalSupply + double availableCapacity = currentSupply; // totalSupply for (int i = 0; i < inputSize; i++) { double d = tempDemands[i].value; @@ -133,7 +157,6 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu availableCapacity -= r; } - // Return the used capacity return supplies; } @@ -146,15 +169,15 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu consumerEdge.setConsumerIndex(this.consumerEdges.size()); this.consumerEdges.add(consumerEdge); - this.demands.add(0.0); - this.supplies.add(0.0); + this.incomingDemands.add(0.0); + this.outgoingSupplies.add(0.0); } @Override public void addSupplierEdge(FlowEdge supplierEdge) { this.supplierEdge = supplierEdge; this.capacity = supplierEdge.getCapacity(); - this.totalSupply = 0; + this.currentIncomingSupply = 0; } @Override @@ -165,84 +188,87 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu return; } - this.totalDemand -= consumerEdge.getDemand(); + this.totalIncomingDemand -= consumerEdge.getDemand(); this.consumerEdges.remove(idx); - this.demands.remove(idx); - this.supplies.remove(idx); + this.incomingDemands.remove(idx); + this.outgoingSupplies.remove(idx); // update the consumer index for all consumerEdges higher than this. for (int i = idx; i < this.consumerEdges.size(); i++) { this.consumerEdges.get(i).setConsumerIndex(i); } - this.currentConsumerIdx = -1; + for (int i = 0; i < this.updatedDemands.size(); i++) { + int j = this.updatedDemands.get(i); - if (this.overLoaded) { - this.distributeSupply(); + if (j == idx) { + this.updatedDemands.remove(idx); + } + if (j > idx) { + this.updatedDemands.set(i, j - 1); + } } - this.pushDemand(this.supplierEdge, this.totalDemand); + this.outgoingDemandUpdateNeeded = true; + this.invalidate(); } @Override public void removeSupplierEdge(FlowEdge supplierEdge) { this.supplierEdge = null; this.capacity = 0; - this.totalSupply = 0; + this.currentIncomingSupply = 0; + + this.invalidate(); } @Override - public void handleDemand(FlowEdge consumerEdge, double newDemand) { + public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) { int idx = consumerEdge.getConsumerIndex(); - this.currentConsumerIdx = idx; - if (idx == -1) { System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer"); return; } // Update the total demand (This is cheaper than summing over all demands) - double prevDemand = demands.get(idx); + double prevDemand = incomingDemands.get(idx); - demands.set(idx, newDemand); - this.totalDemand += (newDemand - prevDemand); + incomingDemands.set(idx, newDemand); + this.totalIncomingDemand += (newDemand - prevDemand); - if (overLoaded) { - distributeSupply(); - } + this.updatedDemands.add(idx); - // Send new totalDemand to CPU - // TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply) - this.pushDemand(this.supplierEdge, this.totalDemand); + this.outgoingDemandUpdateNeeded = true; + this.invalidate(); } @Override - public void handleSupply(FlowEdge supplierEdge, double newSupply) { - this.totalSupply = newSupply; + public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { + this.currentIncomingSupply = newSupply; - this.distributeSupply(); + this.invalidate(); } @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { this.supplierEdge.pushDemand(newDemand); } @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { int idx = consumerEdge.getConsumerIndex(); if (idx == -1) { System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer"); } - if (supplies.get(idx) == newSupply) { + if (outgoingSupplies.get(idx) == newSupply) { return; } - supplies.set(idx, newSupply); + outgoingSupplies.set(idx, newSupply); consumerEdge.pushSupply(newSupply); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java index b7162508..9521f2ce 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java @@ -108,24 +108,38 @@ public class FlowEdge { /** * Push new demand from the Consumer to the Supplier */ - public void pushDemand(double newDemand) { - if (newDemand == this.demand) { + public void pushDemand(double newDemand, boolean forceThrough) { + if ((newDemand == this.demand) && !forceThrough) { return; } this.demand = newDemand; - this.supplier.handleDemand(this, newDemand); + this.supplier.handleIncomingDemand(this, newDemand); + } + + /** + * Push new demand from the Consumer to the Supplier + */ + public void pushDemand(double newDemand) { + this.pushDemand(newDemand, false); } /** * Push new supply from the Supplier to the Consumer */ - public void pushSupply(double newSupply) { - if (newSupply == this.supply) { + public void pushSupply(double newSupply, boolean forceThrough) { + if ((newSupply == this.supply) && !forceThrough) { return; } this.supply = newSupply; - this.consumer.handleSupply(this, newSupply); + this.consumer.handleIncomingSupply(this, newSupply); + } + + /** + * Push new supply from the Supplier to the Consumer + */ + public void pushSupply(double newSupply) { + this.pushSupply(newSupply, false); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java index 64cd0d8c..e24e9f93 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -24,7 +24,7 @@ package org.opendc.simulator.engine.graph; import java.time.InstantSource; import org.opendc.simulator.engine.engine.FlowEngine; -import org.opendc.simulator.engine.engine.FlowTimerQueue; +import org.opendc.simulator.engine.engine.FlowEventQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,7 +109,7 @@ public abstract class FlowNode { private long deadline = Long.MAX_VALUE; /** - * The index of the timer in the {@link FlowTimerQueue}. + * The index of the timer in the {@link FlowEventQueue}. */ private int timerIndex = -1; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java index 84602ee0..da65392b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java @@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph; public interface FlowSupplier { - void handleDemand(FlowEdge consumerEdge, double newDemand); + void handleIncomingDemand(FlowEdge consumerEdge, double newDemand); - void pushSupply(FlowEdge consumerEdge, double newSupply); + void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply); void addConsumerEdge(FlowEdge consumerEdge); |
