From be9698483f8e7891b5c2d562eaeac9dd3edbf9d8 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Fri, 24 Jan 2025 13:54:59 +0100 Subject: Added Fragment scaling (#296) * Added maxCpuDemand to TraceWorkload, don't know if this will be needed so might remove later. Updated SimTraceWorkload to properly handle creating checkpoints Fixed a bug with the updatedConsumers in the FlowDistributor Implemented a first version of scaling the runtime of fragments. * small update * updated tests to reflect the changes in the checkpointing model * Updated the checkpointing tests to reflect the changes made * updated wrapper-validation-action * Applied spotless --- .../compute/workload/SimTraceWorkload.java | 262 ------------------- .../simulator/compute/workload/SimWorkload.java | 8 +- .../simulator/compute/workload/TraceFragment.java | 30 --- .../simulator/compute/workload/TraceWorkload.java | 158 ------------ .../compute/workload/trace/SimTraceWorkload.java | 286 +++++++++++++++++++++ .../compute/workload/trace/TraceFragment.java | 30 +++ .../compute/workload/trace/TraceWorkload.java | 205 +++++++++++++++ .../workload/trace/scaling/NoDelayScaling.java | 47 ++++ .../workload/trace/scaling/PerfectScaling.java | 47 ++++ .../workload/trace/scaling/ScalingPolicy.java | 59 +++++ .../org/opendc/simulator/compute/Coroutines.kt | 2 +- .../opendc/simulator/engine/engine/FlowEngine.java | 14 +- .../simulator/engine/graph/FlowDistributor.java | 35 ++- .../opendc/simulator/engine/graph/FlowGraph.java | 4 +- 14 files changed, 710 insertions(+), 477 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java deleted file mode 100644 index 9b12b1e3..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.workload; - -import java.util.LinkedList; -import org.opendc.simulator.engine.graph.FlowConsumer; -import org.opendc.simulator.engine.graph.FlowEdge; -import org.opendc.simulator.engine.graph.FlowGraph; -import org.opendc.simulator.engine.graph.FlowNode; -import org.opendc.simulator.engine.graph.FlowSupplier; - -public class SimTraceWorkload extends SimWorkload implements FlowConsumer { - private LinkedList remainingFragments; - private int fragmentIndex; - - private TraceFragment currentFragment; - private long startOfFragment; - - private FlowEdge machineEdge; - private double currentDemand; - private double currentSupply; - - private long checkpointDuration; - - private TraceWorkload snapshot; - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Basic Getters and Setters - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - public long getPassedTime(long now) { - return now - this.startOfFragment; - } - - public TraceWorkload getSnapshot() { - return snapshot; - } - - @Override - long getCheckpointInterval() { - return 0; - } - - @Override - long getCheckpointDuration() { - return 0; - } - - @Override - double getCheckpointIntervalScaling() { - return 0; - } - - public TraceFragment getNextFragment() { - this.currentFragment = this.remainingFragments.pop(); - this.fragmentIndex++; - - return this.currentFragment; - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Constructors - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - public SimTraceWorkload(FlowSupplier supplier, TraceWorkload workload, long now) { - super(((FlowNode) supplier).getGraph()); - - this.snapshot = workload; - this.checkpointDuration = workload.getCheckpointDuration(); - this.remainingFragments = new LinkedList<>(workload.getFragments()); - this.fragmentIndex = 0; - - final FlowGraph graph = ((FlowNode) supplier).getGraph(); - graph.addEdge(this, supplier); - - this.currentFragment = this.getNextFragment(); - pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage()); - this.startOfFragment = now; - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Fragment related functionality - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - @Override - public long onUpdate(long now) { - long passedTime = getPassedTime(now); - long duration = this.currentFragment.duration(); - - // The current Fragment has not yet been finished, continue - if (passedTime < duration) { - return now + (duration - passedTime); - } - - // Loop through fragments until the passed time is filled. - // We need a while loop to account for skipping of fragments. - while (passedTime >= duration) { - if (this.remainingFragments.isEmpty()) { - this.stopWorkload(); - return Long.MAX_VALUE; - } - - passedTime = passedTime - duration; - - // get next Fragment - currentFragment = this.getNextFragment(); - duration = currentFragment.duration(); - } - - // start new fragment - this.startOfFragment = now - passedTime; - - // Change the cpu Usage to the new Fragment - pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage()); - - // Return the time when the current fragment will complete - return this.startOfFragment + duration; - } - - @Override - public void stopWorkload() { - if (this.machineEdge == null) { - return; - } - - this.closeNode(); - - this.machineEdge = null; - this.remainingFragments = null; - this.currentFragment = null; - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Checkpoint related functionality - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - /** - * SimTraceWorkload does not make a checkpoint, checkpointing is handled by SimChainWorkload - * TODO: Maybe add checkpoint models for SimTraceWorkload - */ - @Override - void createCheckpointModel() {} - - /** - * Create a new snapshot based on the current status of the workload. - * @param now Moment on which the snapshot is made in milliseconds - */ - public void makeSnapshot(long now) { - - // Check if fragments is empty - - // Get remaining time of current fragment - long passedTime = getPassedTime(now); - long remainingTime = currentFragment.duration() - passedTime; - - // If this is the end of the Task, don't make a snapshot - if (remainingTime <= 0 && remainingFragments.isEmpty()) { - return; - } - - // Create a new fragment based on the current fragment and remaining duration - TraceFragment newFragment = - new TraceFragment(remainingTime, currentFragment.cpuUsage(), currentFragment.coreCount()); - - // Alter the snapshot by removing finished fragments - this.snapshot.removeFragments(this.fragmentIndex); - this.snapshot.addFirst(newFragment); - - this.remainingFragments.addFirst(newFragment); - - // Create and add a fragment for processing the snapshot process - // TODO: improve the implementation of cpuUsage and coreCount - TraceFragment snapshotFragment = new TraceFragment(this.checkpointDuration, 123456, 1); - this.remainingFragments.addFirst(snapshotFragment); - - this.fragmentIndex = -1; - this.currentFragment = getNextFragment(); - pushOutgoingDemand(this.machineEdge, this.currentFragment.cpuUsage()); - this.startOfFragment = now; - - this.invalidate(); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // FlowGraph Related functionality - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - /** - * Handle updates in supply from the Virtual Machine - * - * @param supplierEdge edge to the VM on which this is running - * @param newSupply The new demand that needs to be sent to the VM - */ - @Override - public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { - if (newSupply == this.currentSupply) { - return; - } - - this.currentSupply = newSupply; - } - - /** - * Push a new demand to the Virtual Machine - * - * @param supplierEdge edge to the VM on which this is running - * @param newDemand The new demand that needs to be sent to the VM - */ - @Override - public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { - if (newDemand == this.currentDemand) { - return; - } - - this.currentDemand = newDemand; - this.machineEdge.pushDemand(newDemand); - } - - /** - * Add the connection to the Virtual Machine - * - * @param supplierEdge edge to the VM on which this is running - */ - @Override - public void addSupplierEdge(FlowEdge supplierEdge) { - this.machineEdge = supplierEdge; - } - - /** - * Handle the removal of the connection to the Virtual Machine - * When the connection to the Virtual Machine is removed, the SimTraceWorkload is removed - * - * @param supplierEdge edge to the VM on which this is running - */ - @Override - public void removeSupplierEdge(FlowEdge supplierEdge) { - if (this.machineEdge == null) { - return; - } - this.stopWorkload(); - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java index 2919fc3a..ebfcc552 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java @@ -55,11 +55,11 @@ public abstract class SimWorkload extends FlowNode implements FlowConsumer { public abstract Workload getSnapshot(); - abstract void createCheckpointModel(); + public abstract void createCheckpointModel(); - abstract long getCheckpointInterval(); + public abstract long getCheckpointInterval(); - abstract long getCheckpointDuration(); + public abstract long getCheckpointDuration(); - abstract double getCheckpointIntervalScaling(); + public abstract double getCheckpointIntervalScaling(); } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java deleted file mode 100644 index 550c2135..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.workload; - -public record TraceFragment(long duration, double cpuUsage, int coreCount) { - - public TraceFragment(long start, long duration, double cpuUsage, int coreCount) { - this(duration, cpuUsage, coreCount); - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java deleted file mode 100644 index 7f82ab71..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.workload; - -import java.util.ArrayList; -import java.util.List; -import org.opendc.simulator.engine.graph.FlowSupplier; - -public class TraceWorkload implements Workload { - private ArrayList fragments; - private final long checkpointInterval; - private final long checkpointDuration; - private final double checkpointIntervalScaling; - - public TraceWorkload( - ArrayList fragments, - long checkpointInterval, - long checkpointDuration, - double checkpointIntervalScaling) { - this.fragments = fragments; - this.checkpointInterval = checkpointInterval; - this.checkpointDuration = checkpointDuration; - this.checkpointIntervalScaling = checkpointIntervalScaling; - } - - public TraceWorkload(ArrayList fragments) { - this(fragments, 0L, 0L, 1.0); - } - - public ArrayList getFragments() { - return fragments; - } - - @Override - public long getCheckpointInterval() { - return checkpointInterval; - } - - @Override - public long getCheckpointDuration() { - return checkpointDuration; - } - - @Override - public double getCheckpointIntervalScaling() { - return checkpointIntervalScaling; - } - - public void removeFragments(int numberOfFragments) { - if (numberOfFragments <= 0) { - return; - } - this.fragments.subList(0, numberOfFragments).clear(); - } - - public void addFirst(TraceFragment fragment) { - this.fragments.add(0, fragment); - } - - @Override - public SimWorkload startWorkload(FlowSupplier supplier, long now) { - return new SimTraceWorkload(supplier, this, now); - } - - public static Builder builder() { - return builder(0L, 0L, 0.0); - } - - public static Builder builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling) { - return new Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling); - } - - /** - * Construct a {@link TraceWorkload} from the specified fragments. - * - * @param fragments The array of fragments to construct the trace from. - */ - public static TraceWorkload ofFragments(TraceFragment... fragments) { - final Builder builder = builder(); - - for (TraceFragment fragment : fragments) { - builder.add(fragment.duration(), fragment.cpuUsage(), fragment.coreCount()); - } - - return builder.build(); - } - - /** - * Construct a {@link TraceWorkload} from the specified fragments. - * - * @param fragments The fragments to construct the trace from. - */ - public static TraceWorkload ofFragments(List fragments) { - final Builder builder = builder(); - - for (TraceFragment fragment : fragments) { - builder.add(fragment.duration(), fragment.cpuUsage(), fragment.coreCount()); - } - - return builder.build(); - } - - public static final class Builder { - private final ArrayList fragments; - private final long checkpointInterval; - private final long checkpointDuration; - private final double checkpointIntervalScaling; - - /** - * Construct a new {@link Builder} instance. - */ - private Builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling) { - this.fragments = new ArrayList<>(); - this.checkpointInterval = checkpointInterval; - this.checkpointDuration = checkpointDuration; - this.checkpointIntervalScaling = checkpointIntervalScaling; - } - - /** - * Add a fragment to the trace. - * - * @param duration The timestamp at which the fragment ends (in epoch millis). - * @param usage The CPU usage at this fragment. - * @param cores The number of cores used during this fragment. - */ - public void add(long duration, double usage, int cores) { - fragments.add(fragments.size(), new TraceFragment(duration, usage, cores)); - } - - /** - * Build the {@link TraceWorkload} instance. - */ - public TraceWorkload build() { - return new TraceWorkload( - this.fragments, this.checkpointInterval, this.checkpointDuration, this.checkpointIntervalScaling); - } - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java new file mode 100644 index 00000000..93733268 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java @@ -0,0 +1,286 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload.trace; + +import java.util.LinkedList; +import org.opendc.simulator.compute.workload.SimWorkload; +import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling; +import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy; +import org.opendc.simulator.engine.graph.FlowConsumer; +import org.opendc.simulator.engine.graph.FlowEdge; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; +import org.opendc.simulator.engine.graph.FlowSupplier; + +public class SimTraceWorkload extends SimWorkload implements FlowConsumer { + private LinkedList remainingFragments; + private int fragmentIndex; + + private TraceFragment currentFragment; + private long startOfFragment; + + private FlowEdge machineEdge; + + private double cpuFreqDemand = 0.0; // The Cpu demanded by fragment + private double cpuFreqSupplied = 0.0; // The Cpu speed supplied + private double newCpuFreqSupplied = 0.0; // The Cpu speed supplied + private double remainingWork = 0.0; // The duration of the fragment at the demanded speed + + private long checkpointDuration; + + private TraceWorkload snapshot; + + private ScalingPolicy scalingPolicy = new NoDelayScaling(); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Basic Getters and Setters + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + public long getPassedTime(long now) { + return now - this.startOfFragment; + } + + public TraceWorkload getSnapshot() { + return snapshot; + } + + @Override + public long getCheckpointInterval() { + return 0; + } + + @Override + public long getCheckpointDuration() { + return 0; + } + + @Override + public double getCheckpointIntervalScaling() { + return 0; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Constructors + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + public SimTraceWorkload(FlowSupplier supplier, TraceWorkload workload, long now) { + super(((FlowNode) supplier).getGraph()); + + this.snapshot = workload; + this.checkpointDuration = workload.getCheckpointDuration(); + this.scalingPolicy = workload.getScalingPolicy(); + this.remainingFragments = new LinkedList<>(workload.getFragments()); + this.fragmentIndex = 0; + + final FlowGraph graph = ((FlowNode) supplier).getGraph(); + graph.addEdge(this, supplier); + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Fragment related functionality + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public long onUpdate(long now) { + long passedTime = getPassedTime(now); + this.startOfFragment = now; + + // The amount of work done since last update + double finishedWork = this.scalingPolicy.getFinishedWork(this.cpuFreqDemand, this.cpuFreqSupplied, passedTime); + + this.remainingWork -= finishedWork; + + // If this.remainingWork <= 0, the fragment has been completed + if (this.remainingWork <= 0) { + this.startNextFragment(); + + this.invalidate(); + return Long.MAX_VALUE; + } + + this.cpuFreqSupplied = this.newCpuFreqSupplied; + + // The amount of time required to finish the fragment at this speed + long remainingDuration = this.scalingPolicy.getRemainingDuration( + this.cpuFreqDemand, this.newCpuFreqSupplied, this.remainingWork); + + return now + remainingDuration; + } + + public TraceFragment getNextFragment() { + if (this.remainingFragments.isEmpty()) { + return null; + } + this.currentFragment = this.remainingFragments.pop(); + this.fragmentIndex++; + + return this.currentFragment; + } + + private void startNextFragment() { + + TraceFragment nextFragment = this.getNextFragment(); + if (nextFragment == null) { + this.stopWorkload(); + return; + } + double demand = nextFragment.cpuUsage(); + this.remainingWork = this.scalingPolicy.getRemainingWork(demand, nextFragment.duration()); + this.pushOutgoingDemand(this.machineEdge, demand); + } + + @Override + public void stopWorkload() { + if (this.machineEdge == null) { + return; + } + + this.closeNode(); + + this.machineEdge = null; + this.remainingFragments = null; + this.currentFragment = null; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Checkpoint related functionality + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * SimTraceWorkload does not make a checkpoint, checkpointing is handled by SimChainWorkload + * TODO: Maybe add checkpoint models for SimTraceWorkload + */ + @Override + public void createCheckpointModel() {} + + /** + * Create a new snapshot based on the current status of the workload. + * @param now Moment on which the snapshot is made in milliseconds + */ + public void makeSnapshot(long now) { + + // Check if fragments is empty + + // Get remaining time of current fragment + long passedTime = getPassedTime(now); + + // The amount of work done since last update + double finishedWork = this.scalingPolicy.getFinishedWork(this.cpuFreqDemand, this.cpuFreqSupplied, passedTime); + + this.remainingWork -= finishedWork; + + // The amount of time required to finish the fragment at this speed + long remainingTime = + this.scalingPolicy.getRemainingDuration(this.cpuFreqDemand, this.cpuFreqDemand, this.remainingWork); + + // If this is the end of the Task, don't make a snapshot + if (remainingTime <= 0 && remainingFragments.isEmpty()) { + return; + } + + // Create a new fragment based on the current fragment and remaining duration + TraceFragment newFragment = + new TraceFragment(remainingTime, currentFragment.cpuUsage(), currentFragment.coreCount()); + + // Alter the snapshot by removing finished fragments + this.snapshot.removeFragments(this.fragmentIndex); + this.snapshot.addFirst(newFragment); + + this.remainingFragments.addFirst(newFragment); + + // Create and add a fragment for processing the snapshot process + TraceFragment snapshotFragment = new TraceFragment( + this.checkpointDuration, this.snapshot.getMaxCpuDemand(), this.snapshot.getMaxCoreCount()); + this.remainingFragments.addFirst(snapshotFragment); + + this.fragmentIndex = -1; + startNextFragment(); + + this.startOfFragment = now; + + this.invalidate(); + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // FlowGraph Related functionality + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Handle updates in supply from the Virtual Machine + * + * @param supplierEdge edge to the VM on which this is running + * @param newSupply The new demand that needs to be sent to the VM + */ + @Override + public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { + if (newSupply == this.cpuFreqSupplied) { + return; + } + + this.cpuFreqSupplied = this.newCpuFreqSupplied; + this.newCpuFreqSupplied = newSupply; + + this.invalidate(); + } + + /** + * Push a new demand to the Virtual Machine + * + * @param supplierEdge edge to the VM on which this is running + * @param newDemand The new demand that needs to be sent to the VM + */ + @Override + public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { + if (newDemand == this.cpuFreqDemand) { + return; + } + + this.cpuFreqDemand = newDemand; + this.machineEdge.pushDemand(newDemand); + } + + /** + * Add the connection to the Virtual Machine + * + * @param supplierEdge edge to the VM on which this is running + */ + @Override + public void addSupplierEdge(FlowEdge supplierEdge) { + this.machineEdge = supplierEdge; + } + + /** + * Handle the removal of the connection to the Virtual Machine + * When the connection to the Virtual Machine is removed, the SimTraceWorkload is removed + * + * @param supplierEdge edge to the VM on which this is running + */ + @Override + public void removeSupplierEdge(FlowEdge supplierEdge) { + if (this.machineEdge == null) { + return; + } + + this.stopWorkload(); + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java new file mode 100644 index 00000000..a09206a1 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload.trace; + +public record TraceFragment(long duration, double cpuUsage, int coreCount) { + + public TraceFragment(long start, long duration, double cpuUsage, int coreCount) { + this(duration, cpuUsage, coreCount); + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java new file mode 100644 index 00000000..47292a7b --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload.trace; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import org.opendc.simulator.compute.workload.SimWorkload; +import org.opendc.simulator.compute.workload.Workload; +import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling; +import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy; +import org.opendc.simulator.engine.graph.FlowSupplier; + +public class TraceWorkload implements Workload { + private ArrayList fragments; + private final long checkpointInterval; + private final long checkpointDuration; + private final double checkpointIntervalScaling; + private final double maxCpuDemand; + private final int maxCoreCount; + + public ScalingPolicy getScalingPolicy() { + return scalingPolicy; + } + + private final ScalingPolicy scalingPolicy; + + public TraceWorkload( + ArrayList fragments, + long checkpointInterval, + long checkpointDuration, + double checkpointIntervalScaling, + ScalingPolicy scalingPolicy) { + this.fragments = fragments; + this.checkpointInterval = checkpointInterval; + this.checkpointDuration = checkpointDuration; + this.checkpointIntervalScaling = checkpointIntervalScaling; + this.scalingPolicy = scalingPolicy; + + // TODO: remove if we decide not to use it. + this.maxCpuDemand = fragments.stream() + .max(Comparator.comparing(TraceFragment::cpuUsage)) + .get() + .cpuUsage(); + this.maxCoreCount = fragments.stream() + .max(Comparator.comparing(TraceFragment::coreCount)) + .get() + .coreCount(); + } + + public TraceWorkload(ArrayList fragments) { + this(fragments, 0L, 0L, 1.0, new NoDelayScaling()); + } + + public ArrayList getFragments() { + return fragments; + } + + @Override + public long getCheckpointInterval() { + return checkpointInterval; + } + + @Override + public long getCheckpointDuration() { + return checkpointDuration; + } + + @Override + public double getCheckpointIntervalScaling() { + return checkpointIntervalScaling; + } + + public int getMaxCoreCount() { + return maxCoreCount; + } + + public double getMaxCpuDemand() { + return maxCpuDemand; + } + + public void removeFragments(int numberOfFragments) { + if (numberOfFragments <= 0) { + return; + } + this.fragments.subList(0, numberOfFragments).clear(); + } + + public void addFirst(TraceFragment fragment) { + this.fragments.add(0, fragment); + } + + @Override + public SimWorkload startWorkload(FlowSupplier supplier, long now) { + return new SimTraceWorkload(supplier, this, now); + } + + public static Builder builder() { + return builder(0L, 0L, 0.0, new NoDelayScaling()); + } + + public static Builder builder( + long checkpointInterval, + long checkpointDuration, + double checkpointIntervalScaling, + ScalingPolicy scalingPolicy) { + return new Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy); + } + + /** + * Construct a {@link TraceWorkload} from the specified fragments. + * + * @param fragments The array of fragments to construct the trace from. + */ + public static TraceWorkload ofFragments(TraceFragment... fragments) { + final Builder builder = builder(); + + for (TraceFragment fragment : fragments) { + builder.add(fragment.duration(), fragment.cpuUsage(), fragment.coreCount()); + } + + return builder.build(); + } + + /** + * Construct a {@link TraceWorkload} from the specified fragments. + * + * @param fragments The fragments to construct the trace from. + */ + public static TraceWorkload ofFragments(List fragments) { + final Builder builder = builder(); + + for (TraceFragment fragment : fragments) { + builder.add(fragment.duration(), fragment.cpuUsage(), fragment.coreCount()); + } + + return builder.build(); + } + + public static final class Builder { + private final ArrayList fragments; + private final long checkpointInterval; + private final long checkpointDuration; + private final double checkpointIntervalScaling; + private final ScalingPolicy scalingPolicy; + + /** + * Construct a new {@link Builder} instance. + */ + private Builder( + long checkpointInterval, + long checkpointDuration, + double checkpointIntervalScaling, + ScalingPolicy scalingPolicy) { + this.fragments = new ArrayList<>(); + this.checkpointInterval = checkpointInterval; + this.checkpointDuration = checkpointDuration; + this.checkpointIntervalScaling = checkpointIntervalScaling; + this.scalingPolicy = scalingPolicy; + } + + /** + * Add a fragment to the trace. + * + * @param duration The timestamp at which the fragment ends (in epoch millis). + * @param usage The CPU usage at this fragment. + * @param cores The number of cores used during this fragment. + */ + public void add(long duration, double usage, int cores) { + fragments.add(fragments.size(), new TraceFragment(duration, usage, cores)); + } + + /** + * Build the {@link TraceWorkload} instance. + */ + public TraceWorkload build() { + return new TraceWorkload( + this.fragments, + this.checkpointInterval, + this.checkpointDuration, + this.checkpointIntervalScaling, + this.scalingPolicy); + } + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java new file mode 100644 index 00000000..4230bb55 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload.trace.scaling; + +/** + * The NoDelay scaling policy states that there will be no delay + * when less CPU can be provided than needed. + * + * This could be used in situations where the data is streamed. + * This will also result in the same behaviour as older OpenDC. + */ +public class NoDelayScaling implements ScalingPolicy { + @Override + public double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime) { + return cpuFreqDemand * passedTime; + } + + @Override + public long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork) { + return (long) (remainingWork / cpuFreqDemand); + } + + @Override + public double getRemainingWork(double cpuFreqDemand, long duration) { + return cpuFreqDemand * duration; + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java new file mode 100644 index 00000000..7eae70e6 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload.trace.scaling; + +/** + * PerfectScaling scales the workload duration perfectly + * based on the CPU capacity. + * + * This means that if a fragment has a duration of 10 min at 4000 mHz, + * it will take 20 min and 2000 mHz. + */ +public class PerfectScaling implements ScalingPolicy { + @Override + public double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime) { + return cpuFreqSupplied * passedTime; + } + + @Override + public long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork) { + return (long) (remainingWork / cpuFreqSupplied); + } + + @Override + public double getRemainingWork(double cpuFreqDemand, long duration) { + return cpuFreqDemand * duration; + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java new file mode 100644 index 00000000..a0f473ba --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload.trace.scaling; + +/** + * Interface for the scaling policy. + * A scaling decides how a TaskFragment should scale when it is not getting the demanded capacity + */ +public interface ScalingPolicy { + + /** + * Calculate how much work was finished based on the demanded and supplied cpu + * + * @param cpuFreqDemand + * @param cpuFreqSupplied + * @param passedTime + * @return + */ + double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime); + + /** + * Calculate the remaining duration of this fragment based on the demanded and supplied cpu + * + * @param cpuFreqDemand + * @param cpuFreqSupplied + * @param remainingWork + * @return + */ + long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork); + + /** + * Calculate how much work is remaining based on the demanded and supplied cpu + * + * @param cpuFreqDemand + * @param duration + * @return + */ + double getRemainingWork(double cpuFreqDemand, long duration); +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt index ad69a3d6..49baaf48 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt @@ -24,7 +24,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.suspendCancellableCoroutine import org.opendc.simulator.compute.machine.SimMachine -import org.opendc.simulator.compute.workload.TraceWorkload +import org.opendc.simulator.compute.workload.trace.TraceWorkload import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index 67540f4e..3f18ac76 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -139,16 +139,16 @@ public final class FlowEngine implements Runnable { * This method should only be invoked while inside an engine cycle. */ public void scheduleDelayedInContext(FlowNode ctx) { - FlowEventQueue timerQueue = this.eventQueue; - timerQueue.enqueue(ctx); + FlowEventQueue eventQueue = this.eventQueue; + eventQueue.enqueue(ctx); } /** * Run all the enqueued actions for the specified timestamp (now). */ private void doRunEngine(long now) { - final FlowCycleQueue queue = this.cycleQueue; - final FlowEventQueue timerQueue = this.eventQueue; + final FlowCycleQueue cycleQueue = this.cycleQueue; + final FlowEventQueue eventQueue = this.eventQueue; try { // Mark the engine as active to prevent concurrent calls to this method @@ -156,7 +156,7 @@ public final class FlowEngine implements Runnable { // Execute all scheduled updates at current timestamp while (true) { - final FlowNode ctx = timerQueue.poll(now); + final FlowNode ctx = eventQueue.poll(now); if (ctx == null) { break; } @@ -166,7 +166,7 @@ public final class FlowEngine implements Runnable { // Execute all immediate updates while (true) { - final FlowNode ctx = queue.poll(); + final FlowNode ctx = cycleQueue.poll(); if (ctx == null) { break; } @@ -178,7 +178,7 @@ public final class FlowEngine implements Runnable { } // Schedule an engine invocation for the next update to occur. - long headDeadline = timerQueue.peekDeadline(); + long headDeadline = eventQueue.peekDeadline(); if (headDeadline != Long.MAX_VALUE && headDeadline >= now) { trySchedule(futureInvocations, now, headDeadline); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index 16bb161f..ff7ff199 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -24,6 +24,9 @@ package org.opendc.simulator.engine.graph; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { private final ArrayList consumerEdges = new ArrayList<>(); @@ -36,13 +39,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu private double currentIncomingSupply; // The current supply provided by the supplier private boolean outgoingDemandUpdateNeeded = false; + private final Set updatedDemands = + new HashSet<>(); // Array of consumers that updated their demand in this cycle private boolean overloaded = false; private double capacity; // What is the max capacity. Can probably be removed - private final ArrayList updatedDemands = new ArrayList<>(); - public FlowDistributor(FlowGraph graph) { super(graph); } @@ -68,7 +71,9 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu return Long.MAX_VALUE; } - this.updateOutgoingSupplies(); + if (!this.outgoingSupplies.isEmpty()) { + this.updateOutgoingSupplies(); + } return Long.MAX_VALUE; } @@ -100,7 +105,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu // provide all consumers with their demand if (this.overloaded) { for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - if (this.outgoingSupplies.get(idx) != this.incomingDemands.get(idx)) { + if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) { this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx)); } } @@ -190,23 +195,25 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu this.totalIncomingDemand -= consumerEdge.getDemand(); + // Remove idx from consumers that updated their demands + this.updatedDemands.remove(idx); + this.consumerEdges.remove(idx); this.incomingDemands.remove(idx); this.outgoingSupplies.remove(idx); // update the consumer index for all consumerEdges higher than this. for (int i = idx; i < this.consumerEdges.size(); i++) { - this.consumerEdges.get(i).setConsumerIndex(i); + FlowEdge other = this.consumerEdges.get(i); + + other.setConsumerIndex(other.getConsumerIndex() - 1); } - for (int i = 0; i < this.updatedDemands.size(); i++) { - int j = this.updatedDemands.get(i); + for (int idx_other : this.updatedDemands) { - if (j == idx) { - this.updatedDemands.remove(idx); - } - if (j > idx) { - this.updatedDemands.set(i, j - 1); + if (idx_other > idx) { + this.updatedDemands.remove(idx_other); + this.updatedDemands.add(idx_other - 1); } } @@ -220,7 +227,9 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu this.capacity = 0; this.currentIncomingSupply = 0; - this.invalidate(); + this.updatedDemands.clear(); + + this.closeNode(); } @Override diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java index 0e6e137c..91662950 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java @@ -63,7 +63,7 @@ public class FlowGraph { // Remove all edges connected to node final ArrayList connectedEdges = nodeToEdge.get(node); - while (connectedEdges.size() > 0) { + while (!connectedEdges.isEmpty()) { removeEdge(connectedEdges.get(0)); } @@ -90,7 +90,7 @@ public class FlowGraph { throw new IllegalArgumentException("The consumer is not a node in this graph"); } if (!(this.nodes.contains((FlowNode) flowSupplier))) { - throw new IllegalArgumentException("The consumer is not a node in this graph"); + throw new IllegalArgumentException("The supplier is not a node in this graph"); } final FlowEdge flowEdge = new FlowEdge(flowConsumer, flowSupplier); -- cgit v1.2.3