summaryrefslogtreecommitdiff
path: root/opendc-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java50
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java18
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java)8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java160
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java26
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java4
14 files changed, 186 insertions, 160 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java
index c5b8a9ea..a9edaa97 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java
@@ -51,7 +51,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
private long lastCounterUpdate;
private final double cpuFrequencyInv;
- private FlowEdge muxEdge;
+ private FlowEdge distributorEdge;
private FlowEdge psuEdge;
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -123,21 +123,16 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
public long onUpdate(long now) {
updateCounters(now);
- this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0);
-
- // Calculate Power Demand and send to PSU
- double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);
+ // Check if supply == demand
+ if (this.currentPowerDemand != this.currentPowerSupplied) {
+ this.pushOutgoingDemand(this.psuEdge, this.currentPowerDemand);
- if (powerDemand != this.currentPowerDemand) {
- this.pushDemand(this.psuEdge, powerDemand);
+ return Long.MAX_VALUE;
}
- // Calculate the amount of cpu this can provide
- double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity);
+ this.currentCpuSupplied = Math.min(this.currentCpuDemand, this.maxCapacity);
- if (cpuSupply != this.currentCpuSupplied) {
- this.pushSupply(this.muxEdge, cpuSupply);
- }
+ this.pushOutgoingSupply(this.distributorEdge, this.currentCpuSupplied);
return Long.MAX_VALUE;
}
@@ -181,7 +176,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
* Push new demand to the psu
*/
@Override
- public void pushDemand(FlowEdge supplierEdge, double newPowerDemand) {
+ public void pushOutgoingDemand(FlowEdge supplierEdge, double newPowerDemand) {
updateCounters();
this.currentPowerDemand = newPowerDemand;
this.psuEdge.pushDemand(newPowerDemand);
@@ -191,47 +186,38 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
* Push updated supply to the mux
*/
@Override
- public void pushSupply(FlowEdge consumerEdge, double newCpuSupply) {
+ public void pushOutgoingSupply(FlowEdge consumerEdge, double newCpuSupply) {
updateCounters();
this.currentCpuSupplied = newCpuSupply;
- this.muxEdge.pushSupply(newCpuSupply);
+
+ this.distributorEdge.pushSupply(newCpuSupply, true);
}
/**
* Handle new demand coming in from the mux
*/
@Override
- public void handleDemand(FlowEdge consumerEdge, double newCpuDemand) {
+ public void handleIncomingDemand(FlowEdge consumerEdge, double newCpuDemand) {
updateCounters();
this.currentCpuDemand = newCpuDemand;
- this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity;
this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0);
// Calculate Power Demand and send to PSU
- double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);
+ this.currentPowerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);
- if (powerDemand != this.currentPowerDemand) {
- this.pushDemand(this.psuEdge, powerDemand);
- }
+ this.invalidate();
}
/**
* Handle updated supply from the psu
*/
@Override
- public void handleSupply(FlowEdge supplierEdge, double newPowerSupply) {
- // TODO: Implement this
+ public void handleIncomingSupply(FlowEdge supplierEdge, double newPowerSupply) {
updateCounters();
this.currentPowerSupplied = newPowerSupply;
- // Calculate the amount of cpu this can provide
- double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity);
- ;
-
- if (cpuSupply != this.currentCpuSupplied) {
- this.pushSupply(this.muxEdge, cpuSupply);
- }
+ this.invalidate();
}
/**
@@ -239,7 +225,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
*/
@Override
public void addConsumerEdge(FlowEdge consumerEdge) {
- this.muxEdge = consumerEdge;
+ this.distributorEdge = consumerEdge;
}
/**
@@ -257,7 +243,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
*/
@Override
public void removeConsumerEdge(FlowEdge consumerEdge) {
- this.muxEdge = null;
+ this.distributorEdge = null;
this.invalidate();
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java
index 074f0ed8..dab0c421 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java
@@ -44,7 +44,7 @@ public class SimMachine {
private final InstantSource clock;
private SimCpu cpu;
- private FlowDistributor cpuMux;
+ private FlowDistributor cpuDistributor;
private SimPsu psu;
private Memory memory;
@@ -74,8 +74,8 @@ public class SimMachine {
return cpu;
}
- public FlowDistributor getCpuMux() {
- return cpuMux;
+ public FlowDistributor getCpuDistributor() {
+ return cpuDistributor;
}
public Memory getMemory() {
@@ -114,7 +114,7 @@ public class SimMachine {
public SimMachine(
FlowGraph graph,
MachineModel machineModel,
- FlowDistributor powerMux,
+ FlowDistributor powerDistributor,
CpuPowerModel cpuPowerModel,
Consumer<Exception> completion) {
this.graph = graph;
@@ -124,7 +124,7 @@ public class SimMachine {
// Create the psu and cpu and connect them
this.psu = new SimPsu(graph);
- graph.addEdge(this.psu, powerMux);
+ graph.addEdge(this.psu, powerDistributor);
this.cpu = new SimCpu(graph, this.machineModel.getCpuModel(), cpuPowerModel, 0);
@@ -133,8 +133,8 @@ public class SimMachine {
this.memory = new Memory(graph, this.machineModel.getMemory());
// Create a FlowDistributor and add the cpu as supplier
- this.cpuMux = new FlowDistributor(this.graph);
- graph.addEdge(this.cpuMux, this.cpu);
+ this.cpuDistributor = new FlowDistributor(this.graph);
+ graph.addEdge(this.cpuDistributor, this.cpu);
this.completion = completion;
}
@@ -153,8 +153,8 @@ public class SimMachine {
this.graph.removeNode(this.cpu);
this.cpu = null;
- this.graph.removeNode(this.cpuMux);
- this.cpuMux = null;
+ this.graph.removeNode(this.cpuDistributor);
+ this.cpuDistributor = null;
this.memory = null;
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java
index b8a9c738..1946eecb 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java
@@ -100,7 +100,7 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli
this.clock = this.machine.getClock();
this.parentGraph = machine.getGraph();
- this.parentGraph.addEdge(this, this.machine.getCpuMux());
+ this.parentGraph.addEdge(this, this.machine.getCpuDistributor());
this.lastUpdate = clock.millis();
this.lastUpdate = clock.millis();
@@ -185,7 +185,7 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli
* Push demand to the cpuMux if the demand has changed
**/
@Override
- public void pushDemand(FlowEdge supplierEdge, double newDemand) {
+ public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
this.cpuEdge.pushDemand(newDemand);
}
@@ -193,7 +193,7 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli
* Push supply to the workload if the supply has changed
**/
@Override
- public void pushSupply(FlowEdge consumerEdge, double newSupply) {
+ public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) {
this.workloadEdge.pushSupply(newSupply);
}
@@ -201,24 +201,24 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli
* Handle new demand from the workload by sending it through to the cpuMux
**/
@Override
- public void handleDemand(FlowEdge consumerEdge, double newDemand) {
+ public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) {
updateCounters(this.clock.millis());
this.cpuDemand = newDemand;
- pushDemand(this.cpuEdge, newDemand);
+ pushOutgoingDemand(this.cpuEdge, newDemand);
}
/**
* Handle a new supply pushed by the cpuMux by sending it through to the workload
**/
@Override
- public void handleSupply(FlowEdge supplierEdge, double newCpuSupply) {
+ public void handleIncomingSupply(FlowEdge supplierEdge, double newCpuSupply) {
updateCounters(this.clock.millis());
this.cpuSupply = newCpuSupply;
- pushSupply(this.workloadEdge, newCpuSupply);
+ pushOutgoingSupply(this.workloadEdge, newCpuSupply);
}
@Override
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java
index e8626e40..d2270888 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java
@@ -43,7 +43,7 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier {
private double totalCarbonEmission = 0.0f;
private CarbonModel carbonModel = null;
- private FlowEdge muxEdge;
+ private FlowEdge distributorEdge;
private double capacity = Long.MAX_VALUE;
@@ -57,7 +57,7 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier {
* @return <code>true</code> if the InPort is connected to an OutPort, <code>false</code> otherwise.
*/
public boolean isConnected() {
- return muxEdge != null;
+ return distributorEdge != null;
}
/**
@@ -156,30 +156,30 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@Override
- public void handleDemand(FlowEdge consumerEdge, double newPowerDemand) {
+ public void handleIncomingDemand(FlowEdge consumerEdge, double newPowerDemand) {
this.powerDemand = newPowerDemand;
double powerSupply = this.powerDemand;
if (powerSupply != this.powerSupplied) {
- this.pushSupply(this.muxEdge, powerSupply);
+ this.pushOutgoingSupply(this.distributorEdge, powerSupply);
}
}
@Override
- public void pushSupply(FlowEdge consumerEdge, double newSupply) {
+ public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) {
this.powerSupplied = newSupply;
consumerEdge.pushSupply(newSupply);
}
@Override
public void addConsumerEdge(FlowEdge consumerEdge) {
- this.muxEdge = consumerEdge;
+ this.distributorEdge = consumerEdge;
}
@Override
public void removeConsumerEdge(FlowEdge consumerEdge) {
- this.muxEdge = null;
+ this.distributorEdge = null;
}
// Update the carbon intensity of the power source
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java
index c1e8a1b9..dc5129d6 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java
@@ -106,7 +106,7 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
double powerSupply = this.powerDemand;
if (powerSupply != this.powerSupplied) {
- this.pushSupply(this.cpuEdge, powerSupply);
+ this.pushOutgoingSupply(this.cpuEdge, powerSupply);
}
return Long.MAX_VALUE;
@@ -135,33 +135,33 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@Override
- public void pushDemand(FlowEdge supplierEdge, double newDemand) {
+ public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
this.powerDemand = newDemand;
powerSupplyEdge.pushDemand(newDemand);
}
@Override
- public void pushSupply(FlowEdge consumerEdge, double newSupply) {
+ public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) {
this.powerSupplied = newSupply;
cpuEdge.pushSupply(newSupply);
}
@Override
- public void handleDemand(FlowEdge consumerEdge, double newPowerDemand) {
+ public void handleIncomingDemand(FlowEdge consumerEdge, double newPowerDemand) {
updateCounters();
this.powerDemand = newPowerDemand;
- pushDemand(this.powerSupplyEdge, newPowerDemand);
+ pushOutgoingDemand(this.powerSupplyEdge, newPowerDemand);
}
@Override
- public void handleSupply(FlowEdge supplierEdge, double newPowerSupply) {
+ public void handleIncomingSupply(FlowEdge supplierEdge, double newPowerSupply) {
updateCounters();
this.powerSupplied = newPowerSupply;
- pushSupply(this.cpuEdge, newPowerSupply);
+ pushOutgoingSupply(this.cpuEdge, newPowerSupply);
}
@Override
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
index b612de2c..da6b8334 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
@@ -195,7 +195,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier {
* @param newDemand new demand to sent to the cpu
*/
@Override
- public void pushDemand(FlowEdge supplierEdge, double newDemand) {
+ public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
this.demand = newDemand;
this.machineEdge.pushDemand(newDemand);
@@ -208,7 +208,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier {
* @param newSupply new supply to sent to the workload
*/
@Override
- public void pushSupply(FlowEdge consumerEdge, double newSupply) {
+ public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) {
this.supply = newSupply;
this.workloadEdge.pushSupply(newSupply);
@@ -221,8 +221,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier {
* @param newDemand new demand coming from the workload
*/
@Override
- public void handleDemand(FlowEdge consumerEdge, double newDemand) {
- this.pushDemand(this.machineEdge, newDemand);
+ public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) {
+ this.pushOutgoingDemand(this.machineEdge, newDemand);
}
/**
@@ -232,8 +232,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier {
* @param newSupply The new supply that is sent to the workload
*/
@Override
- public void handleSupply(FlowEdge supplierEdge, double newSupply) {
- this.pushSupply(this.machineEdge, newSupply);
+ public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) {
+ this.pushOutgoingSupply(this.machineEdge, newSupply);
}
/**
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java
index 8487fbc2..0735d8ae 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java
@@ -94,7 +94,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
graph.addEdge(this, supplier);
this.currentFragment = this.getNextFragment();
- pushDemand(machineEdge, this.currentFragment.cpuUsage());
+ pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage());
this.startOfFragment = now;
}
@@ -131,7 +131,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.startOfFragment = now - passedTime;
// Change the cpu Usage to the new Fragment
- pushDemand(machineEdge, this.currentFragment.cpuUsage());
+ pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage());
// Return the time when the current fragment will complete
return this.startOfFragment + duration;
@@ -190,7 +190,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.fragmentIndex = -1;
this.currentFragment = getNextFragment();
- pushDemand(this.machineEdge, this.currentFragment.cpuUsage());
+ pushOutgoingDemand(this.machineEdge, this.currentFragment.cpuUsage());
this.startOfFragment = now;
this.invalidate();
@@ -207,7 +207,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
* @param newSupply The new demand that needs to be sent to the VM
*/
@Override
- public void handleSupply(FlowEdge supplierEdge, double newSupply) {
+ public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) {
if (newSupply == this.currentSupply) {
return;
}
@@ -222,7 +222,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
* @param newDemand The new demand that needs to be sent to the VM
*/
@Override
- public void pushDemand(FlowEdge supplierEdge, double newDemand) {
+ public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
if (newDemand == this.currentDemand) {
return;
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
index 24476048..67540f4e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
@@ -44,7 +44,7 @@ public final class FlowEngine implements Runnable {
/**
* A priority queue containing the {@link FlowNode} updates to be scheduled in the future.
*/
- private final FlowTimerQueue timerQueue = new FlowTimerQueue(256);
+ private final FlowEventQueue eventQueue = new FlowEventQueue(256);
/**
* The stack of engine invocations to occur in the future.
@@ -127,7 +127,7 @@ public final class FlowEngine implements Runnable {
return;
}
- long deadline = timerQueue.peekDeadline();
+ long deadline = eventQueue.peekDeadline();
if (deadline != Long.MAX_VALUE) {
trySchedule(futureInvocations, clock.millis(), deadline);
}
@@ -139,7 +139,7 @@ public final class FlowEngine implements Runnable {
* This method should only be invoked while inside an engine cycle.
*/
public void scheduleDelayedInContext(FlowNode ctx) {
- FlowTimerQueue timerQueue = this.timerQueue;
+ FlowEventQueue timerQueue = this.eventQueue;
timerQueue.enqueue(ctx);
}
@@ -148,7 +148,7 @@ public final class FlowEngine implements Runnable {
*/
private void doRunEngine(long now) {
final FlowCycleQueue queue = this.cycleQueue;
- final FlowTimerQueue timerQueue = this.timerQueue;
+ final FlowEventQueue timerQueue = this.eventQueue;
try {
// Mark the engine as active to prevent concurrent calls to this method
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java
index 049eb40d..53649d29 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java
@@ -26,12 +26,12 @@ import java.util.Arrays;
import org.opendc.simulator.engine.graph.FlowNode;
/**
- * A specialized priority queue for timers of {@link FlowNode}s.
+ * A specialized priority queue for future event of {@link FlowNode}s sorted on time.
* <p>
* By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation
* being generic.
*/
-public final class FlowTimerQueue {
+public final class FlowEventQueue {
/**
* Array representation of binary heap of {@link FlowNode} instances.
*/
@@ -43,11 +43,11 @@ public final class FlowTimerQueue {
private int size = 0;
/**
- * Construct a {@link FlowTimerQueue} with the specified initial capacity.
+ * Construct a {@link FlowEventQueue} with the specified initial capacity.
*
* @param initialCapacity The initial capacity of the queue.
*/
- public FlowTimerQueue(int initialCapacity) {
+ public FlowEventQueue(int initialCapacity) {
this.queue = new FlowNode[initialCapacity];
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java
index 2130d376..a9da6f5d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java
@@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph;
public interface FlowConsumer {
- void handleSupply(FlowEdge supplierEdge, double newSupply);
+ void handleIncomingSupply(FlowEdge supplierEdge, double newSupply);
- void pushDemand(FlowEdge supplierEdge, double newDemand);
+ void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand);
void addSupplierEdge(FlowEdge supplierEdge);
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
index 7ef091f8..16bb161f 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
@@ -29,27 +29,30 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
private FlowEdge supplierEdge;
- private final ArrayList<Double> demands = new ArrayList<>(); // What is demanded by the consumers
- private final ArrayList<Double> supplies = new ArrayList<>(); // What is supplied to the consumers
+ private final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers
+ private final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers
- private double totalDemand; // The total demand of all the consumers
- private double totalSupply; // The total supply from the supplier
+ private double totalIncomingDemand; // The total demand of all the consumers
+ private double currentIncomingSupply; // The current supply provided by the supplier
- private boolean overLoaded = false;
- private int currentConsumerIdx = -1;
+ private boolean outgoingDemandUpdateNeeded = false;
- private double capacity; // What is the max capacity
+ private boolean overloaded = false;
+
+ private double capacity; // What is the max capacity. Can probably be removed
+
+ private final ArrayList<Integer> updatedDemands = new ArrayList<>();
public FlowDistributor(FlowGraph graph) {
super(graph);
}
- public double getTotalDemand() {
- return totalDemand;
+ public double getTotalIncomingDemand() {
+ return totalIncomingDemand;
}
- public double getTotalSupply() {
- return totalSupply;
+ public double getCurrentIncomingSupply() {
+ return currentIncomingSupply;
}
public double getCapacity() {
@@ -58,38 +61,61 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
public long onUpdate(long now) {
+ // Check if current supply is different from total demand
+ if (this.outgoingDemandUpdateNeeded) {
+ this.updateOutgoingDemand();
+
+ return Long.MAX_VALUE;
+ }
+
+ this.updateOutgoingSupplies();
+
return Long.MAX_VALUE;
}
- private void distributeSupply() {
- // if supply >= demand -> push supplies to all tasks
- if (this.totalSupply > this.totalDemand) {
+ private void updateOutgoingDemand() {
+ this.pushOutgoingDemand(this.supplierEdge, this.totalIncomingDemand);
- // If this came from a state of overload, provide all consumers with their demand
- if (this.overLoaded) {
- for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx));
- }
- }
+ this.outgoingDemandUpdateNeeded = false;
- if (this.currentConsumerIdx != -1) {
- this.pushSupply(
- this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx));
- this.currentConsumerIdx = -1;
- }
+ this.invalidate();
+ }
- this.overLoaded = false;
- }
+ private void updateOutgoingSupplies() {
+
+ // If the demand is higher than the current supply, the system is overloaded.
+ // The available supply is distributed based on the current distribution function.
+ if (this.totalIncomingDemand > this.currentIncomingSupply) {
+ this.overloaded = true;
- // if supply < demand -> distribute the supply over all consumers
- else {
- this.overLoaded = true;
- double[] supplies = redistributeSupply(this.demands, this.totalSupply);
+ double[] supplies = distributeSupply(this.incomingDemands, this.currentIncomingSupply);
for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- this.pushSupply(this.consumerEdges.get(idx), supplies[idx]);
+ this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx]);
+ }
+
+ } else {
+
+ // If the distributor was overloaded before, but is not anymore:
+ // provide all consumers with their demand
+ if (this.overloaded) {
+ for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
+ if (this.outgoingSupplies.get(idx) != this.incomingDemands.get(idx)) {
+ this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx));
+ }
+ }
+ this.overloaded = false;
+ }
+
+ // Update the supplies of the consumers that changed their demand in the current cycle
+ else {
+ for (int idx : this.updatedDemands) {
+ this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx));
+ }
}
}
+
+ this.updatedDemands.clear();
}
private record Demand(int idx, double value) {}
@@ -97,10 +123,8 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
/**
* Distributed the available supply over the different demands.
* The supply is distributed using MaxMin Fairness.
- *
- * TODO: Move this outside of the Distributor so we can easily add different redistribution methods
*/
- private static double[] redistributeSupply(ArrayList<Double> demands, double totalSupply) {
+ private static double[] distributeSupply(ArrayList<Double> demands, double currentSupply) {
int inputSize = demands.size();
final double[] supplies = new double[inputSize];
@@ -116,7 +140,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
return i1.compareTo(i2);
});
- double availableCapacity = totalSupply; // totalSupply
+ double availableCapacity = currentSupply; // totalSupply
for (int i = 0; i < inputSize; i++) {
double d = tempDemands[i].value;
@@ -133,7 +157,6 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
availableCapacity -= r;
}
- // Return the used capacity
return supplies;
}
@@ -146,15 +169,15 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
consumerEdge.setConsumerIndex(this.consumerEdges.size());
this.consumerEdges.add(consumerEdge);
- this.demands.add(0.0);
- this.supplies.add(0.0);
+ this.incomingDemands.add(0.0);
+ this.outgoingSupplies.add(0.0);
}
@Override
public void addSupplierEdge(FlowEdge supplierEdge) {
this.supplierEdge = supplierEdge;
this.capacity = supplierEdge.getCapacity();
- this.totalSupply = 0;
+ this.currentIncomingSupply = 0;
}
@Override
@@ -165,84 +188,87 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
return;
}
- this.totalDemand -= consumerEdge.getDemand();
+ this.totalIncomingDemand -= consumerEdge.getDemand();
this.consumerEdges.remove(idx);
- this.demands.remove(idx);
- this.supplies.remove(idx);
+ this.incomingDemands.remove(idx);
+ this.outgoingSupplies.remove(idx);
// update the consumer index for all consumerEdges higher than this.
for (int i = idx; i < this.consumerEdges.size(); i++) {
this.consumerEdges.get(i).setConsumerIndex(i);
}
- this.currentConsumerIdx = -1;
+ for (int i = 0; i < this.updatedDemands.size(); i++) {
+ int j = this.updatedDemands.get(i);
- if (this.overLoaded) {
- this.distributeSupply();
+ if (j == idx) {
+ this.updatedDemands.remove(idx);
+ }
+ if (j > idx) {
+ this.updatedDemands.set(i, j - 1);
+ }
}
- this.pushDemand(this.supplierEdge, this.totalDemand);
+ this.outgoingDemandUpdateNeeded = true;
+ this.invalidate();
}
@Override
public void removeSupplierEdge(FlowEdge supplierEdge) {
this.supplierEdge = null;
this.capacity = 0;
- this.totalSupply = 0;
+ this.currentIncomingSupply = 0;
+
+ this.invalidate();
}
@Override
- public void handleDemand(FlowEdge consumerEdge, double newDemand) {
+ public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) {
int idx = consumerEdge.getConsumerIndex();
- this.currentConsumerIdx = idx;
-
if (idx == -1) {
System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer");
return;
}
// Update the total demand (This is cheaper than summing over all demands)
- double prevDemand = demands.get(idx);
+ double prevDemand = incomingDemands.get(idx);
- demands.set(idx, newDemand);
- this.totalDemand += (newDemand - prevDemand);
+ incomingDemands.set(idx, newDemand);
+ this.totalIncomingDemand += (newDemand - prevDemand);
- if (overLoaded) {
- distributeSupply();
- }
+ this.updatedDemands.add(idx);
- // Send new totalDemand to CPU
- // TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply)
- this.pushDemand(this.supplierEdge, this.totalDemand);
+ this.outgoingDemandUpdateNeeded = true;
+ this.invalidate();
}
@Override
- public void handleSupply(FlowEdge supplierEdge, double newSupply) {
- this.totalSupply = newSupply;
+ public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) {
+ this.currentIncomingSupply = newSupply;
- this.distributeSupply();
+ this.invalidate();
}
@Override
- public void pushDemand(FlowEdge supplierEdge, double newDemand) {
+ public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
this.supplierEdge.pushDemand(newDemand);
}
@Override
- public void pushSupply(FlowEdge consumerEdge, double newSupply) {
+ public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) {
int idx = consumerEdge.getConsumerIndex();
if (idx == -1) {
System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer");
}
- if (supplies.get(idx) == newSupply) {
+ if (outgoingSupplies.get(idx) == newSupply) {
return;
}
- supplies.set(idx, newSupply);
+ outgoingSupplies.set(idx, newSupply);
consumerEdge.pushSupply(newSupply);
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
index b7162508..9521f2ce 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
@@ -108,24 +108,38 @@ public class FlowEdge {
/**
* Push new demand from the Consumer to the Supplier
*/
- public void pushDemand(double newDemand) {
- if (newDemand == this.demand) {
+ public void pushDemand(double newDemand, boolean forceThrough) {
+ if ((newDemand == this.demand) && !forceThrough) {
return;
}
this.demand = newDemand;
- this.supplier.handleDemand(this, newDemand);
+ this.supplier.handleIncomingDemand(this, newDemand);
+ }
+
+ /**
+ * Push new demand from the Consumer to the Supplier
+ */
+ public void pushDemand(double newDemand) {
+ this.pushDemand(newDemand, false);
}
/**
* Push new supply from the Supplier to the Consumer
*/
- public void pushSupply(double newSupply) {
- if (newSupply == this.supply) {
+ public void pushSupply(double newSupply, boolean forceThrough) {
+ if ((newSupply == this.supply) && !forceThrough) {
return;
}
this.supply = newSupply;
- this.consumer.handleSupply(this, newSupply);
+ this.consumer.handleIncomingSupply(this, newSupply);
+ }
+
+ /**
+ * Push new supply from the Supplier to the Consumer
+ */
+ public void pushSupply(double newSupply) {
+ this.pushSupply(newSupply, false);
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
index 64cd0d8c..e24e9f93 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
@@ -24,7 +24,7 @@ package org.opendc.simulator.engine.graph;
import java.time.InstantSource;
import org.opendc.simulator.engine.engine.FlowEngine;
-import org.opendc.simulator.engine.engine.FlowTimerQueue;
+import org.opendc.simulator.engine.engine.FlowEventQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,7 +109,7 @@ public abstract class FlowNode {
private long deadline = Long.MAX_VALUE;
/**
- * The index of the timer in the {@link FlowTimerQueue}.
+ * The index of the timer in the {@link FlowEventQueue}.
*/
private int timerIndex = -1;
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java
index 84602ee0..da65392b 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java
@@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph;
public interface FlowSupplier {
- void handleDemand(FlowEdge consumerEdge, double newDemand);
+ void handleIncomingDemand(FlowEdge consumerEdge, double newDemand);
- void pushSupply(FlowEdge consumerEdge, double newSupply);
+ void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply);
void addConsumerEdge(FlowEdge consumerEdge);