summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-11-26 11:09:21 +0100
committerGitHub <noreply@github.com>2024-11-26 11:09:21 +0100
commitec73210b675fd90568c5193e6ae6ef82ce81be6c (patch)
tree89b530b53898752f7800e5109548412b84fcd375
parent698a64615d0eef8994fc1eaf0a3b71da194e1dcd (diff)
Streamlined the FlowNetwork for better performance (#273)
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt3
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt1
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt11
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt26
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt7
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt8
-rw-r--r--opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SARunner.kt19
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java26
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java9
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java15
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java46
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java24
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java24
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java8
15 files changed, 101 insertions, 128 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
index 32fcf277..0b9916ed 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
@@ -32,7 +32,6 @@ import org.opendc.compute.simulator.telemetry.GuestSystemStats
import org.opendc.compute.simulator.telemetry.HostCpuStats
import org.opendc.compute.simulator.telemetry.HostSystemStats
import org.opendc.simulator.Multiplexer
-import org.opendc.simulator.compute.cpu.CpuPowerModel
import org.opendc.simulator.compute.machine.SimMachine
import org.opendc.simulator.compute.models.MachineModel
import org.opendc.simulator.compute.models.MemoryUnit
@@ -61,7 +60,6 @@ public class SimHost(
private val clock: InstantSource,
private val graph: FlowGraph,
private val machineModel: MachineModel,
- private val powerModel: CpuPowerModel,
private val powerMux: Multiplexer,
) : AutoCloseable {
/**
@@ -131,7 +129,6 @@ public class SimHost(
SimMachine(
this.graph,
this.machineModel,
- this.powerModel,
this.powerMux,
) { cause ->
hostState = if (cause != null) HostState.ERROR else HostState.DOWN
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
index 8e7293c8..30b50c4b 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
@@ -77,7 +77,6 @@ public class HostsProvisioningStep internal constructor(
ctx.dispatcher.timeSource,
graph,
hostSpec.model,
- hostSpec.cpuPowerModel,
powerMux,
)
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
index e067bf45..f4df7991 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
@@ -25,11 +25,9 @@
package org.opendc.experiments.base.runner
import com.github.ajalt.clikt.core.CliktCommand
-import com.github.ajalt.clikt.parameters.options.default
import com.github.ajalt.clikt.parameters.options.defaultLazy
import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.types.file
-import com.github.ajalt.clikt.parameters.types.int
import org.opendc.experiments.base.experiment.getExperiment
import java.io.File
@@ -49,15 +47,8 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") {
.file(canBeDir = false, canBeFile = true)
.defaultLazy { File("resources/experiment.json") }
- /**
- * The number of threads to use for parallelism.
- */
- private val parallelism by option("-p", "--parallelism", help = "number of worker threads")
- .int()
- .default(Runtime.getRuntime().availableProcessors() - 1)
-
override fun run() {
val experiment = getExperiment(scenarioPath)
- runExperiment(experiment, parallelism)
+ runExperiment(experiment)
}
}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
index 0b45806b..079db6fc 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
@@ -22,35 +22,39 @@
package org.opendc.experiments.base.runner
+import me.tongfei.progressbar.ProgressBarBuilder
+import me.tongfei.progressbar.ProgressBarStyle
import org.opendc.experiments.base.experiment.Scenario
-import java.util.concurrent.ForkJoinPool
/**
* Run scenario when no pool is available for parallel execution
*
* @param experiment The scenarios to run
- * @param parallelism The number of scenarios that can be run in parallel
*/
-public fun runExperiment(
- experiment: List<Scenario>,
- parallelism: Int,
-) {
+public fun runExperiment(experiment: List<Scenario>) {
val ansiReset = "\u001B[0m"
val ansiGreen = "\u001B[32m"
val ansiBlue = "\u001B[34m"
setupOutputFolderStructure(experiment[0].outputFolder)
+ val pb =
+ ProgressBarBuilder().setInitialMax(experiment.sumOf { scenario -> scenario.runs.toLong() })
+ .setStyle(ProgressBarStyle.ASCII)
+ .setTaskName("Simulating...").build()
+
for (scenario in experiment) {
- val pool = ForkJoinPool(parallelism)
println(
"\n\n$ansiGreen================================================================================$ansiReset",
)
println("$ansiBlue Running scenario: ${scenario.name} $ansiReset")
println("$ansiGreen================================================================================$ansiReset")
- runScenario(
- scenario,
- pool,
- )
+
+ for (seed in 0..<scenario.runs) {
+ println("$ansiBlue Starting seed: $seed $ansiReset")
+ runScenario(scenario, seed.toLong())
+ pb.step()
+ }
}
+ pb.close()
}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
index 4d6069e4..30c129c2 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
@@ -47,16 +47,13 @@ import java.util.stream.LongStream
* The scenario is run multiple times based on the user input
*
* @param scenario The scenario to run
- * @param pool The pool on which to run the scenarios
*/
-public fun runScenario(
- scenario: Scenario,
- pool: ForkJoinPool,
-) {
+public fun runScenario(scenario: Scenario) {
val pb =
ProgressBarBuilder().setInitialMax(scenario.runs.toLong()).setStyle(ProgressBarStyle.ASCII)
.setTaskName("Simulating...").build()
+ val pool = ForkJoinPool(5)
pool.submit {
LongStream.range(0, scenario.runs.toLong()).parallel().forEach {
runScenario(scenario, scenario.initialSeed + it)
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt
index abf16fef..17d62b1a 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt
@@ -294,8 +294,8 @@ class ScenarioIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(1803918472, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(787181528, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(1803918473, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(787181527, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
{ assertEquals(6.7565629E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
@@ -341,8 +341,8 @@ class ScenarioIntegrationTest {
{ assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") },
{ assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") },
- { assertEquals(43101793092, monitor.idleTime) { "Incorrect idle time" } },
- { assertEquals(3489406908, monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(43101787433, monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(3489412567, monitor.activeTime) { "Incorrect active time" } },
{ assertEquals(0, monitor.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
{ assertEquals(1.0016123392181786E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SARunner.kt b/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SARunner.kt
index 0068738a..49bbdb96 100644
--- a/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SARunner.kt
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SARunner.kt
@@ -25,9 +25,7 @@
package org.opendc.experiments.m3sa.runner
import org.opendc.experiments.base.experiment.Scenario
-import org.opendc.experiments.base.runner.runScenario
import org.opendc.experiments.base.runner.setupOutputFolderStructure
-import java.util.concurrent.ForkJoinPool
/**
* Run scenario when no pool is available for parallel execution
@@ -39,22 +37,7 @@ public fun runExperiment(
experiment: List<Scenario>,
parallelism: Int,
) {
- val ansiReset = "\u001B[0m"
- val ansiGreen = "\u001B[32m"
- val ansiBlue = "\u001B[34m"
-
setupOutputFolderStructure(experiment[0].outputFolder)
- for (scenario in experiment) {
- val pool = ForkJoinPool(parallelism)
- println(
- "\n\n$ansiGreen================================================================================$ansiReset",
- )
- println("$ansiBlue Running scenario: ${scenario.name} $ansiReset")
- println("$ansiGreen================================================================================$ansiReset")
- runScenario(
- scenario,
- pool,
- )
- }
+ runExperiment(experiment, parallelism)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java
index 24627a9c..18214172 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java
@@ -46,7 +46,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
private double maxCapacity;
- private PerformanceCounters performanceCounters = new PerformanceCounters();
+ private final PerformanceCounters performanceCounters = new PerformanceCounters();
private long lastCounterUpdate;
private final double cpuFrequencyInv;
@@ -122,22 +122,20 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
public long onUpdate(long now) {
updateCounters(now);
+ this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity;
+
// Calculate Power Demand and send to PSU
- // TODO: look at the double / double thing
double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);
if (powerDemand != this.currentPowerDemand) {
this.pushDemand(this.psuEdge, powerDemand);
- this.currentPowerDemand = powerDemand;
}
// Calculate the amount of cpu this can provide
- // TODO: This should be based on the provided power
double cpuSupply = this.currentCpuDemand;
if (cpuSupply != this.currentCpuSupplied) {
this.pushSupply(this.muxEdge, cpuSupply);
- this.currentCpuSupplied = cpuSupply;
}
return Long.MAX_VALUE;
@@ -183,6 +181,8 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
*/
@Override
public void pushDemand(FlowEdge supplierEdge, double newPowerDemand) {
+ updateCounters();
+ this.currentPowerDemand = newPowerDemand;
this.psuEdge.pushDemand(newPowerDemand);
}
@@ -205,7 +205,12 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
this.currentCpuDemand = newCpuDemand;
this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity;
- this.invalidate();
+ // Calculate Power Demand and send to PSU
+ double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);
+
+ if (powerDemand != this.currentPowerDemand) {
+ this.pushDemand(this.psuEdge, powerDemand);
+ }
}
/**
@@ -217,7 +222,12 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
updateCounters();
this.currentPowerSupplied = newPowerSupply;
- this.invalidate();
+ // Calculate the amount of cpu this can provide
+ double cpuSupply = this.currentCpuDemand;
+
+ if (cpuSupply != this.currentCpuSupplied) {
+ this.pushSupply(this.muxEdge, cpuSupply);
+ }
}
/**
@@ -234,6 +244,8 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer
@Override
public void addSupplierEdge(FlowEdge supplierEdge) {
this.psuEdge = supplierEdge;
+
+ this.invalidate();
}
/**
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java
index 4602223c..c7caa63f 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java
@@ -25,7 +25,6 @@ package org.opendc.simulator.compute.machine;
import java.time.InstantSource;
import java.util.function.Consumer;
import org.opendc.simulator.Multiplexer;
-import org.opendc.simulator.compute.cpu.CpuPowerModel;
import org.opendc.simulator.compute.cpu.SimCpu;
import org.opendc.simulator.compute.memory.Memory;
import org.opendc.simulator.compute.models.MachineModel;
@@ -48,7 +47,7 @@ public class SimMachine {
private SimPsu psu;
private Memory memory;
- private Consumer<Exception> completion;
+ private final Consumer<Exception> completion;
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Basic Getters and Setters
@@ -112,11 +111,7 @@ public class SimMachine {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
public SimMachine(
- FlowGraph graph,
- MachineModel machineModel,
- CpuPowerModel cpuPowerModel,
- Multiplexer powerMux,
- Consumer<Exception> completion) {
+ FlowGraph graph, MachineModel machineModel, Multiplexer powerMux, Consumer<Exception> completion) {
this.graph = graph;
this.machineModel = machineModel;
this.clock = graph.getEngine().getClock();
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java
index 0557165b..15a1b1c4 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java
@@ -36,7 +36,7 @@ import org.opendc.simulator.engine.FlowSupplier;
A virtual Machine created to run a single workload
*/
public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSupplier {
- private SimMachine machine;
+ private final SimMachine machine;
private SimWorkload activeWorkload;
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java
index a219d4e6..ea500c81 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java
@@ -126,12 +126,6 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier {
@Override
public long onUpdate(long now) {
- updateCounters();
- double powerSupply = this.powerDemand;
-
- if (powerSupply != this.powerSupplied) {
- this.pushSupply(this.muxEdge, powerSupply);
- }
return Long.MAX_VALUE;
}
@@ -163,14 +157,17 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier {
@Override
public void handleDemand(FlowEdge consumerEdge, double newPowerDemand) {
-
this.powerDemand = newPowerDemand;
- this.invalidate();
+
+ double powerSupply = this.powerDemand;
+
+ if (powerSupply != this.powerSupplied) {
+ this.pushSupply(this.muxEdge, powerSupply);
+ }
}
@Override
public void pushSupply(FlowEdge consumerEdge, double newSupply) {
-
this.powerSupplied = newSupply;
consumerEdge.pushSupply(newSupply);
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
index f91c363d..5b7c10bb 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
@@ -170,7 +170,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier {
/**
* Add connection to the active workload
*
- * @param consumerEdge
+ * @param consumerEdge The edge to the workload
*/
@Override
public void addConsumerEdge(FlowEdge consumerEdge) {
@@ -179,7 +179,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier {
/**
* Add Connection to the cpuMux
- * @param supplierEdge
+ *
+ * @param supplierEdge The edge to the cpuMux
*/
@Override
public void addSupplierEdge(FlowEdge supplierEdge) {
@@ -190,53 +191,48 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier {
/**
* Push demand to the cpuMux
*
- * @param supplierEdge
- * @param newDemand
+ * @param supplierEdge The edge to the cpuMux
+ * @param newDemand new demand to sent to the cpu
*/
@Override
public void pushDemand(FlowEdge supplierEdge, double newDemand) {
+
+ this.demand = newDemand;
this.machineEdge.pushDemand(newDemand);
}
/**
* Push supply to the workload
*
- * @param consumerEdge
- * @param newSupply
+ * @param consumerEdge The edge to the cpuMux
+ * @param newSupply new supply to sent to the workload
*/
@Override
public void pushSupply(FlowEdge consumerEdge, double newSupply) {
+
+ this.supply = newSupply;
this.workloadEdge.pushSupply(newSupply);
}
/**
* Handle new demand coming from the workload
*
- * @param consumerEdge
- * @param newDemand
+ * @param consumerEdge The edge to the workload
+ * @param newDemand new demand coming from the workload
*/
@Override
public void handleDemand(FlowEdge consumerEdge, double newDemand) {
- if (newDemand == this.demand) {
- return;
- }
-
- this.demand = newDemand;
this.pushDemand(this.machineEdge, newDemand);
}
/**
* Handle new supply coming from the cpuMux
*
- * @param supplierEdge
- * @param newSupply
+ * @param supplierEdge The edge to the cpuMux
+ * @param newSupply The new supply that is sent to the workload
*/
@Override
public void handleSupply(FlowEdge supplierEdge, double newSupply) {
- if (newSupply == this.supply) {
- return;
- }
-
this.pushSupply(this.machineEdge, newSupply);
}
@@ -245,7 +241,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier {
* If there is a next workload available, start this workload
* Otherwise, close this SimChainWorkload
*
- * @param consumerEdge
+ * @param consumerEdge The edge to the active workload
*/
@Override
public void removeConsumerEdge(FlowEdge consumerEdge) {
@@ -270,18 +266,10 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier {
* Handle the removal of the connection to the cpuMux
* When this happens, close the SimChainWorkload
*
- * @param supplierEdge
+ * @param supplierEdge The edge to the cpuMux
*/
@Override
public void removeSupplierEdge(FlowEdge supplierEdge) {
this.stopWorkload();
}
-
- @SuppressWarnings("unchecked")
- private static <T extends Throwable> void tryThrow(Throwable e) throws T {
- if (e == null) {
- return;
- }
- throw (T) e;
- }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java
index b269564d..59994fe6 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java
@@ -40,9 +40,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
private double currentDemand;
private double currentSupply;
- private long checkpointInterval;
private long checkpointDuration;
- private double checkpointIntervalScaling;
private TraceWorkload snapshot;
@@ -88,9 +86,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
super(((FlowNode) supplier).getGraph());
this.snapshot = workload;
- this.checkpointInterval = workload.getCheckpointInterval();
this.checkpointDuration = workload.getCheckpointDuration();
- this.checkpointIntervalScaling = workload.getCheckpointIntervalScaling();
this.remainingFragments = new LinkedList<>(workload.getFragments());
this.fragmentIndex = 0;
@@ -98,7 +94,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
graph.addEdge(this, supplier);
this.currentFragment = this.getNextFragment();
- pushDemand(machineEdge, (double) this.currentFragment.cpuUsage());
+ pushDemand(machineEdge, this.currentFragment.cpuUsage());
this.startOfFragment = now;
}
@@ -135,7 +131,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.startOfFragment = now - passedTime;
// Change the cpu Usage to the new Fragment
- pushDemand(machineEdge, (double) this.currentFragment.cpuUsage());
+ pushDemand(machineEdge, this.currentFragment.cpuUsage());
// Return the time when the current fragment will complete
return this.startOfFragment + duration;
@@ -163,7 +159,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
/**
* Create a new snapshot based on the current status of the workload.
- * @param now
+ * @param now Moment on which the snapshot is made in milliseconds
*/
public void makeSnapshot(long now) {
@@ -190,7 +186,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.fragmentIndex = -1;
this.currentFragment = getNextFragment();
- pushDemand(this.machineEdge, (double) this.currentFragment.cpuUsage());
+ pushDemand(this.machineEdge, this.currentFragment.cpuUsage());
this.startOfFragment = now;
this.invalidate();
@@ -203,8 +199,8 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
/**
* Handle updates in supply from the Virtual Machine
*
- * @param supplierEdge
- * @param newSupply
+ * @param supplierEdge edge to the VM on which this is running
+ * @param newSupply The new demand that needs to be sent to the VM
*/
@Override
public void handleSupply(FlowEdge supplierEdge, double newSupply) {
@@ -218,8 +214,8 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
/**
* Push a new demand to the Virtual Machine
*
- * @param supplierEdge
- * @param newDemand
+ * @param supplierEdge edge to the VM on which this is running
+ * @param newDemand The new demand that needs to be sent to the VM
*/
@Override
public void pushDemand(FlowEdge supplierEdge, double newDemand) {
@@ -234,7 +230,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
/**
* Add the connection to the Virtual Machine
*
- * @param supplierEdge
+ * @param supplierEdge edge to the VM on which this is running
*/
@Override
public void addSupplierEdge(FlowEdge supplierEdge) {
@@ -245,7 +241,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
* Handle the removal of the connection to the Virtual Machine
* When the connection to the Virtual Machine is removed, the SimTraceWorkload is removed
*
- * @param supplierEdge
+ * @param supplierEdge edge to the VM on which this is running
*/
@Override
public void removeSupplierEdge(FlowEdge supplierEdge) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java
index 25dc564f..dd4c2d11 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java
@@ -31,11 +31,11 @@ import org.opendc.simulator.engine.FlowNode;
import org.opendc.simulator.engine.FlowSupplier;
public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer {
- private ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
+ private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
private FlowEdge supplierEdge;
- private ArrayList<Double> demands = new ArrayList<>(); // What is demanded by the consumers
- private ArrayList<Double> supplies = new ArrayList<>(); // What is supplied to the consumers
+ private final ArrayList<Double> demands = new ArrayList<>(); // What is demanded by the consumers
+ private final ArrayList<Double> supplies = new ArrayList<>(); // What is supplied to the consumers
private double totalDemand; // The total demand of all the consumers
private double totalSupply; // The total supply from the supplier
@@ -180,13 +180,19 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
demands.set(idx, newDemand);
this.totalDemand += (newDemand - prevDemand);
- this.invalidate();
+
+ if (this.totalDemand <= this.capacity) {
+
+ this.totalSupply = this.totalDemand;
+ this.pushDemand(this.supplierEdge, this.totalSupply);
+
+ this.pushSupply(consumerEdge, newDemand);
+ }
+ // TODO: add behaviour if capacity is reached
}
@Override
- public void handleSupply(FlowEdge supplierEdge, double newSupply) {
- this.invalidate();
- }
+ public void handleSupply(FlowEdge supplierEdge, double newSupply) {}
@Override
public void pushDemand(FlowEdge supplierEdge, double newDemand) {
@@ -201,6 +207,10 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
System.out.println("Error (Multiplexer): pushing supply to an unknown consumer");
}
+ if (supplies.get(idx) == newSupply) {
+ return;
+ }
+
supplies.set(idx, newSupply);
consumerEdge.pushSupply(newSupply);
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java
index d89740a2..d99cd78e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java
@@ -90,19 +90,23 @@ public class FlowEdge {
* Push new demand from the Consumer to the Supplier
*/
public void pushDemand(double newDemand) {
+ if (newDemand == this.demand) {
+ return;
+ }
this.demand = newDemand;
this.supplier.handleDemand(this, newDemand);
- ((FlowNode) this.supplier).invalidate();
}
/**
* Push new supply from the Supplier to the Consumer
*/
public void pushSupply(double newSupply) {
+ if (newSupply == this.supply) {
+ return;
+ }
this.supply = newSupply;
this.consumer.handleSupply(this, newSupply);
- ((FlowNode) this.consumer).invalidate();
}
}