diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2024-11-26 11:09:21 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-11-26 11:09:21 +0100 |
| commit | ec73210b675fd90568c5193e6ae6ef82ce81be6c (patch) | |
| tree | 89b530b53898752f7800e5109548412b84fcd375 | |
| parent | 698a64615d0eef8994fc1eaf0a3b71da194e1dcd (diff) | |
Streamlined the FlowNetwork for better performance (#273)
15 files changed, 101 insertions, 128 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt index 32fcf277..0b9916ed 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt @@ -32,7 +32,6 @@ import org.opendc.compute.simulator.telemetry.GuestSystemStats import org.opendc.compute.simulator.telemetry.HostCpuStats import org.opendc.compute.simulator.telemetry.HostSystemStats import org.opendc.simulator.Multiplexer -import org.opendc.simulator.compute.cpu.CpuPowerModel import org.opendc.simulator.compute.machine.SimMachine import org.opendc.simulator.compute.models.MachineModel import org.opendc.simulator.compute.models.MemoryUnit @@ -61,7 +60,6 @@ public class SimHost( private val clock: InstantSource, private val graph: FlowGraph, private val machineModel: MachineModel, - private val powerModel: CpuPowerModel, private val powerMux: Multiplexer, ) : AutoCloseable { /** @@ -131,7 +129,6 @@ public class SimHost( SimMachine( this.graph, this.machineModel, - this.powerModel, this.powerMux, ) { cause -> hostState = if (cause != null) HostState.ERROR else HostState.DOWN diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt index 8e7293c8..30b50c4b 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt @@ -77,7 +77,6 @@ public class HostsProvisioningStep internal constructor( ctx.dispatcher.timeSource, graph, hostSpec.model, - hostSpec.cpuPowerModel, powerMux, ) diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt index e067bf45..f4df7991 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt @@ -25,11 +25,9 @@ package org.opendc.experiments.base.runner import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.options.default import com.github.ajalt.clikt.parameters.options.defaultLazy import com.github.ajalt.clikt.parameters.options.option import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.int import org.opendc.experiments.base.experiment.getExperiment import java.io.File @@ -49,15 +47,8 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") { .file(canBeDir = false, canBeFile = true) .defaultLazy { File("resources/experiment.json") } - /** - * The number of threads to use for parallelism. - */ - private val parallelism by option("-p", "--parallelism", help = "number of worker threads") - .int() - .default(Runtime.getRuntime().availableProcessors() - 1) - override fun run() { val experiment = getExperiment(scenarioPath) - runExperiment(experiment, parallelism) + runExperiment(experiment) } } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt index 0b45806b..079db6fc 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt @@ -22,35 +22,39 @@ package org.opendc.experiments.base.runner +import me.tongfei.progressbar.ProgressBarBuilder +import me.tongfei.progressbar.ProgressBarStyle import org.opendc.experiments.base.experiment.Scenario -import java.util.concurrent.ForkJoinPool /** * Run scenario when no pool is available for parallel execution * * @param experiment The scenarios to run - * @param parallelism The number of scenarios that can be run in parallel */ -public fun runExperiment( - experiment: List<Scenario>, - parallelism: Int, -) { +public fun runExperiment(experiment: List<Scenario>) { val ansiReset = "\u001B[0m" val ansiGreen = "\u001B[32m" val ansiBlue = "\u001B[34m" setupOutputFolderStructure(experiment[0].outputFolder) + val pb = + ProgressBarBuilder().setInitialMax(experiment.sumOf { scenario -> scenario.runs.toLong() }) + .setStyle(ProgressBarStyle.ASCII) + .setTaskName("Simulating...").build() + for (scenario in experiment) { - val pool = ForkJoinPool(parallelism) println( "\n\n$ansiGreen================================================================================$ansiReset", ) println("$ansiBlue Running scenario: ${scenario.name} $ansiReset") println("$ansiGreen================================================================================$ansiReset") - runScenario( - scenario, - pool, - ) + + for (seed in 0..<scenario.runs) { + println("$ansiBlue Starting seed: $seed $ansiReset") + runScenario(scenario, seed.toLong()) + pb.step() + } } + pb.close() } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index 4d6069e4..30c129c2 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -47,16 +47,13 @@ import java.util.stream.LongStream * The scenario is run multiple times based on the user input * * @param scenario The scenario to run - * @param pool The pool on which to run the scenarios */ -public fun runScenario( - scenario: Scenario, - pool: ForkJoinPool, -) { +public fun runScenario(scenario: Scenario) { val pb = ProgressBarBuilder().setInitialMax(scenario.runs.toLong()).setStyle(ProgressBarStyle.ASCII) .setTaskName("Simulating...").build() + val pool = ForkJoinPool(5) pool.submit { LongStream.range(0, scenario.runs.toLong()).parallel().forEach { runScenario(scenario, scenario.initialSeed + it) diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt index abf16fef..17d62b1a 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt @@ -294,8 +294,8 @@ class ScenarioIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(1803918472, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(787181528, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(1803918473, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(787181527, monitor.activeTime) { "Active time incorrect" } }, { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, { assertEquals(6.7565629E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, @@ -341,8 +341,8 @@ class ScenarioIntegrationTest { { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") }, { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") }, - { assertEquals(43101793092, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(3489406908, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(43101787433, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(3489412567, monitor.activeTime) { "Incorrect active time" } }, { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } }, { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, { assertEquals(1.0016123392181786E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SARunner.kt b/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SARunner.kt index 0068738a..49bbdb96 100644 --- a/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SARunner.kt +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SARunner.kt @@ -25,9 +25,7 @@ package org.opendc.experiments.m3sa.runner import org.opendc.experiments.base.experiment.Scenario -import org.opendc.experiments.base.runner.runScenario import org.opendc.experiments.base.runner.setupOutputFolderStructure -import java.util.concurrent.ForkJoinPool /** * Run scenario when no pool is available for parallel execution @@ -39,22 +37,7 @@ public fun runExperiment( experiment: List<Scenario>, parallelism: Int, ) { - val ansiReset = "\u001B[0m" - val ansiGreen = "\u001B[32m" - val ansiBlue = "\u001B[34m" - setupOutputFolderStructure(experiment[0].outputFolder) - for (scenario in experiment) { - val pool = ForkJoinPool(parallelism) - println( - "\n\n$ansiGreen================================================================================$ansiReset", - ) - println("$ansiBlue Running scenario: ${scenario.name} $ansiReset") - println("$ansiGreen================================================================================$ansiReset") - runScenario( - scenario, - pool, - ) - } + runExperiment(experiment, parallelism) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java index 24627a9c..18214172 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java @@ -46,7 +46,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer private double maxCapacity; - private PerformanceCounters performanceCounters = new PerformanceCounters(); + private final PerformanceCounters performanceCounters = new PerformanceCounters(); private long lastCounterUpdate; private final double cpuFrequencyInv; @@ -122,22 +122,20 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer public long onUpdate(long now) { updateCounters(now); + this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity; + // Calculate Power Demand and send to PSU - // TODO: look at the double / double thing double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization); if (powerDemand != this.currentPowerDemand) { this.pushDemand(this.psuEdge, powerDemand); - this.currentPowerDemand = powerDemand; } // Calculate the amount of cpu this can provide - // TODO: This should be based on the provided power double cpuSupply = this.currentCpuDemand; if (cpuSupply != this.currentCpuSupplied) { this.pushSupply(this.muxEdge, cpuSupply); - this.currentCpuSupplied = cpuSupply; } return Long.MAX_VALUE; @@ -183,6 +181,8 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer */ @Override public void pushDemand(FlowEdge supplierEdge, double newPowerDemand) { + updateCounters(); + this.currentPowerDemand = newPowerDemand; this.psuEdge.pushDemand(newPowerDemand); } @@ -205,7 +205,12 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer this.currentCpuDemand = newCpuDemand; this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity; - this.invalidate(); + // Calculate Power Demand and send to PSU + double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization); + + if (powerDemand != this.currentPowerDemand) { + this.pushDemand(this.psuEdge, powerDemand); + } } /** @@ -217,7 +222,12 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer updateCounters(); this.currentPowerSupplied = newPowerSupply; - this.invalidate(); + // Calculate the amount of cpu this can provide + double cpuSupply = this.currentCpuDemand; + + if (cpuSupply != this.currentCpuSupplied) { + this.pushSupply(this.muxEdge, cpuSupply); + } } /** @@ -234,6 +244,8 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer @Override public void addSupplierEdge(FlowEdge supplierEdge) { this.psuEdge = supplierEdge; + + this.invalidate(); } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java index 4602223c..c7caa63f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java @@ -25,7 +25,6 @@ package org.opendc.simulator.compute.machine; import java.time.InstantSource; import java.util.function.Consumer; import org.opendc.simulator.Multiplexer; -import org.opendc.simulator.compute.cpu.CpuPowerModel; import org.opendc.simulator.compute.cpu.SimCpu; import org.opendc.simulator.compute.memory.Memory; import org.opendc.simulator.compute.models.MachineModel; @@ -48,7 +47,7 @@ public class SimMachine { private SimPsu psu; private Memory memory; - private Consumer<Exception> completion; + private final Consumer<Exception> completion; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Basic Getters and Setters @@ -112,11 +111,7 @@ public class SimMachine { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// public SimMachine( - FlowGraph graph, - MachineModel machineModel, - CpuPowerModel cpuPowerModel, - Multiplexer powerMux, - Consumer<Exception> completion) { + FlowGraph graph, MachineModel machineModel, Multiplexer powerMux, Consumer<Exception> completion) { this.graph = graph; this.machineModel = machineModel; this.clock = graph.getEngine().getClock(); diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java index 0557165b..15a1b1c4 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java @@ -36,7 +36,7 @@ import org.opendc.simulator.engine.FlowSupplier; A virtual Machine created to run a single workload */ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSupplier { - private SimMachine machine; + private final SimMachine machine; private SimWorkload activeWorkload; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java index a219d4e6..ea500c81 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java @@ -126,12 +126,6 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier { @Override public long onUpdate(long now) { - updateCounters(); - double powerSupply = this.powerDemand; - - if (powerSupply != this.powerSupplied) { - this.pushSupply(this.muxEdge, powerSupply); - } return Long.MAX_VALUE; } @@ -163,14 +157,17 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier { @Override public void handleDemand(FlowEdge consumerEdge, double newPowerDemand) { - this.powerDemand = newPowerDemand; - this.invalidate(); + + double powerSupply = this.powerDemand; + + if (powerSupply != this.powerSupplied) { + this.pushSupply(this.muxEdge, powerSupply); + } } @Override public void pushSupply(FlowEdge consumerEdge, double newSupply) { - this.powerSupplied = newSupply; consumerEdge.pushSupply(newSupply); } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java index f91c363d..5b7c10bb 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -170,7 +170,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { /** * Add connection to the active workload * - * @param consumerEdge + * @param consumerEdge The edge to the workload */ @Override public void addConsumerEdge(FlowEdge consumerEdge) { @@ -179,7 +179,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { /** * Add Connection to the cpuMux - * @param supplierEdge + * + * @param supplierEdge The edge to the cpuMux */ @Override public void addSupplierEdge(FlowEdge supplierEdge) { @@ -190,53 +191,48 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { /** * Push demand to the cpuMux * - * @param supplierEdge - * @param newDemand + * @param supplierEdge The edge to the cpuMux + * @param newDemand new demand to sent to the cpu */ @Override public void pushDemand(FlowEdge supplierEdge, double newDemand) { + + this.demand = newDemand; this.machineEdge.pushDemand(newDemand); } /** * Push supply to the workload * - * @param consumerEdge - * @param newSupply + * @param consumerEdge The edge to the cpuMux + * @param newSupply new supply to sent to the workload */ @Override public void pushSupply(FlowEdge consumerEdge, double newSupply) { + + this.supply = newSupply; this.workloadEdge.pushSupply(newSupply); } /** * Handle new demand coming from the workload * - * @param consumerEdge - * @param newDemand + * @param consumerEdge The edge to the workload + * @param newDemand new demand coming from the workload */ @Override public void handleDemand(FlowEdge consumerEdge, double newDemand) { - if (newDemand == this.demand) { - return; - } - - this.demand = newDemand; this.pushDemand(this.machineEdge, newDemand); } /** * Handle new supply coming from the cpuMux * - * @param supplierEdge - * @param newSupply + * @param supplierEdge The edge to the cpuMux + * @param newSupply The new supply that is sent to the workload */ @Override public void handleSupply(FlowEdge supplierEdge, double newSupply) { - if (newSupply == this.supply) { - return; - } - this.pushSupply(this.machineEdge, newSupply); } @@ -245,7 +241,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { * If there is a next workload available, start this workload * Otherwise, close this SimChainWorkload * - * @param consumerEdge + * @param consumerEdge The edge to the active workload */ @Override public void removeConsumerEdge(FlowEdge consumerEdge) { @@ -270,18 +266,10 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { * Handle the removal of the connection to the cpuMux * When this happens, close the SimChainWorkload * - * @param supplierEdge + * @param supplierEdge The edge to the cpuMux */ @Override public void removeSupplierEdge(FlowEdge supplierEdge) { this.stopWorkload(); } - - @SuppressWarnings("unchecked") - private static <T extends Throwable> void tryThrow(Throwable e) throws T { - if (e == null) { - return; - } - throw (T) e; - } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java index b269564d..59994fe6 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java @@ -40,9 +40,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { private double currentDemand; private double currentSupply; - private long checkpointInterval; private long checkpointDuration; - private double checkpointIntervalScaling; private TraceWorkload snapshot; @@ -88,9 +86,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { super(((FlowNode) supplier).getGraph()); this.snapshot = workload; - this.checkpointInterval = workload.getCheckpointInterval(); this.checkpointDuration = workload.getCheckpointDuration(); - this.checkpointIntervalScaling = workload.getCheckpointIntervalScaling(); this.remainingFragments = new LinkedList<>(workload.getFragments()); this.fragmentIndex = 0; @@ -98,7 +94,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { graph.addEdge(this, supplier); this.currentFragment = this.getNextFragment(); - pushDemand(machineEdge, (double) this.currentFragment.cpuUsage()); + pushDemand(machineEdge, this.currentFragment.cpuUsage()); this.startOfFragment = now; } @@ -135,7 +131,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.startOfFragment = now - passedTime; // Change the cpu Usage to the new Fragment - pushDemand(machineEdge, (double) this.currentFragment.cpuUsage()); + pushDemand(machineEdge, this.currentFragment.cpuUsage()); // Return the time when the current fragment will complete return this.startOfFragment + duration; @@ -163,7 +159,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { /** * Create a new snapshot based on the current status of the workload. - * @param now + * @param now Moment on which the snapshot is made in milliseconds */ public void makeSnapshot(long now) { @@ -190,7 +186,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.fragmentIndex = -1; this.currentFragment = getNextFragment(); - pushDemand(this.machineEdge, (double) this.currentFragment.cpuUsage()); + pushDemand(this.machineEdge, this.currentFragment.cpuUsage()); this.startOfFragment = now; this.invalidate(); @@ -203,8 +199,8 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { /** * Handle updates in supply from the Virtual Machine * - * @param supplierEdge - * @param newSupply + * @param supplierEdge edge to the VM on which this is running + * @param newSupply The new demand that needs to be sent to the VM */ @Override public void handleSupply(FlowEdge supplierEdge, double newSupply) { @@ -218,8 +214,8 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { /** * Push a new demand to the Virtual Machine * - * @param supplierEdge - * @param newDemand + * @param supplierEdge edge to the VM on which this is running + * @param newDemand The new demand that needs to be sent to the VM */ @Override public void pushDemand(FlowEdge supplierEdge, double newDemand) { @@ -234,7 +230,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { /** * Add the connection to the Virtual Machine * - * @param supplierEdge + * @param supplierEdge edge to the VM on which this is running */ @Override public void addSupplierEdge(FlowEdge supplierEdge) { @@ -245,7 +241,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { * Handle the removal of the connection to the Virtual Machine * When the connection to the Virtual Machine is removed, the SimTraceWorkload is removed * - * @param supplierEdge + * @param supplierEdge edge to the VM on which this is running */ @Override public void removeSupplierEdge(FlowEdge supplierEdge) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java index 25dc564f..dd4c2d11 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java @@ -31,11 +31,11 @@ import org.opendc.simulator.engine.FlowNode; import org.opendc.simulator.engine.FlowSupplier; public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer { - private ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); + private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); private FlowEdge supplierEdge; - private ArrayList<Double> demands = new ArrayList<>(); // What is demanded by the consumers - private ArrayList<Double> supplies = new ArrayList<>(); // What is supplied to the consumers + private final ArrayList<Double> demands = new ArrayList<>(); // What is demanded by the consumers + private final ArrayList<Double> supplies = new ArrayList<>(); // What is supplied to the consumers private double totalDemand; // The total demand of all the consumers private double totalSupply; // The total supply from the supplier @@ -180,13 +180,19 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer demands.set(idx, newDemand); this.totalDemand += (newDemand - prevDemand); - this.invalidate(); + + if (this.totalDemand <= this.capacity) { + + this.totalSupply = this.totalDemand; + this.pushDemand(this.supplierEdge, this.totalSupply); + + this.pushSupply(consumerEdge, newDemand); + } + // TODO: add behaviour if capacity is reached } @Override - public void handleSupply(FlowEdge supplierEdge, double newSupply) { - this.invalidate(); - } + public void handleSupply(FlowEdge supplierEdge, double newSupply) {} @Override public void pushDemand(FlowEdge supplierEdge, double newDemand) { @@ -201,6 +207,10 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer System.out.println("Error (Multiplexer): pushing supply to an unknown consumer"); } + if (supplies.get(idx) == newSupply) { + return; + } + supplies.set(idx, newSupply); consumerEdge.pushSupply(newSupply); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java index d89740a2..d99cd78e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java @@ -90,19 +90,23 @@ public class FlowEdge { * Push new demand from the Consumer to the Supplier */ public void pushDemand(double newDemand) { + if (newDemand == this.demand) { + return; + } this.demand = newDemand; this.supplier.handleDemand(this, newDemand); - ((FlowNode) this.supplier).invalidate(); } /** * Push new supply from the Supplier to the Consumer */ public void pushSupply(double newSupply) { + if (newSupply == this.supply) { + return; + } this.supply = newSupply; this.consumer.handleSupply(this, newSupply); - ((FlowNode) this.consumer).invalidate(); } } |
