diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2025-03-18 07:26:35 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-03-18 07:26:35 +0100 |
| commit | 97db8e0351b9451ece8fd16c25ca0588ec71a2ab (patch) | |
| tree | e72f32df2b12677e9ae2f9997b226c8da97e56e4 /opendc-simulator/opendc-simulator-compute/src | |
| parent | 7dc2639a7fcdf51ef789f4af2e3afff11438be6e (diff) | |
Performance updates (#314)
Diffstat (limited to 'opendc-simulator/opendc-simulator-compute/src')
22 files changed, 291 insertions, 353 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java index a9edaa97..fadfc4d6 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java @@ -22,11 +22,13 @@ package org.opendc.simulator.compute.cpu; +import java.util.List; +import java.util.Map; import org.opendc.simulator.compute.machine.PerformanceCounters; import org.opendc.simulator.compute.models.CpuModel; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowConsumer; import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -102,15 +104,15 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public SimCpu(FlowGraph graph, CpuModel cpuModel, CpuPowerModel powerModel, int id) { - super(graph); + public SimCpu(FlowEngine engine, CpuModel cpuModel, CpuPowerModel powerModel, int id) { + super(engine); this.cpuModel = cpuModel; this.maxCapacity = this.cpuModel.getTotalCapacity(); // TODO: connect this to the front-end this.cpuPowerModel = powerModel; - this.lastCounterUpdate = graph.getEngine().getClock().millis(); + this.lastCounterUpdate = clock.millis(); this.cpuFrequencyInv = 1 / this.maxCapacity; } @@ -255,4 +257,11 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer this.psuEdge = null; this.invalidate(); } + + @Override + public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { + return Map.of( + FlowEdge.NodeType.CONSUMING, List.of(this.psuEdge), + FlowEdge.NodeType.SUPPLYING, List.of(this.distributorEdge)); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java index dab0c421..55a5eb42 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java @@ -29,17 +29,19 @@ import org.opendc.simulator.compute.cpu.SimCpu; import org.opendc.simulator.compute.memory.Memory; import org.opendc.simulator.compute.models.MachineModel; import org.opendc.simulator.compute.power.SimPsu; +import org.opendc.simulator.compute.workload.ChainWorkload; +import org.opendc.simulator.compute.workload.SimChainWorkload; import org.opendc.simulator.compute.workload.SimWorkload; -import org.opendc.simulator.compute.workload.Workload; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowDistributor; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowEdge; /** * A machine that is able to execute {@link SimWorkload} objects. */ public class SimMachine { private final MachineModel machineModel; - private final FlowGraph graph; + private final FlowEngine engine; private final InstantSource clock; @@ -62,8 +64,8 @@ public class SimMachine { return machineModel; } - public FlowGraph getGraph() { - return graph; + public FlowEngine getEngine() { + return engine; } public InstantSource getClock() { @@ -112,29 +114,30 @@ public class SimMachine { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// public SimMachine( - FlowGraph graph, + FlowEngine engine, MachineModel machineModel, FlowDistributor powerDistributor, CpuPowerModel cpuPowerModel, Consumer<Exception> completion) { - this.graph = graph; + this.engine = engine; this.machineModel = machineModel; - this.clock = graph.getEngine().getClock(); + this.clock = engine.getClock(); // Create the psu and cpu and connect them - this.psu = new SimPsu(graph); + this.psu = new SimPsu(engine); - graph.addEdge(this.psu, powerDistributor); + new FlowEdge(this.psu, powerDistributor); - this.cpu = new SimCpu(graph, this.machineModel.getCpuModel(), cpuPowerModel, 0); + this.cpu = new SimCpu(engine, this.machineModel.getCpuModel(), cpuPowerModel, 0); - graph.addEdge(this.cpu, this.psu); + new FlowEdge(this.cpu, this.psu); - this.memory = new Memory(graph, this.machineModel.getMemory()); + this.memory = new Memory(engine, this.machineModel.getMemory()); // Create a FlowDistributor and add the cpu as supplier - this.cpuDistributor = new FlowDistributor(this.graph); - graph.addEdge(this.cpuDistributor, this.cpu); + this.cpuDistributor = new FlowDistributor(engine); + + new FlowEdge(this.cpuDistributor, this.cpu); this.completion = completion; } @@ -147,13 +150,13 @@ public class SimMachine { * Close all related hardware */ public void shutdown(Exception cause) { - this.graph.removeNode(this.psu); + this.psu.closeNode(); this.psu = null; - this.graph.removeNode(this.cpu); + this.cpu.closeNode(); this.cpu = null; - this.graph.removeNode(this.cpuDistributor); + this.cpuDistributor.closeNode(); this.cpuDistributor = null; this.memory = null; @@ -181,11 +184,7 @@ public class SimMachine { * @param completion * @return */ - public VirtualMachine startWorkload(Workload workload, Consumer<Exception> completion) { - final VirtualMachine vm = new VirtualMachine(this); - - vm.startWorkload(workload, completion); - - return vm; + public SimChainWorkload startWorkload(ChainWorkload workload, Consumer<Exception> completion) { + return (SimChainWorkload) workload.startWorkload(this.cpuDistributor, this, completion); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java deleted file mode 100644 index 1946eecb..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.machine; - -import java.util.function.Consumer; -import org.opendc.simulator.compute.cpu.SimCpu; -import org.opendc.simulator.compute.workload.SimWorkload; -import org.opendc.simulator.compute.workload.Workload; -import org.opendc.simulator.engine.graph.FlowConsumer; -import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; -import org.opendc.simulator.engine.graph.FlowNode; -import org.opendc.simulator.engine.graph.FlowSupplier; - -/* - A virtual Machine created to run a single workload -*/ -public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSupplier { - private final SimMachine machine; - - private SimWorkload activeWorkload; - - private long lastUpdate; - private final double d; - - private FlowEdge cpuEdge; // The edge to the cpu - private FlowEdge workloadEdge; // The edge to the workload - - private double cpuDemand; - private double cpuSupply; - private double cpuCapacity; - - private PerformanceCounters performanceCounters = new PerformanceCounters(); - - private Consumer<Exception> completion; - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Basic Getters and Setters - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - public PerformanceCounters getPerformanceCounters() { - return performanceCounters; - } - - public SimWorkload getActiveWorkload() { - return activeWorkload; - } - - public double getDemand() { - return cpuDemand; - } - - public void setDemand(double demand) { - this.cpuDemand = demand; - } - - public double getCpuCapacity() { - return cpuCapacity; - } - - public void setCpuCapacity(double cpuCapacity) { - this.cpuCapacity = cpuCapacity; - } - - public FlowGraph getGraph() { - return this.parentGraph; - } - - public SimCpu getCpu() { - return machine.getCpu(); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Constructors - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - public VirtualMachine(SimMachine machine) { - super(machine.getGraph()); - this.machine = machine; - this.clock = this.machine.getClock(); - - this.parentGraph = machine.getGraph(); - this.parentGraph.addEdge(this, this.machine.getCpuDistributor()); - - this.lastUpdate = clock.millis(); - this.lastUpdate = clock.millis(); - - this.d = 1 / machine.getCpu().getFrequency(); - } - - public void shutdown() { - this.shutdown(null); - } - - public void shutdown(Exception cause) { - if (this.nodeState == NodeState.CLOSED) { - return; - } - - super.closeNode(); - - this.activeWorkload = null; - this.performanceCounters = null; - - this.completion.accept(cause); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Workload related functionality - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - public void startWorkload(Workload workload, Consumer<Exception> completion) { - this.completion = completion; - this.activeWorkload = workload.startWorkload(this, this.clock.millis()); - } - - public void updateCounters(long now) { - long lastUpdate = this.lastUpdate; - this.lastUpdate = now; - long delta = now - lastUpdate; - - if (delta > 0) { - final double factor = this.d * delta; - - this.performanceCounters.addCpuActiveTime(Math.round(this.cpuSupply * factor)); - this.performanceCounters.setCpuIdleTime(Math.round((this.cpuCapacity - this.cpuSupply) * factor)); - this.performanceCounters.addCpuStealTime(Math.round((this.cpuDemand - this.cpuSupply) * factor)); - } - - this.performanceCounters.setCpuDemand(this.cpuDemand); - this.performanceCounters.setCpuSupply(this.cpuSupply); - this.performanceCounters.setCpuCapacity(this.cpuCapacity); - } - - @Override - public long onUpdate(long now) { - updateCounters(now); - - return Long.MAX_VALUE; - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // FlowGraph Related functionality - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - /** - * Add an edge to the workload - * TODO: maybe add a check if there is already an edge - */ - @Override - public void addConsumerEdge(FlowEdge consumerEdge) { - this.workloadEdge = consumerEdge; - } - - /** - * Add an edge to the cpuMux - * TODO: maybe add a check if there is already an edge - */ - @Override - public void addSupplierEdge(FlowEdge supplierEdge) { - this.cpuEdge = supplierEdge; - } - - /** - * Push demand to the cpuMux if the demand has changed - **/ - @Override - public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { - this.cpuEdge.pushDemand(newDemand); - } - - /** - * Push supply to the workload if the supply has changed - **/ - @Override - public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { - this.workloadEdge.pushSupply(newSupply); - } - - /** - * Handle new demand from the workload by sending it through to the cpuMux - **/ - @Override - public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) { - - updateCounters(this.clock.millis()); - this.cpuDemand = newDemand; - - pushOutgoingDemand(this.cpuEdge, newDemand); - } - - /** - * Handle a new supply pushed by the cpuMux by sending it through to the workload - **/ - @Override - public void handleIncomingSupply(FlowEdge supplierEdge, double newCpuSupply) { - - updateCounters(this.clock.millis()); - this.cpuSupply = newCpuSupply; - - pushOutgoingSupply(this.workloadEdge, newCpuSupply); - } - - @Override - public void removeConsumerEdge(FlowEdge consumerEdge) { - this.workloadEdge = null; - this.shutdown(); - } - - @Override - public double getCapacity() { - return this.cpuCapacity; - } - - @Override - public void removeSupplierEdge(FlowEdge supplierEdge) { - this.cpuEdge = null; - this.shutdown(); - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java index d4406b20..2686cfa2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.memory; import org.opendc.simulator.compute.models.MemoryUnit; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.engine.FlowEngine; /** * The [SimMemory] implementation for a machine. @@ -32,7 +32,7 @@ public final class Memory { // private final SimpleFlowSink sink; private final MemoryUnit memoryUnit; - public Memory(FlowGraph graph, MemoryUnit memoryUnit) { + public Memory(FlowEngine engine, MemoryUnit memoryUnit) { this.memoryUnit = memoryUnit; // TODO: Fix this diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java index b6246fe9..3bbdba66 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java @@ -24,7 +24,9 @@ package org.opendc.simulator.compute.power; import java.util.ArrayList; import java.util.List; -import org.opendc.simulator.engine.graph.FlowGraph; +import java.util.Map; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowEdge; import org.opendc.simulator.engine.graph.FlowNode; /** @@ -45,13 +47,13 @@ public class CarbonModel extends FlowNode { /** * Construct a CarbonModel * - * @param parentGraph The active FlowGraph which should be used to make the new FlowNode + * @param engine The {@link FlowEngine} the node belongs to * @param carbonFragments A list of Carbon Fragments defining the carbon intensity at different time frames * @param startTime The start time of the simulation. This is used to go from relative time (used by the clock) * to absolute time (used by carbon fragments). */ - public CarbonModel(FlowGraph parentGraph, List<CarbonFragment> carbonFragments, long startTime) { - super(parentGraph); + public CarbonModel(FlowEngine engine, List<CarbonFragment> carbonFragments, long startTime) { + super(engine); this.startTime = startTime; this.fragments = carbonFragments; @@ -66,6 +68,8 @@ public class CarbonModel extends FlowNode { receiver.removeCarbonModel(this); } + receivers.clear(); + this.closeNode(); } @@ -128,4 +132,17 @@ public class CarbonModel extends FlowNode { receiver.updateCarbonIntensity(this.current_fragment.getCarbonIntensity()); } + + public static <T, U> List<U> castList(List<T> list, Class<U> clazz) { + List<U> result = new ArrayList<>(); + for (T element : list) { + result.add(clazz.cast(element)); + } + return result; + } + + @Override + public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { + return Map.of(); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java index 718bc22a..34804230 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java @@ -22,9 +22,11 @@ package org.opendc.simulator.compute.power; +import java.util.List; +import java.util.Map; import org.opendc.simulator.compute.cpu.SimCpu; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -111,8 +113,8 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier, Carb // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public SimPowerSource(FlowGraph graph, double max_capacity, String name, String clusterName) { - super(graph); + public SimPowerSource(FlowEngine engine, double max_capacity, String name, String clusterName) { + super(engine); this.capacity = max_capacity; @@ -208,4 +210,11 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier, Carb public void removeCarbonModel(CarbonModel carbonModel) { this.carbonModel = null; } + + @Override + public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { + List<FlowEdge> supplierEdges = this.distributorEdge != null ? List.of(this.distributorEdge) : List.of(); + + return Map.of(FlowEdge.NodeType.SUPPLYING, supplierEdges); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java index dc5129d6..87a4e791 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java @@ -22,10 +22,12 @@ package org.opendc.simulator.compute.power; +import java.util.List; +import java.util.Map; import org.opendc.simulator.compute.cpu.SimCpu; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowConsumer; import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -90,8 +92,8 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public SimPsu(FlowGraph graph) { - super(graph); + public SimPsu(FlowEngine engine) { + super(engine); lastUpdate = this.clock.millis(); } @@ -183,4 +185,14 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer public void removeSupplierEdge(FlowEdge supplierEdge) { this.powerSupplyEdge = null; } + + @Override + public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { + List<FlowEdge> supplyingEdges = cpuEdge != null ? List.of(cpuEdge) : List.of(); + List<FlowEdge> consumingEdges = powerSupplyEdge != null ? List.of(powerSupplyEdge) : List.of(); + + return Map.of( + FlowEdge.NodeType.SUPPLYING, supplyingEdges, + FlowEdge.NodeType.CONSUMING, consumingEdges); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/BatteryAggregator.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/BatteryAggregator.java index 14d15b17..9a05f2b3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/BatteryAggregator.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/BatteryAggregator.java @@ -24,10 +24,12 @@ package org.opendc.simulator.compute.power.batteries; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowConsumer; import org.opendc.simulator.engine.graph.FlowDistributor; import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -52,14 +54,15 @@ public class BatteryAggregator extends FlowNode implements FlowConsumer, FlowSup /** * Construct a new {@link FlowNode} instance. * - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this node belongs to. */ - public BatteryAggregator(FlowGraph parentGraph, SimBattery battery, FlowDistributor powerSourceDistributor) { - super(parentGraph); + public BatteryAggregator(FlowEngine engine, SimBattery battery, FlowDistributor powerSourceDistributor) { + super(engine); - this.powerSourceEdge = parentGraph.addEdge(this, powerSourceDistributor); + this.powerSourceEdge = new FlowEdge(this, powerSourceDistributor); this.powerSourceEdge.setSupplierIndex(0); - this.batteryEdge = parentGraph.addEdge(this, battery); + + this.batteryEdge = new FlowEdge(this, battery); this.batteryEdge.setSupplierIndex(1); } @@ -170,4 +173,21 @@ public class BatteryAggregator extends FlowNode implements FlowConsumer, FlowSup public double getCapacity() { return 0; } + + @Override + public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { + List<FlowEdge> consumingEdges = new ArrayList<>(); + if (this.powerSourceEdge != null) { + consumingEdges.add(this.batteryEdge); + } + if (this.batteryEdge != null) { + consumingEdges.add(this.powerSourceEdge); + } + + List<FlowEdge> supplyingEdges = this.hostEdge != null ? List.of(this.hostEdge) : List.of(); + + return Map.of( + FlowEdge.NodeType.CONSUMING, consumingEdges, + FlowEdge.NodeType.SUPPLYING, supplyingEdges); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/SimBattery.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/SimBattery.java index f09cbcee..d749af72 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/SimBattery.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/SimBattery.java @@ -22,10 +22,12 @@ package org.opendc.simulator.compute.power.batteries; +import java.util.List; +import java.util.Map; import org.opendc.simulator.compute.power.batteries.policy.BatteryPolicy; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowConsumer; import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -120,7 +122,7 @@ public class SimBattery extends FlowNode implements FlowConsumer, FlowSupplier { /** * Construct a new {@link SimBattery} instance. * - * @param parentGraph The {@link FlowGraph} instance this battery is part of. + * @param engine The {@link FlowEngine} instance this battery is part of. * @param capacity The capacity of the battery in kWh. * @param chargingSpeed The charging speed of the battery in J. * @param initialCharge The initial charge of the battery in kWh. @@ -130,7 +132,7 @@ public class SimBattery extends FlowNode implements FlowConsumer, FlowSupplier { * @param expectedLifeTime The expected lifetime of the battery in years. */ public SimBattery( - FlowGraph parentGraph, + FlowEngine engine, double capacity, double chargingSpeed, double initialCharge, @@ -139,7 +141,7 @@ public class SimBattery extends FlowNode implements FlowConsumer, FlowSupplier { Double totalEmbodiedCarbon, Double expectedLifeTime) { - super(parentGraph); + super(engine); this.capacity = capacity * 3600000; this.chargingSpeed = chargingSpeed; @@ -319,4 +321,14 @@ public class SimBattery extends FlowNode implements FlowConsumer, FlowSupplier { public void removeConsumerEdge(FlowEdge consumerEdge) { this.close(); } + + @Override + public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { + List<FlowEdge> consumingEdges = (this.distributorEdge != null) ? List.of(this.distributorEdge) : List.of(); + List<FlowEdge> supplyingEdges = (this.aggregatorEdge != null) ? List.of(this.aggregatorEdge) : List.of(); + + return Map.of( + FlowEdge.NodeType.CONSUMING, consumingEdges, + FlowEdge.NodeType.SUPPLYING, supplyingEdges); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/BatteryPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/BatteryPolicy.java index a50f7e73..9d983a67 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/BatteryPolicy.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/BatteryPolicy.java @@ -22,13 +22,16 @@ package org.opendc.simulator.compute.power.batteries.policy; +import java.util.List; +import java.util.Map; import org.opendc.simulator.compute.power.CarbonModel; import org.opendc.simulator.compute.power.CarbonReceiver; import org.opendc.simulator.compute.power.batteries.BatteryAggregator; import org.opendc.simulator.compute.power.batteries.BatteryState; import org.opendc.simulator.compute.power.batteries.PowerSourceType; import org.opendc.simulator.compute.power.batteries.SimBattery; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowEdge; import org.opendc.simulator.engine.graph.FlowNode; /** @@ -47,10 +50,10 @@ public abstract class BatteryPolicy extends FlowNode implements CarbonReceiver { /** * Construct a new {@link FlowNode} instance. * - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this node belongs to. */ - public BatteryPolicy(FlowGraph parentGraph, SimBattery battery, BatteryAggregator aggregator) { - super(parentGraph); + public BatteryPolicy(FlowEngine engine, SimBattery battery, BatteryAggregator aggregator) { + super(engine); this.battery = battery; this.battery.setBatteryPolicy(this); @@ -114,4 +117,9 @@ public abstract class BatteryPolicy extends FlowNode implements CarbonReceiver { public void removeCarbonModel(CarbonModel carbonModel) { this.close(); } + + @Override + public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { + return Map.of(FlowEdge.NodeType.SUPPLYING, List.of()); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/DoubleThresholdBatteryPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/DoubleThresholdBatteryPolicy.java index 3a9cb228..28302f4d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/DoubleThresholdBatteryPolicy.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/DoubleThresholdBatteryPolicy.java @@ -25,7 +25,7 @@ package org.opendc.simulator.compute.power.batteries.policy; import org.opendc.simulator.compute.power.batteries.BatteryAggregator; import org.opendc.simulator.compute.power.batteries.BatteryState; import org.opendc.simulator.compute.power.batteries.SimBattery; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.engine.FlowEngine; /** * A battery policy that uses two thresholds to determine if a better should be charging or discharging. @@ -42,19 +42,19 @@ public class DoubleThresholdBatteryPolicy extends BatteryPolicy { /** * - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this node belongs to. * @param battery The {@link SimBattery} to control. * @param aggregator The {@link BatteryAggregator} to use. * @param lowerThreshold The lower carbon intensity threshold to trigger charging or discharging. * @param upperThreshold The upper carbon intensity threshold to trigger charging or discharging. */ public DoubleThresholdBatteryPolicy( - FlowGraph parentGraph, + FlowEngine engine, SimBattery battery, BatteryAggregator aggregator, double lowerThreshold, double upperThreshold) { - super(parentGraph, battery, aggregator); + super(engine, battery, aggregator); this.lowerThreshold = lowerThreshold; this.upperThreshold = upperThreshold; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanBatteryPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanBatteryPolicy.java index 1c127abd..63e03942 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanBatteryPolicy.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanBatteryPolicy.java @@ -26,7 +26,7 @@ import java.util.LinkedList; import org.opendc.simulator.compute.power.batteries.BatteryAggregator; import org.opendc.simulator.compute.power.batteries.BatteryState; import org.opendc.simulator.compute.power.batteries.SimBattery; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.engine.FlowEngine; /** * A battery policy that uses a running mean to determine if a battery should be charging or discharging. @@ -46,17 +46,17 @@ public class RunningMeanBatteryPolicy extends BatteryPolicy { /** * - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this node belongs to. * @param battery The {@link SimBattery} to control. * @param aggregator The {@link BatteryAggregator} to use. */ public RunningMeanBatteryPolicy( - FlowGraph parentGraph, + FlowEngine engine, SimBattery battery, BatteryAggregator aggregator, double startingThreshold, int windowSize) { - super(parentGraph, battery, aggregator); + super(engine, battery, aggregator); this.windowSize = windowSize; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanPlusBatteryPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanPlusBatteryPolicy.java index 25b86dde..3330cd4e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanPlusBatteryPolicy.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/RunningMeanPlusBatteryPolicy.java @@ -26,7 +26,7 @@ import java.util.LinkedList; import org.opendc.simulator.compute.power.batteries.BatteryAggregator; import org.opendc.simulator.compute.power.batteries.BatteryState; import org.opendc.simulator.compute.power.batteries.SimBattery; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.engine.FlowEngine; /** * An improved version of {@link RunningMeanBatteryPolicy}. @@ -43,17 +43,17 @@ public class RunningMeanPlusBatteryPolicy extends BatteryPolicy { /** * - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this stage belongs to. * @param battery The {@link SimBattery} to control. * @param aggregator The {@link BatteryAggregator} to use. */ public RunningMeanPlusBatteryPolicy( - FlowGraph parentGraph, + FlowEngine engine, SimBattery battery, BatteryAggregator aggregator, double startingThreshold, int windowSize) { - super(parentGraph, battery, aggregator); + super(engine, battery, aggregator); this.windowSize = windowSize; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/SingleThresholdBatteryPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/SingleThresholdBatteryPolicy.java index 26d85958..22e6ae2f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/SingleThresholdBatteryPolicy.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/batteries/policy/SingleThresholdBatteryPolicy.java @@ -25,7 +25,7 @@ package org.opendc.simulator.compute.power.batteries.policy; import org.opendc.simulator.compute.power.batteries.BatteryAggregator; import org.opendc.simulator.compute.power.batteries.BatteryState; import org.opendc.simulator.compute.power.batteries.SimBattery; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.engine.FlowEngine; /** * A battery policy that uses a single threshold to determine if a better should be charging or discharging. @@ -38,14 +38,14 @@ public class SingleThresholdBatteryPolicy extends BatteryPolicy { private final double carbonThreshold; /** - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this node belongs to. * @param battery The {@link SimBattery} to control. * @param aggregator The {@link BatteryAggregator} to use. * @param carbonThreshold The carbon intensity threshold to trigger charging or discharging. */ public SingleThresholdBatteryPolicy( - FlowGraph parentGraph, SimBattery battery, BatteryAggregator aggregator, double carbonThreshold) { - super(parentGraph, battery, aggregator); + FlowEngine engine, SimBattery battery, BatteryAggregator aggregator, double carbonThreshold) { + super(engine, battery, aggregator); this.carbonThreshold = carbonThreshold; } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java index ecd4c47f..76a715f7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java @@ -23,6 +23,8 @@ package org.opendc.simulator.compute.workload; import java.util.ArrayList; +import java.util.function.Consumer; +import org.opendc.simulator.compute.machine.SimMachine; import org.opendc.simulator.engine.graph.FlowSupplier; public class ChainWorkload implements Workload { @@ -66,7 +68,12 @@ public class ChainWorkload implements Workload { } @Override - public SimWorkload startWorkload(FlowSupplier supplier, long now) { - return new SimChainWorkload(supplier, this, now); + public SimWorkload startWorkload(FlowSupplier supplier) { + return new SimChainWorkload(supplier, this); + } + + @Override + public SimWorkload startWorkload(FlowSupplier supplier, SimMachine machine, Consumer<Exception> completion) { + return new SimChainWorkload(supplier, this, machine, completion); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java index 6cc67e3f..d34da203 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java @@ -27,9 +27,10 @@ package org.opendc.simulator.compute.workload; // TODO: Move this to a separate file //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -import java.time.InstantSource; +import java.util.List; +import java.util.Map; import org.jetbrains.annotations.NotNull; -import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowEdge; import org.opendc.simulator.engine.graph.FlowNode; public class CheckpointModel extends FlowNode { @@ -37,23 +38,18 @@ public class CheckpointModel extends FlowNode { private long checkpointInterval; private final long checkpointDuration; private double checkpointIntervalScaling; - private FlowGraph graph; private long startOfInterval; public CheckpointModel(@NotNull SimWorkload simWorkload) { - super(simWorkload.getGraph()); + super(simWorkload.getEngine()); this.checkpointInterval = simWorkload.getCheckpointInterval(); this.checkpointDuration = simWorkload.getCheckpointDuration(); this.checkpointIntervalScaling = simWorkload.getCheckpointIntervalScaling(); this.simWorkload = simWorkload; - this.graph = simWorkload.getGraph(); - - InstantSource clock = graph.getEngine().getClock(); - - this.startOfInterval = clock.millis(); + this.startOfInterval = this.clock.millis(); } @Override @@ -89,6 +85,10 @@ public class CheckpointModel extends FlowNode { this.closeNode(); this.simWorkload = null; - this.graph = null; + } + + @Override + public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { + return Map.of(); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java index da6b8334..faab5c56 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -23,6 +23,11 @@ package org.opendc.simulator.compute.workload; import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import org.opendc.simulator.compute.machine.PerformanceCounters; +import org.opendc.simulator.compute.machine.SimMachine; import org.opendc.simulator.engine.graph.FlowEdge; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -30,13 +35,15 @@ import org.opendc.simulator.engine.graph.FlowSupplier; /** * A {@link SimChainWorkload} that composes multiple {@link SimWorkload}s. */ -final class SimChainWorkload extends SimWorkload implements FlowSupplier { +public final class SimChainWorkload extends SimWorkload implements FlowSupplier { private final LinkedList<Workload> workloads; private int workloadIndex; private SimWorkload activeWorkload; - private double demand = 0.0f; - private double supply = 0.0f; + private double cpuDemand = 0.0f; + private double cpuSupply = 0.0f; + private double cpuCapacity = 0.0f; + private double d = 0.0f; private FlowEdge workloadEdge; private FlowEdge machineEdge; @@ -50,6 +57,10 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { private ChainWorkload snapshot; + private long lastUpdate; + private PerformanceCounters performanceCounters = new PerformanceCounters(); + private Consumer<Exception> completion; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Basic Getters and Setters //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -79,24 +90,28 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { return checkpointIntervalScaling; } + public PerformanceCounters getPerformanceCounters() { + return performanceCounters; + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - SimChainWorkload(FlowSupplier supplier, ChainWorkload workload, long now) { - super(((FlowNode) supplier).getGraph()); + SimChainWorkload(FlowSupplier supplier, ChainWorkload workload) { + super(((FlowNode) supplier).getEngine()); this.snapshot = workload; - this.parentGraph = ((FlowNode) supplier).getGraph(); - this.parentGraph.addEdge(this, supplier); + new FlowEdge(this, supplier); - this.clock = this.parentGraph.getEngine().getClock(); this.workloads = new LinkedList<>(workload.getWorkloads()); this.checkpointInterval = workload.getCheckpointInterval(); this.checkpointDuration = workload.getCheckpointDuration(); this.checkpointIntervalScaling = workload.getCheckpointIntervalScaling(); + this.lastUpdate = clock.millis(); + if (checkpointInterval > 0) { this.createCheckpointModel(); } @@ -106,6 +121,15 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { this.onStart(); } + SimChainWorkload( + FlowSupplier supplier, ChainWorkload workload, SimMachine machine, Consumer<Exception> completion) { + this(supplier, workload); + + this.capacity = machine.getCpu().getFrequency(); + this.d = 1 / machine.getCpu().getFrequency(); + this.completion = completion; + } + public Workload getNextWorkload() { this.workloadIndex++; return workloads.pop(); @@ -122,7 +146,25 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { this.checkpointModel.start(); } - this.activeWorkload = this.getNextWorkload().startWorkload(this, this.clock.millis()); + this.activeWorkload = this.getNextWorkload().startWorkload(this); + } + + public void updateCounters(long now) { + long lastUpdate = this.lastUpdate; + this.lastUpdate = now; + long delta = now - lastUpdate; + + if (delta > 0) { + final double factor = this.d * delta; + + this.performanceCounters.addCpuActiveTime(Math.round(this.cpuSupply * factor)); + this.performanceCounters.setCpuIdleTime(Math.round((this.cpuCapacity - this.cpuSupply) * factor)); + this.performanceCounters.addCpuStealTime(Math.round((this.cpuDemand - this.cpuSupply) * factor)); + } + + this.performanceCounters.setCpuDemand(this.cpuDemand); + this.performanceCounters.setCpuSupply(this.cpuSupply); + this.performanceCounters.setCpuCapacity(this.cpuCapacity); } @Override @@ -132,6 +174,16 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { @Override public void stopWorkload() { + this.stopWorkload(null); + } + + private Exception stopWorkloadCause = null; + + public void stopWorkload(Exception cause) { + if (cause != null) { + this.stopWorkloadCause = cause; + } + if (this.checkpointModel != null) { this.checkpointModel.close(); this.checkpointModel = null; @@ -143,6 +195,11 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { } this.closeNode(); + + if (this.completion != null) { + this.completion.accept(stopWorkloadCause); + this.completion = null; + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -197,7 +254,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { @Override public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { - this.demand = newDemand; + this.cpuDemand = newDemand; this.machineEdge.pushDemand(newDemand); } @@ -210,7 +267,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { @Override public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { - this.supply = newSupply; + this.cpuSupply = newSupply; this.workloadEdge.pushSupply(newSupply); } @@ -222,6 +279,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { */ @Override public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) { + updateCounters(this.clock.millis()); + this.pushOutgoingDemand(this.machineEdge, newDemand); } @@ -233,6 +292,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { */ @Override public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { + updateCounters(this.clock.millis()); + this.pushOutgoingSupply(this.machineEdge, newSupply); } @@ -255,7 +316,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { // Start next workload if (!this.workloads.isEmpty()) { - this.activeWorkload = getNextWorkload().startWorkload(this, this.clock.millis()); + this.activeWorkload = getNextWorkload().startWorkload(this); return; } @@ -276,4 +337,14 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { this.stopWorkload(); } + + @Override + public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { + List<FlowEdge> consumerEdges = (this.machineEdge != null) ? List.of(this.machineEdge) : List.of(); + List<FlowEdge> supplierEdges = (this.workloadEdge != null) ? List.of(this.workloadEdge) : List.of(); + + return Map.of( + FlowEdge.NodeType.CONSUMING, consumerEdges, + FlowEdge.NodeType.SUPPLYING, supplierEdges); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java index ebfcc552..d27d90af 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java @@ -22,8 +22,8 @@ package org.opendc.simulator.compute.workload; +import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowConsumer; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; /** @@ -37,10 +37,10 @@ public abstract class SimWorkload extends FlowNode implements FlowConsumer { /** * Construct a new {@link FlowNode} instance. * - * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param engine The {@link FlowEngine} this stage belongs to. */ - public SimWorkload(FlowGraph parentGraph) { - super(parentGraph); + public SimWorkload(FlowEngine engine) { + super(engine); } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java index d85669bb..1d069bae 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.workload; +import java.util.function.Consumer; +import org.opendc.simulator.compute.machine.SimMachine; import org.opendc.simulator.engine.graph.FlowSupplier; public interface Workload { @@ -32,5 +34,7 @@ public interface Workload { double getCheckpointIntervalScaling(); - SimWorkload startWorkload(FlowSupplier supplier, long now); + SimWorkload startWorkload(FlowSupplier supplier); + + SimWorkload startWorkload(FlowSupplier supplier, SimMachine machine, Consumer<Exception> completion); } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java index 46354d4c..9811f72d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java @@ -23,12 +23,13 @@ package org.opendc.simulator.compute.workload.trace; import java.util.LinkedList; +import java.util.List; +import java.util.Map; import org.opendc.simulator.compute.workload.SimWorkload; import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling; import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy; import org.opendc.simulator.engine.graph.FlowConsumer; import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; @@ -46,9 +47,9 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { private double newCpuFreqSupplied = 0.0; // The Cpu speed supplied private double remainingWork = 0.0; // The duration of the fragment at the demanded speed - private long checkpointDuration; + private final long checkpointDuration; - private TraceWorkload snapshot; + private final TraceWorkload snapshot; private ScalingPolicy scalingPolicy = new NoDelayScaling(); @@ -83,8 +84,8 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public SimTraceWorkload(FlowSupplier supplier, TraceWorkload workload, long now) { - super(((FlowNode) supplier).getGraph()); + public SimTraceWorkload(FlowSupplier supplier, TraceWorkload workload) { + super(((FlowNode) supplier).getEngine()); this.snapshot = workload; this.checkpointDuration = workload.getCheckpointDuration(); @@ -92,8 +93,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.remainingFragments = new LinkedList<>(workload.getFragments()); this.fragmentIndex = 0; - final FlowGraph graph = ((FlowNode) supplier).getGraph(); - graph.addEdge(this, supplier); + new FlowEdge(this, supplier); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -289,4 +289,9 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.stopWorkload(); } + + @Override + public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { + return Map.of(FlowEdge.NodeType.CONSUMING, (this.machineEdge != null) ? List.of(this.machineEdge) : List.of()); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java index 47292a7b..80687b88 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java @@ -25,6 +25,8 @@ package org.opendc.simulator.compute.workload.trace; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.function.Consumer; +import org.opendc.simulator.compute.machine.SimMachine; import org.opendc.simulator.compute.workload.SimWorkload; import org.opendc.simulator.compute.workload.Workload; import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling; @@ -111,8 +113,13 @@ public class TraceWorkload implements Workload { } @Override - public SimWorkload startWorkload(FlowSupplier supplier, long now) { - return new SimTraceWorkload(supplier, this, now); + public SimWorkload startWorkload(FlowSupplier supplier) { + return new SimTraceWorkload(supplier, this); + } + + @Override + public SimWorkload startWorkload(FlowSupplier supplier, SimMachine machine, Consumer<Exception> completion) { + return this.startWorkload(supplier); } public static Builder builder() { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt index 49baaf48..bbd6feaa 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt @@ -25,8 +25,6 @@ package org.opendc.simulator.compute import kotlinx.coroutines.suspendCancellableCoroutine import org.opendc.simulator.compute.machine.SimMachine import org.opendc.simulator.compute.workload.trace.TraceWorkload -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException /** * Run the specified [SimWorkloadNew] on this machine and suspend execution util [workload] has finished. @@ -43,8 +41,8 @@ public suspend fun SimMachine.runWorkload( return suspendCancellableCoroutine { cont -> cont.invokeOnCancellation { this@runWorkload.shutdown() } - startWorkload(workload) { cause -> - if (cause != null) cont.resumeWithException(cause) else cont.resume(Unit) - } +// startWorkload(workload) { cause -> +// if (cause != null) cont.resumeWithException(cause) else cont.resume(Unit) +// } } } |
