diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2024-12-06 15:44:09 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-12-06 15:44:09 +0100 |
| commit | 8bbc3de611f9a679b5fb542241d32f887b4fe921 (patch) | |
| tree | fd19092f7921359c0cc619693a29b15b8cda2db3 /opendc-simulator | |
| parent | 0ce9557b2960979e7e25be7aae05c389d51da17e (diff) | |
Renamed Multiplexer to FlowDistributor (#282)
* Restructured opendc-simulator-flow.
Renamed Multiplexer to FlowDistributor.
* spotless applied
* Added FlowDistributor topologies back
Diffstat (limited to 'opendc-simulator')
26 files changed, 151 insertions, 268 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt deleted file mode 100644 index 8d8f4ef8..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import org.opendc.simulator.compute.old.SimBareMetalMachine -import org.opendc.simulator.compute.old.kernel.SimHypervisor -import org.opendc.simulator.compute.old.model.CpuModel -import org.opendc.simulator.compute.old.model.MachineModel -import org.opendc.simulator.compute.old.model.MemoryUnit -import org.opendc.simulator.compute.old.model.ProcessingNode -import org.opendc.simulator.compute.old.workload.SimTrace -import org.opendc.simulator.flow2.FlowEngine -import org.opendc.simulator.flow2.mux.FlowMultiplexerFactory -import org.opendc.simulator.kotlin.runSimulation -import org.openjdk.jmh.annotations.Benchmark -import org.openjdk.jmh.annotations.Fork -import org.openjdk.jmh.annotations.Measurement -import org.openjdk.jmh.annotations.Scope -import org.openjdk.jmh.annotations.Setup -import org.openjdk.jmh.annotations.State -import org.openjdk.jmh.annotations.Warmup -import java.util.SplittableRandom -import java.util.concurrent.ThreadLocalRandom -import java.util.concurrent.TimeUnit - -@State(Scope.Thread) -@Fork(1) -@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) -class SimMachineBenchmarks { - private lateinit var machineModel: MachineModel - private lateinit var trace: SimTrace - - @Setup - fun setUp() { - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) - - machineModel = - MachineModel( - // cpus - List(cpuNode.coreCount) { - CpuModel( - cpuNode, - it, - 1000.0, - ) - }, - // memory - List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }, - ) - - val random = ThreadLocalRandom.current() - val builder = SimTrace.builder() - repeat(1000000) { - val timestamp = it.toLong() * 1000 - val deadline = timestamp + 1000 - builder.add(deadline, random.nextDouble(0.0, 4500.0), 1) - } - trace = builder.build() - } - - @Benchmark - fun benchmarkBareMetal() { - return runSimulation { - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - val machine = SimBareMetalMachine.create(graph, machineModel) - return@runSimulation machine.runWorkload(trace.createWorkload(0)) - } - } - - @Benchmark - fun benchmarkSpaceSharedHypervisor() { - return runSimulation { - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - val machine = SimBareMetalMachine.create(graph, machineModel) - val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1)) - - launch { machine.runWorkload(hypervisor) } - - val vm = hypervisor.newMachine(machineModel) - - try { - return@runSimulation vm.runWorkload(trace.createWorkload(0)) - } finally { - vm.cancel() - machine.cancel() - } - } - } - - @Benchmark - fun benchmarkFairShareHypervisorSingle() { - return runSimulation { - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - val machine = SimBareMetalMachine.create(graph, machineModel) - val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) - - launch { machine.runWorkload(hypervisor) } - - val vm = hypervisor.newMachine(machineModel) - - try { - return@runSimulation vm.runWorkload(trace.createWorkload(0)) - } finally { - vm.cancel() - machine.cancel() - } - } - } - - @Benchmark - fun benchmarkFairShareHypervisorDouble() { - return runSimulation { - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - val machine = SimBareMetalMachine.create(graph, machineModel) - val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) - - launch { machine.runWorkload(hypervisor) } - - coroutineScope { - repeat(2) { - val vm = hypervisor.newMachine(machineModel) - - launch { - try { - vm.runWorkload(trace.createWorkload(0)) - } finally { - machine.cancel() - } - } - } - } - machine.cancel() - } - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java index 63331a6c..c5b8a9ea 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java @@ -24,11 +24,11 @@ package org.opendc.simulator.compute.cpu; import org.opendc.simulator.compute.machine.PerformanceCounters; import org.opendc.simulator.compute.models.CpuModel; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowConsumer; +import org.opendc.simulator.engine.graph.FlowEdge; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; +import org.opendc.simulator.engine.graph.FlowSupplier; /** * A {@link SimCpu} of a machine. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java index 8364324a..074f0ed8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java @@ -24,7 +24,6 @@ package org.opendc.simulator.compute.machine; import java.time.InstantSource; import java.util.function.Consumer; -import org.opendc.simulator.Multiplexer; import org.opendc.simulator.compute.cpu.CpuPowerModel; import org.opendc.simulator.compute.cpu.SimCpu; import org.opendc.simulator.compute.memory.Memory; @@ -32,7 +31,8 @@ import org.opendc.simulator.compute.models.MachineModel; import org.opendc.simulator.compute.power.SimPsu; import org.opendc.simulator.compute.workload.SimWorkload; import org.opendc.simulator.compute.workload.Workload; -import org.opendc.simulator.engine.FlowGraph; +import org.opendc.simulator.engine.graph.FlowDistributor; +import org.opendc.simulator.engine.graph.FlowGraph; /** * A machine that is able to execute {@link SimWorkload} objects. @@ -44,7 +44,7 @@ public class SimMachine { private final InstantSource clock; private SimCpu cpu; - private Multiplexer cpuMux; + private FlowDistributor cpuMux; private SimPsu psu; private Memory memory; @@ -74,7 +74,7 @@ public class SimMachine { return cpu; } - public Multiplexer getCpuMux() { + public FlowDistributor getCpuMux() { return cpuMux; } @@ -114,7 +114,7 @@ public class SimMachine { public SimMachine( FlowGraph graph, MachineModel machineModel, - Multiplexer powerMux, + FlowDistributor powerMux, CpuPowerModel cpuPowerModel, Consumer<Exception> completion) { this.graph = graph; @@ -132,8 +132,8 @@ public class SimMachine { this.memory = new Memory(graph, this.machineModel.getMemory()); - // Create a Multiplexer and add the cpu as supplier - this.cpuMux = new Multiplexer(this.graph); + // Create a FlowDistributor and add the cpu as supplier + this.cpuMux = new FlowDistributor(this.graph); graph.addEdge(this.cpuMux, this.cpu); this.completion = completion; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java index 15a1b1c4..b8a9c738 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java @@ -26,11 +26,11 @@ import java.util.function.Consumer; import org.opendc.simulator.compute.cpu.SimCpu; import org.opendc.simulator.compute.workload.SimWorkload; import org.opendc.simulator.compute.workload.Workload; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowConsumer; +import org.opendc.simulator.engine.graph.FlowEdge; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; +import org.opendc.simulator.engine.graph.FlowSupplier; /* A virtual Machine created to run a single workload diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java index 2656a99a..d4406b20 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/memory/Memory.java @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.memory; import org.opendc.simulator.compute.models.MemoryUnit; -import org.opendc.simulator.engine.FlowGraph; +import org.opendc.simulator.engine.graph.FlowGraph; /** * The [SimMemory] implementation for a machine. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java index 98ef2b72..91095c01 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/CarbonModel.java @@ -23,8 +23,8 @@ package org.opendc.simulator.compute.power; import java.util.List; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; /** * CarbonModel used to provide the Carbon Intensity of a {@link SimPowerSource} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java index ea500c81..e8626e40 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java @@ -24,10 +24,10 @@ package org.opendc.simulator.compute.power; import java.util.List; import org.opendc.simulator.compute.cpu.SimCpu; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowEdge; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; +import org.opendc.simulator.engine.graph.FlowSupplier; /** * A {@link SimPsu} implementation that estimates the power consumption based on CPU usage. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java index 709d3e15..c1e8a1b9 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java @@ -23,11 +23,11 @@ package org.opendc.simulator.compute.power; import org.opendc.simulator.compute.cpu.SimCpu; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowConsumer; +import org.opendc.simulator.engine.graph.FlowEdge; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; +import org.opendc.simulator.engine.graph.FlowSupplier; /** * A {@link SimPsu} implementation that estimates the power consumption based on CPU usage. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java index 78e8b5d4..ecd4c47f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload; import java.util.ArrayList; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowSupplier; public class ChainWorkload implements Workload { private ArrayList<Workload> workloads; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java index 723c450d..f4f7cdd6 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java @@ -29,8 +29,8 @@ package org.opendc.simulator.compute.workload; import java.time.InstantSource; import org.jetbrains.annotations.NotNull; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; public class CheckpointModel extends FlowNode { private SimWorkload simWorkload; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java index 5b7c10bb..75bdde92 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -23,9 +23,9 @@ package org.opendc.simulator.compute.workload; import java.util.LinkedList; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowEdge; +import org.opendc.simulator.engine.graph.FlowNode; +import org.opendc.simulator.engine.graph.FlowSupplier; /** * A {@link SimChainWorkload} that composes multiple {@link SimWorkload}s. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java index 59994fe6..72c095dc 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java @@ -23,11 +23,11 @@ package org.opendc.simulator.compute.workload; import java.util.LinkedList; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowConsumer; +import org.opendc.simulator.engine.graph.FlowEdge; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; +import org.opendc.simulator.engine.graph.FlowSupplier; public class SimTraceWorkload extends SimWorkload implements FlowConsumer { private LinkedList<TraceFragment> remainingFragments; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java index b5c89941..2919fc3a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java @@ -22,9 +22,9 @@ package org.opendc.simulator.compute.workload; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; +import org.opendc.simulator.engine.graph.FlowConsumer; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; /** * A model that characterizes the runtime behavior of some particular workload. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java index 39bb6111..7f82ab71 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java @@ -24,7 +24,7 @@ package org.opendc.simulator.compute.workload; import java.util.ArrayList; import java.util.List; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowSupplier; public class TraceWorkload implements Workload { private ArrayList<TraceFragment> fragments; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java index cd34921a..d85669bb 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java @@ -22,7 +22,7 @@ package org.opendc.simulator.compute.workload; -import org.opendc.simulator.engine.FlowSupplier; +import org.opendc.simulator.engine.graph.FlowSupplier; public interface Workload { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index 10af7c51..1a068b40 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -20,12 +20,14 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.engine; import java.time.Clock; import java.time.InstantSource; import kotlin.coroutines.CoroutineContext; import org.opendc.common.Dispatcher; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; /** * A {@link FlowEngine} simulates a generic flow network. @@ -89,7 +91,7 @@ public final class FlowEngine implements Runnable { * This method should be used when the state of a flow context is invalidated/interrupted and needs to be * re-computed. */ - void scheduleImmediate(long now, FlowNode ctx) { + public void scheduleImmediate(long now, FlowNode ctx) { scheduleImmediateInContext(ctx); // In-case the engine is already running in the call-stack, return immediately. The changes will be picked @@ -109,14 +111,14 @@ public final class FlowEngine implements Runnable { * <p> * This method should only be invoked while inside an engine cycle. */ - void scheduleImmediateInContext(FlowNode ctx) { + public void scheduleImmediateInContext(FlowNode ctx) { queue.add(ctx); } /** * Enqueue the specified {@link FlowNode} to be updated at its updated deadline. */ - void scheduleDelayed(FlowNode ctx) { + public void scheduleDelayed(FlowNode ctx) { scheduleDelayedInContext(ctx); // In-case the engine is already running in the call-stack, return immediately. The changes will be picked @@ -136,7 +138,7 @@ public final class FlowEngine implements Runnable { * <p> * This method should only be invoked while inside an engine cycle. */ - void scheduleDelayedInContext(FlowNode ctx) { + public void scheduleDelayedInContext(FlowNode ctx) { FlowTimerQueue timerQueue = this.timerQueue; timerQueue.enqueue(ctx); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java index 37b3c65b..bd622083 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java @@ -20,10 +20,11 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.engine; import java.util.ArrayDeque; import java.util.Arrays; +import org.opendc.simulator.engine.graph.FlowNode; /** * A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java index 1e348b10..049eb40d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java @@ -20,9 +20,10 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.engine; import java.util.Arrays; +import org.opendc.simulator.engine.graph.FlowNode; /** * A specialized priority queue for timers of {@link FlowNode}s. @@ -55,9 +56,9 @@ public final class FlowTimerQueue { */ public void enqueue(FlowNode node) { FlowNode[] es = queue; - int k = node.timerIndex; + int k = node.getTimerIndex(); - if (node.deadline != Long.MAX_VALUE) { + if (node.getDeadline() != Long.MAX_VALUE) { if (k >= 0) { update(es, node, k); } else { @@ -82,7 +83,7 @@ public final class FlowTimerQueue { final FlowNode[] es = queue; final FlowNode head = es[0]; - if (now < head.deadline) { + if (now < head.getDeadline()) { return null; } @@ -95,7 +96,7 @@ public final class FlowTimerQueue { siftDown(0, next, es, n); } - head.timerIndex = -1; + head.setTimerIndex(-1); return head; } @@ -104,7 +105,7 @@ public final class FlowTimerQueue { */ public long peekDeadline() { if (this.size > 0) { - return this.queue[0].deadline; + return this.queue[0].getDeadline(); } return Long.MAX_VALUE; @@ -130,7 +131,7 @@ public final class FlowTimerQueue { private void update(FlowNode[] es, FlowNode node, int k) { if (k > 0) { int parent = (k - 1) >>> 1; - if (es[parent].deadline > node.deadline) { + if (es[parent].getDeadline() > node.getDeadline()) { siftUp(k, node, es); return; } @@ -175,13 +176,13 @@ public final class FlowTimerQueue { while (k > 0) { int parent = (k - 1) >>> 1; FlowNode e = es[parent]; - if (key.deadline >= e.deadline) break; + if (key.getDeadline() >= e.getDeadline()) break; es[k] = e; - e.timerIndex = k; + e.setTimerIndex(k); k = parent; } es[k] = key; - key.timerIndex = k; + key.setTimerIndex(k); } private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) { @@ -190,16 +191,16 @@ public final class FlowTimerQueue { int child = (k << 1) + 1; // assume left child is least FlowNode c = es[child]; int right = child + 1; - if (right < n && c.deadline > es[right].deadline) c = es[child = right]; + if (right < n && c.getDeadline() > es[right].getDeadline()) c = es[child = right]; - if (key.deadline <= c.deadline) break; + if (key.getDeadline() <= c.getDeadline()) break; es[k] = c; - c.timerIndex = k; + c.setTimerIndex(k); k = child; } es[k] = key; - key.timerIndex = k; + key.setTimerIndex(k); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java index 15da2f23..5607278c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.engine; import java.util.Arrays; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java index ddb40794..2130d376 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.graph; public interface FlowConsumer { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index 48177412..7ef091f8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -20,17 +20,12 @@ * SOFTWARE. */ -package org.opendc.simulator; +package org.opendc.simulator.engine.graph; import java.util.ArrayList; import java.util.Arrays; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; -public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer { +public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); private FlowEdge supplierEdge; @@ -45,7 +40,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer private double capacity; // What is the max capacity - public Multiplexer(FlowGraph graph) { + public FlowDistributor(FlowGraph graph) { super(graph); } @@ -70,7 +65,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer // if supply >= demand -> push supplies to all tasks if (this.totalSupply > this.totalDemand) { - // If this came from a state of over provisioning, provide all consumers with their demand + // If this came from a state of overload, provide all consumers with their demand if (this.overLoaded) { for (int idx = 0; idx < this.consumerEdges.size(); idx++) { this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx)); @@ -99,6 +94,12 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer private record Demand(int idx, double value) {} + /** + * Distributed the available supply over the different demands. + * The supply is distributed using MaxMin Fairness. + * + * TODO: Move this outside of the Distributor so we can easily add different redistribution methods + */ private static double[] redistributeSupply(ArrayList<Double> demands, double totalSupply) { int inputSize = demands.size(); @@ -198,7 +199,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer this.currentConsumerIdx = idx; if (idx == -1) { - System.out.println("Error (Multiplexer): Demand pushed by an unknown consumer"); + System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer"); return; } @@ -234,7 +235,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer int idx = consumerEdge.getConsumerIndex(); if (idx == -1) { - System.out.println("Error (Multiplexer): pushing supply to an unknown consumer"); + System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer"); } if (supplies.get(idx) == newSupply) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java index 95fe7928..b7162508 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.graph; /** * An edge that connects two FlowStages. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java index d82b542b..0e6e137c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java @@ -20,10 +20,11 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.graph; import java.util.ArrayList; import java.util.HashMap; +import org.opendc.simulator.engine.engine.FlowEngine; public class FlowGraph { private final FlowEngine engine; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java index d1faf465..6ee947bc 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -20,9 +20,11 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.graph; import java.time.InstantSource; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.engine.FlowTimerQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,21 +44,79 @@ public abstract class FlowNode { protected NodeState nodeState = NodeState.PENDING; + public NodeState getNodeState() { + return nodeState; + } + + public void setNodeState(NodeState nodeState) { + this.nodeState = nodeState; + } + + public int getTimerIndex() { + return timerIndex; + } + + public void setTimerIndex(int index) { + this.timerIndex = index; + } + + public InstantSource getClock() { + return clock; + } + + public void setClock(InstantSource clock) { + this.clock = clock; + } + + public FlowGraph getParentGraph() { + return parentGraph; + } + + public void setParentGraph(FlowGraph parentGraph) { + this.parentGraph = parentGraph; + } + + public FlowEngine getEngine() { + return engine; + } + + public void setEngine(FlowEngine engine) { + this.engine = engine; + } + + /** + * Return the current deadline of the {@link FlowNode}'s timer (in milliseconds after epoch). + */ + public long getDeadline() { + return deadline; + } + + public void setDeadline(long deadline) { + this.deadline = deadline; + } + /** * The deadline of the stage after which an update should run. */ - long deadline = Long.MAX_VALUE; + private long deadline = Long.MAX_VALUE; /** * The index of the timer in the {@link FlowTimerQueue}. */ - int timerIndex = -1; + private int timerIndex = -1; protected InstantSource clock; protected FlowGraph parentGraph; protected FlowEngine engine; /** + * Return the {@link FlowGraph} to which this stage belongs. + */ + public FlowGraph getGraph() { + return parentGraph; + } + + /** * Construct a new {@link FlowNode} instance. * * @param parentGraph The {@link FlowGraph} this stage belongs to. @@ -70,27 +130,6 @@ public abstract class FlowNode { } /** - * Return the {@link FlowGraph} to which this stage belongs. - */ - public FlowGraph getGraph() { - return parentGraph; - } - - /** - * Return the current deadline of the {@link FlowNode}'s timer (in milliseconds after epoch). - */ - public long getDeadline() { - return deadline; - } - - public void setDeadline(long deadline) { - this.deadline = deadline; - } - - public void setTimerIndex(int index) { - this.timerIndex = index; - } - /** * Invalidate the {@link FlowNode} forcing the stage to update. * * <p> diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java index 955f4943..84602ee0 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.graph; public interface FlowSupplier { diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt index 7744d7b2..4dd17dbe 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt @@ -24,7 +24,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test -import org.opendc.simulator.engine.InvocationStack +import org.opendc.simulator.engine.engine.InvocationStack /** * Test suite for the [InvocationStack] class. |
