summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt125
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java50
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java22
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java6
5 files changed, 36 insertions, 171 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
deleted file mode 100644
index 0ab051a4..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2
-
-import kotlinx.coroutines.launch
-import org.opendc.simulator.flow2.mux.MaxMinFlowMultiplexer
-import org.opendc.simulator.flow2.sink.SimpleFlowSink
-import org.opendc.simulator.flow2.source.TraceFlowSource
-import org.opendc.simulator.flow2.util.FlowTransformer
-import org.opendc.simulator.flow2.util.FlowTransforms
-import org.opendc.simulator.kotlin.runSimulation
-import org.openjdk.jmh.annotations.Benchmark
-import org.openjdk.jmh.annotations.Fork
-import org.openjdk.jmh.annotations.Measurement
-import org.openjdk.jmh.annotations.Scope
-import org.openjdk.jmh.annotations.Setup
-import org.openjdk.jmh.annotations.State
-import org.openjdk.jmh.annotations.Warmup
-import java.util.concurrent.ThreadLocalRandom
-import java.util.concurrent.TimeUnit
-
-@State(Scope.Thread)
-@Fork(1)
-@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
-@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
-class FlowBenchmarks {
- private lateinit var trace: TraceFlowSource.Trace
-
- @Setup
- fun setUp() {
- val random = ThreadLocalRandom.current()
- val traceSize = 10_000_000
- trace =
- TraceFlowSource.Trace(
- LongArray(traceSize) { (it + 1) * 1000L },
- FloatArray(traceSize) { random.nextFloat(0.0f, 4500.0f) },
- traceSize,
- )
- }
-
- @Benchmark
- fun benchmarkSink() {
- return runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
- val sink = SimpleFlowSink(graph, 4200.0f)
- val source = TraceFlowSource(graph, trace)
- graph.connect(source.output, sink.input)
- }
- }
-
- @Benchmark
- fun benchmarkForward() {
- return runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
- val sink = SimpleFlowSink(graph, 4200.0f)
- val source = TraceFlowSource(graph, trace)
- val forwarder = FlowTransformer(graph, FlowTransforms.noop())
-
- graph.connect(source.output, forwarder.input)
- graph.connect(forwarder.output, sink.input)
- }
- }
-
- @Benchmark
- fun benchmarkMuxMaxMinSingleSource() {
- return runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
- val switch = MaxMinFlowMultiplexer(graph)
-
- val sinkA = SimpleFlowSink(graph, 3000.0f)
- val sinkB = SimpleFlowSink(graph, 3000.0f)
-
- graph.connect(switch.newOutPort(), sinkA.input)
- graph.connect(switch.newOutPort(), sinkB.input)
-
- val source = TraceFlowSource(graph, trace)
- graph.connect(source.output, switch.newInput())
- }
- }
-
- @Benchmark
- fun benchmarkMuxMaxMinTripleSource() {
- return runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
- val switch = MaxMinFlowMultiplexer(graph)
-
- val sinkA = SimpleFlowSink(graph, 3000.0f)
- val sinkB = SimpleFlowSink(graph, 3000.0f)
-
- graph.connect(switch.newOutPort(), sinkA.input)
- graph.connect(switch.newOutPort(), sinkB.input)
-
- repeat(3) {
- launch {
- val source = TraceFlowSource(graph, trace)
- graph.connect(source.output, switch.newInput())
- }
- }
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java
index 0af2499a..a87ded8d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java
@@ -34,26 +34,26 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
private ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
private FlowEdge supplierEdge;
- private ArrayList<Float> demands = new ArrayList<>(); // What is demanded by the consumers
- private ArrayList<Float> supplies = new ArrayList<>(); // What is supplied to the consumers
+ private ArrayList<Double> demands = new ArrayList<>(); // What is demanded by the consumers
+ private ArrayList<Double> supplies = new ArrayList<>(); // What is supplied to the consumers
- private float totalDemand; // The total demand of all the consumers
- private float totalSupply; // The total supply from the supplier
- private float capacity; // What is the max capacity
+ private double totalDemand; // The total demand of all the consumers
+ private double totalSupply; // The total supply from the supplier
+ private double capacity; // What is the max capacity
public Multiplexer(FlowGraph graph) {
super(graph);
}
- public float getTotalDemand() {
+ public double getTotalDemand() {
return totalDemand;
}
- public float getTotalSupply() {
+ public double getTotalSupply() {
return totalSupply;
}
- public float getCapacity() {
+ public double getCapacity() {
return capacity;
}
@@ -67,7 +67,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
}
}
- float totalSupply = 0;
+ double totalSupply = 0;
for (int i = 0; i < this.consumerEdges.size(); i++) {
this.pushSupply(this.consumerEdges.get(i), this.supplies.get(i));
totalSupply += this.supplies.get(i);
@@ -83,8 +83,8 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
return Long.MAX_VALUE;
}
- private static float redistributeSupply(
- ArrayList<FlowEdge> consumerEdges, ArrayList<Float> supplies, float capacity) {
+ private static double redistributeSupply(
+ ArrayList<FlowEdge> consumerEdges, ArrayList<Double> supplies, double capacity) {
final long[] consumers = new long[consumerEdges.size()];
for (int i = 0; i < consumers.length; i++) {
@@ -94,24 +94,24 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
break;
}
- consumers[i] = ((long) Float.floatToRawIntBits(consumer.getDemand()) << 32) | (i & 0xFFFFFFFFL);
+ consumers[i] = (Double.doubleToRawLongBits(consumer.getDemand()) << 32) | (i & 0xFFFFFFFFL);
}
Arrays.sort(consumers);
- float availableCapacity = capacity;
+ double availableCapacity = capacity;
int inputSize = consumers.length;
for (int i = 0; i < inputSize; i++) {
long v = consumers[i];
int slot = (int) v;
- float d = Float.intBitsToFloat((int) (v >> 32));
+ double d = Double.longBitsToDouble((int) (v >> 32));
if (d == 0.0) {
continue;
}
- float availableShare = availableCapacity / (inputSize - i);
- float r = Math.min(d, availableShare);
+ double availableShare = availableCapacity / (inputSize - i);
+ double r = Math.min(d, availableShare);
supplies.set(slot, r); // Update the rates
availableCapacity -= r;
@@ -128,8 +128,8 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
@Override
public void addConsumerEdge(FlowEdge consumerEdge) {
this.consumerEdges.add(consumerEdge);
- this.demands.add(0f);
- this.supplies.add(0f);
+ this.demands.add(0.0);
+ this.supplies.add(0.0);
}
@Override
@@ -164,7 +164,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
}
@Override
- public void handleDemand(FlowEdge consumerEdge, float newDemand) {
+ public void handleDemand(FlowEdge consumerEdge, double newDemand) {
int idx = consumerEdges.indexOf(consumerEdge);
if (idx == -1) {
@@ -172,14 +172,14 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
return;
}
- float prevDemand = demands.get(idx);
+ double prevDemand = demands.get(idx);
demands.set(idx, newDemand);
this.totalDemand += (newDemand - prevDemand);
}
@Override
- public void handleSupply(FlowEdge supplierEdge, float newSupply) {
+ public void handleSupply(FlowEdge supplierEdge, double newSupply) {
if (newSupply == this.totalSupply) {
return;
}
@@ -188,22 +188,18 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
}
@Override
- public void pushDemand(FlowEdge supplierEdge, float newDemand) {
+ public void pushDemand(FlowEdge supplierEdge, double newDemand) {
this.supplierEdge.pushDemand(newDemand);
}
@Override
- public void pushSupply(FlowEdge consumerEdge, float newSupply) {
+ public void pushSupply(FlowEdge consumerEdge, double newSupply) {
int idx = consumerEdges.indexOf(consumerEdge);
if (idx == -1) {
System.out.println("Error (Multiplexer): pushing supply to an unknown consumer");
}
- if (newSupply == supplies.get(idx)) {
- return;
- }
-
supplies.set(idx, newSupply);
consumerEdge.pushSupply(newSupply);
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java
index 7ba5dea7..ddb40794 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java
@@ -24,9 +24,9 @@ package org.opendc.simulator.engine;
public interface FlowConsumer {
- void handleSupply(FlowEdge supplierEdge, float newSupply);
+ void handleSupply(FlowEdge supplierEdge, double newSupply);
- void pushDemand(FlowEdge supplierEdge, float newDemand);
+ void pushDemand(FlowEdge supplierEdge, double newDemand);
void addSupplierEdge(FlowEdge supplierEdge);
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java
index 0edc9e68..d89740a2 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java
@@ -32,10 +32,10 @@ public class FlowEdge {
private FlowConsumer consumer;
private FlowSupplier supplier;
- private float demand = 0.0f;
- private float supply = 0.0f;
+ private double demand = 0.0;
+ private double supply = 0.0;
- private float capacity;
+ private double capacity;
public FlowEdge(FlowConsumer consumer, FlowSupplier supplier) {
if (!(consumer instanceof FlowNode)) {
@@ -74,25 +74,22 @@ public class FlowEdge {
return supplier;
}
- public float getCapacity() {
+ public double getCapacity() {
return capacity;
}
- public float getDemand() {
+ public double getDemand() {
return this.demand;
}
- public float getSupply() {
+ public double getSupply() {
return this.supply;
}
/**
* Push new demand from the Consumer to the Supplier
*/
- public void pushDemand(float newDemand) {
- if (newDemand == this.demand) {
- return;
- }
+ public void pushDemand(double newDemand) {
this.demand = newDemand;
this.supplier.handleDemand(this, newDemand);
@@ -102,10 +99,7 @@ public class FlowEdge {
/**
* Push new supply from the Supplier to the Consumer
*/
- public void pushSupply(float newSupply) {
- if (newSupply == this.supply) {
- return;
- }
+ public void pushSupply(double newSupply) {
this.supply = newSupply;
this.consumer.handleSupply(this, newSupply);
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java
index 87729fca..955f4943 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java
@@ -24,13 +24,13 @@ package org.opendc.simulator.engine;
public interface FlowSupplier {
- void handleDemand(FlowEdge consumerEdge, float newDemand);
+ void handleDemand(FlowEdge consumerEdge, double newDemand);
- void pushSupply(FlowEdge consumerEdge, float newSupply);
+ void pushSupply(FlowEdge consumerEdge, double newSupply);
void addConsumerEdge(FlowEdge consumerEdge);
void removeConsumerEdge(FlowEdge consumerEdge);
- float getCapacity();
+ double getCapacity();
}